Skip to content

Commit

Permalink
AIP-83: Restore Uniqueness Constraint on Logical Date, Make It Nullab…
Browse files Browse the repository at this point in the history
…le (#46295)

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
vatsrahul1001 and uranusjr authored Feb 6, 2025
1 parent f156ef3 commit 9aa3309
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# under the License.

"""
Drop ``execution_date`` unique constraint on DagRun.
Make logical_date nullable.
The column has also been renamed to logical_date, although the Python model is
The column has been renamed to logical_date, although the Python model is
not changed. This allows us to not need to fix all the Python code at once, but
still do the two changes in one migration instead of two.
Expand Down Expand Up @@ -49,10 +49,15 @@ def upgrade():
"execution_date",
new_column_name="logical_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
nullable=True,
)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_logical_date_key",
columns=["dag_id", "logical_date"],
)


def downgrade():
Expand All @@ -61,9 +66,11 @@ def downgrade():
"logical_date",
new_column_name="execution_date",
existing_type=TIMESTAMP(timezone=True),
existing_nullable=False,
nullable=False,
)

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique")
batch_op.create_unique_constraint(
"dag_run_dag_id_execution_date_key",
columns=["dag_id", "execution_date"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ def create_dagrun(
self,
*,
run_id: str,
logical_date: datetime,
logical_date: datetime | None,
data_interval: tuple[datetime, datetime],
run_after: datetime,
conf: dict | None = None,
Expand Down
11 changes: 8 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class DagRun(Base, LoggingMixin):
id = Column(Integer, primary_key=True)
dag_id = Column(StringID(), nullable=False)
queued_at = Column(UtcDateTime)
logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
logical_date = Column(UtcDateTime, nullable=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
_state = Column("state", String(50), default=DagRunState.QUEUED)
Expand Down Expand Up @@ -186,6 +186,7 @@ class DagRun(Base, LoggingMixin):
__table_args__ = (
Index("dag_id_state", dag_id, _state),
UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
Index("idx_dag_run_dag_id", dag_id),
Index("idx_dag_run_run_after", run_after),
Index(
Expand Down Expand Up @@ -1321,8 +1322,12 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.run_type == DagRunType.BACKFILL_JOB
or (task.start_date is None or task.start_date <= self.logical_date)
and (task.end_date is None or self.logical_date <= task.end_date)
or (
task.start_date is None
or self.logical_date is None
or task.start_date <= self.logical_date
)
and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date)
)

created_counts: dict[str, int] = defaultdict(int)
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412
76818a684a0e05c1fd3ecee6c74b204c9a8d59b22966c62ba08089312fbd6ff4
1 change: 0 additions & 1 deletion docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Drop ``execution_date`` unique constraint on DagRun. |
| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Make logical_date nullable. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a2c32e6c7729`` | ``0bfc26bc256e`` | ``3.0.0`` | Add triggered_by field to DagRun. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
8 changes: 5 additions & 3 deletions tests/api_fastapi/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import datetime
import os

import pytest
Expand Down Expand Up @@ -56,12 +57,13 @@ def make_dag_with_multiple_versions(dag_maker):

for version_number in range(1, 4):
with dag_maker(dag_id) as dag:
for i in range(version_number):
EmptyOperator(task_id=f"task{i+1}")
for task_number in range(version_number):
EmptyOperator(task_id=f"task{task_number + 1}")
dag.sync_to_db()
SerializedDagModel.write_dag(dag, bundle_name="dag_maker")
dag_maker.create_dagrun(
run_id=f"run{i+1}",
run_id=f"run{version_number}",
logical_date=datetime.datetime(2020, 1, version_number, tzinfo=datetime.timezone.utc),
dag_version=DagVersion.get_version(dag_id=dag_id, version_number=version_number),
)

Expand Down
10 changes: 7 additions & 3 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _create_dag_run(session, num: int = 2):
dag_id="source_dag_id",
run_id=f"source_run_id_{i}",
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE + timedelta(days=i - 1),
start_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
external_trigger=True,
Expand Down Expand Up @@ -579,7 +579,9 @@ def test_should_respond_200(self, test_client, session):
{
"run_id": "source_run_id_2",
"dag_id": "source_dag_id",
"logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"logical_date": from_datetime_to_zulu_without_ms(
DEFAULT_DATE + timedelta(days=1),
),
"start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"state": "success",
Expand Down Expand Up @@ -747,7 +749,9 @@ def test_should_mask_sensitive_extra(self, test_client, session):
{
"run_id": "source_run_id_2",
"dag_id": "source_dag_id",
"logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"logical_date": from_datetime_to_zulu_without_ms(
DEFAULT_DATE + timedelta(days=1),
),
"start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"state": "success",
Expand Down
44 changes: 21 additions & 23 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio
)

@time_machine.travel(timezone.utcnow(), tick=False)
def test_should_response_200_for_duplicate_logical_date(self, test_client):
def test_should_response_409_for_duplicate_logical_date(self, test_client):
RUN_ID_1 = "random_1"
RUN_ID_2 = "random_2"
now = timezone.utcnow().isoformat().replace("+00:00", "Z")
Expand All @@ -1333,28 +1333,26 @@ def test_should_response_200_for_duplicate_logical_date(self, test_client):
json={"dag_run_id": RUN_ID_2, "note": note},
)

assert response_1.status_code == response_2.status_code == 200
body1 = response_1.json()
body2 = response_2.json()

for each_run_id, each_body in [(RUN_ID_1, body1), (RUN_ID_2, body2)]:
assert each_body == {
"dag_run_id": each_run_id,
"dag_id": DAG1_ID,
"logical_date": now,
"queued_at": now,
"start_date": None,
"end_date": None,
"data_interval_start": now,
"data_interval_end": now,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": note,
}
assert response_1.status_code == 200
assert response_1.json() == {
"dag_run_id": RUN_ID_1,
"dag_id": DAG1_ID,
"logical_date": now,
"queued_at": now,
"start_date": None,
"end_date": None,
"data_interval_start": now,
"data_interval_end": now,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": note,
}

assert response_2.status_code == 409

@pytest.mark.parametrize(
"data_interval_start, data_interval_end",
Expand Down
21 changes: 14 additions & 7 deletions tests/api_fastapi/core_api/routes/public/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import datetime as dt
import itertools
import os
from datetime import timedelta
from unittest import mock

import pendulum
Expand Down Expand Up @@ -225,7 +226,7 @@ def test_should_respond_200_with_versions(self, test_client, run_id, expected_ve
"dag_id": "dag_with_multiple_versions",
"dag_run_id": run_id,
"map_index": -1,
"logical_date": "2016-01-01T00:00:00Z",
"logical_date": mock.ANY,
"start_date": None,
"end_date": mock.ANY,
"duration": None,
Expand Down Expand Up @@ -1109,20 +1110,26 @@ def test_should_respond_200_for_dag_id_filter(self, test_client, session):
assert count == len(response.json()["task_instances"])

@pytest.mark.parametrize(
"order_by_field", ["start_date", "logical_date", "data_interval_start", "data_interval_end"]
"order_by_field, base_date",
[
("start_date", DEFAULT_DATETIME_1 + timedelta(days=20)),
("logical_date", DEFAULT_DATETIME_2),
("data_interval_start", DEFAULT_DATETIME_1 + timedelta(days=5)),
("data_interval_end", DEFAULT_DATETIME_2 + timedelta(days=8)),
],
)
def test_should_respond_200_for_order_by(self, order_by_field, test_client, session):
def test_should_respond_200_for_order_by(self, order_by_field, base_date, test_client, session):
dag_id = "example_python_operator"

dag_runs = [
DagRun(
dag_id=dag_id,
run_id=f"run_{i}",
run_type=DagRunType.MANUAL,
logical_date=DEFAULT_DATETIME_1 + dt.timedelta(days=i),
logical_date=base_date + dt.timedelta(days=i),
data_interval=(
DEFAULT_DATETIME_1 + dt.timedelta(days=i),
DEFAULT_DATETIME_1 + dt.timedelta(days=i, hours=1),
base_date + dt.timedelta(days=i),
base_date + dt.timedelta(days=i, hours=1),
),
)
for i in range(10)
Expand All @@ -1133,7 +1140,7 @@ def test_should_respond_200_for_order_by(self, order_by_field, test_client, sess
self.create_task_instances(
session,
task_instances=[
{"run_id": f"run_{i}", "start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))}
{"run_id": f"run_{i}", "start_date": base_date + dt.timedelta(minutes=(i + 1))}
for i in range(10)
],
dag_id=dag_id,
Expand Down
5 changes: 5 additions & 0 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
assert all(x.conf == expected_run_conf for x in dag_runs)


# Marking test xfail as backfill reprocess behaviour impacted by restoring logical date unique constraints in #46295
# TODO: Fix backfill reprocess behaviour as per #46295
@pytest.mark.xfail(
reason="Backfill reprocess behaviour impacted by restoring logical date unique constraints."
)
@pytest.mark.parametrize(
"reprocess_behavior, run_counts",
[
Expand Down
1 change: 1 addition & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):

dr = dag_maker.create_dagrun(
run_id="test_dagrun_no_deadlock_1",
run_type=DagRunType.SCHEDULED,
start_date=DEFAULT_DATE,
)
dr2 = dag_maker.create_dagrun_after(
Expand Down
24 changes: 21 additions & 3 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2303,7 +2303,13 @@ def test_outlet_assets(self, create_task_instance, testing_dag_bundle):
session.flush()

run_id = str(uuid4())
dr = DagRun(dag1.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
dr = DagRun(
dag1.dag_id,
run_id=run_id,
run_type="manual",
state=DagRunState.RUNNING,
logical_date=timezone.utcnow(),
)
session.merge(dr)
task = dag1.get_task("producing_task_1")
task.bash_command = "echo 1" # make it go faster
Expand Down Expand Up @@ -2362,7 +2368,13 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle):
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag.sync_to_db("testing", None, session=session)
run_id = str(uuid4())
dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
dr = DagRun(
dag_with_fail_task.dag_id,
run_id=run_id,
run_type="manual",
state=DagRunState.RUNNING,
logical_date=timezone.utcnow(),
)
session.merge(dr)
task = dag_with_fail_task.get_task("fail_task")
ti = TaskInstance(task, run_id=run_id)
Expand Down Expand Up @@ -2421,7 +2433,13 @@ def test_outlet_assets_skipped(self, testing_dag_bundle):
session.flush()

run_id = str(uuid4())
dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
dr = DagRun(
dag_with_skip_task.dag_id,
run_id=run_id,
run_type="manual",
state=DagRunState.RUNNING,
logical_date=timezone.utcnow(),
)
session.merge(dr)
task = dag_with_skip_task.get_task("skip_task")
ti = TaskInstance(task, run_id=run_id)
Expand Down
3 changes: 3 additions & 0 deletions tests/www/views/test_views_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ def test_rendered_k8s_without_k8s(admin_client):


def test_tree_trigger_origin_tree_view(app, admin_client):
clear_db_runs()
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
Expand All @@ -414,6 +415,7 @@ def test_tree_trigger_origin_tree_view(app, admin_client):


def test_graph_trigger_origin_grid_view(app, admin_client):
clear_db_runs()
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
Expand All @@ -433,6 +435,7 @@ def test_graph_trigger_origin_grid_view(app, admin_client):


def test_gantt_trigger_origin_grid_view(app, admin_client):
clear_db_runs()
app.dag_bag.get_dag("example_bash_operator").create_dagrun(
run_id="test",
run_type=DagRunType.SCHEDULED,
Expand Down

0 comments on commit 9aa3309

Please sign in to comment.