From d0e22ad6d30d55fa9818967a1dd78963f660a614 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 14:44:04 +0530 Subject: [PATCH 1/8] restore unique constraint on logical date and make it nullable --- .../0032_3_0_0_drop_execution_date_unique.py | 28 +++++++++++++++++-- airflow/models/dagrun.py | 3 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 1 - docs/apache-airflow/migrations-ref.rst | 2 +- 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index 399cc8aff91f3..023df3d77bedf 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -17,9 +17,9 @@ # under the License. """ -Drop ``execution_date`` unique constraint on DagRun. +Make logical_date nullable. -The column has also been renamed to logical_date, although the Python model is +The column has been renamed to logical_date, although the Python model is not changed. This allows us to not need to fix all the Python code at once, but still do the two changes in one migration instead of two. @@ -49,11 +49,21 @@ def upgrade(): "execution_date", new_column_name="logical_date", existing_type=TIMESTAMP(timezone=True), - existing_nullable=False, + existing_nullable=True, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.create_unique_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.create_unique_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) + def downgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -63,6 +73,18 @@ def downgrade(): existing_type=TIMESTAMP(timezone=True), existing_nullable=False, ) + + with op.batch_alter_table("dag_run", schema=None) as batch_op: + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.drop_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) + with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 35d8af4322c49..826f2a6d4f18c 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -130,7 +130,7 @@ class DagRun(Base, LoggingMixin): id = Column(Integer, primary_key=True) dag_id = Column(StringID(), nullable=False) queued_at = Column(UtcDateTime) - logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=True) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) _state = Column("state", String(50), default=DagRunState.QUEUED) @@ -179,6 +179,7 @@ class DagRun(Base, LoggingMixin): __table_args__ = ( Index("dag_id_state", dag_id, _state), UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), + UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"), Index("idx_dag_run_dag_id", dag_id), Index( "idx_dag_run_running_dags", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index b058ae34b3783..52fe755bd20b1 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -eb25e0718c9382cdbb02368c9c3e29c90da06ddaba8e8e92d9fc53417b714039 \ No newline at end of file +4afe2ab3d7a9ab2f43e5355b6a720082c81f98f9100e074700e2b30600ce1a1d \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 92ab9bfb7855c..21b055d371d5f 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1852,7 +1852,6 @@ logical_date [TIMESTAMP] - NOT NULL queued_at diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 62013ff8f799c..95572283cf800 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -86,7 +86,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Drop ``execution_date`` unique constraint on DagRun. | +| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Make logical_date nullable. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``a2c32e6c7729`` | ``0bfc26bc256e`` | ``3.0.0`` | Add triggered_by field to DagRun. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From 2b11aed1d773a1b6f5d1f2b8e4735f67673c46b2 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:06:06 +0530 Subject: [PATCH 2/8] restore unique constraint on logical date and make it nullable --- ...tion_date_to_logical_date_and_nullable.py} | 30 ++++++++----------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 14 insertions(+), 18 deletions(-) rename airflow/migrations/versions/{0032_3_0_0_drop_execution_date_unique.py => 0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py} (83%) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py similarity index 83% rename from airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py rename to airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 023df3d77bedf..175840b56de96 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -51,18 +51,17 @@ def upgrade(): existing_type=TIMESTAMP(timezone=True), existing_nullable=True, ) - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.create_unique_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) + batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_logical_date_key", columns=["dag_id", "logical_date"], ) + batch_op.create_unique_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) def downgrade(): @@ -75,17 +74,14 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) - batch_op.drop_constraint( - "dag_run_dag_id_logical_date_key", - columns=["dag_id", "logical_date"], - ) - - with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.drop_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", columns=["dag_id", "execution_date"], diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 52fe755bd20b1..39b515342f97c 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -4afe2ab3d7a9ab2f43e5355b6a720082c81f98f9100e074700e2b30600ce1a1d \ No newline at end of file +8134c08556ea8dd84625f77c5fa0e3f06dc75393026c07c53b60b364e697d08b \ No newline at end of file From 237360de42e244bf230dd732a66aa68f9168011d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:12:23 +0530 Subject: [PATCH 3/8] fix migration file --- ...name_execution_date_to_logical_date_and_nullable.py | 10 ++-------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 175840b56de96..1269cf77c5c28 100644 --- a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -74,14 +74,8 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) - batch_op.drop_constraint( - "dag_run_dag_id_logical_date_key", - columns=["dag_id", "logical_date"], - ) + batch_op.drop_constraint("dag_run_dag_id_run_id_key", type_="unique") + batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", columns=["dag_id", "execution_date"], diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 39b515342f97c..e515f06d962cb 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -8134c08556ea8dd84625f77c5fa0e3f06dc75393026c07c53b60b364e697d08b \ No newline at end of file +4517b7d80a3ec9653cec714d7959c961767e154117a2bd5c136c76545467b446 \ No newline at end of file From 18efafba30d482bfcfcc5ca8c1d929d22a41e529 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:36:17 +0530 Subject: [PATCH 4/8] fix migration file --- ...0_0_rename_execution_date_to_logical_date_and_nullable.py | 5 ----- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 1269cf77c5c28..b559bd2102485 100644 --- a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -58,10 +58,6 @@ def upgrade(): "dag_run_dag_id_logical_date_key", columns=["dag_id", "logical_date"], ) - batch_op.create_unique_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) def downgrade(): @@ -74,7 +70,6 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint("dag_run_dag_id_run_id_key", type_="unique") batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index e515f06d962cb..ce2da0b1c30ae 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -4517b7d80a3ec9653cec714d7959c961767e154117a2bd5c136c76545467b446 \ No newline at end of file +1a1ce116676867566f504fdd398304ca1a2cb27cfa9f2e3799b2c2eb9cd38119 \ No newline at end of file From c3e29c77ffbff391596333d8a7a476097fc91df3 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 10:49:18 +0530 Subject: [PATCH 5/8] remove default date from logical date in dag run model --- airflow/models/dag.py | 5 +++-- airflow/models/dagrun.py | 12 +++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a0f6e901dc3e9..509602a75ffa8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1719,7 +1719,7 @@ def create_dagrun( self, *, run_id: str, - logical_date: datetime, + logical_date: datetime | None, data_interval: tuple[datetime, datetime], conf: dict | None = None, run_type: DagRunType, @@ -1743,7 +1743,8 @@ def create_dagrun( :meta private: """ - logical_date = timezone.coerce_datetime(logical_date) + if logical_date is not None: + logical_date = timezone.coerce_datetime(logical_date) if data_interval and not isinstance(data_interval, DataInterval): data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index c1f0c8d24e60e..c366a70e22142 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -130,7 +130,7 @@ class DagRun(Base, LoggingMixin): id = Column(Integer, primary_key=True) dag_id = Column(StringID(), nullable=False) queued_at = Column(UtcDateTime) - logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=True) + logical_date = Column(UtcDateTime, nullable=True) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) _state = Column("state", String(50), default=DagRunState.QUEUED) @@ -1305,8 +1305,14 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None: def task_filter(task: Operator) -> bool: return task.task_id not in task_ids and ( self.run_type == DagRunType.BACKFILL_JOB - or (task.start_date is None or task.start_date <= self.logical_date) - and (task.end_date is None or self.logical_date <= task.end_date) + or ( + task.start_date is None + or (self.logical_date is not None and task.start_date <= self.logical_date) + ) + and ( + task.end_date is None + or (self.logical_date is not None and self.logical_date <= task.end_date) + ) ) created_counts: dict[str, int] = defaultdict(int) From 2b4251d7e8658192721b9fecf3fb6b6a52a3d732 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 16:34:25 +0530 Subject: [PATCH 6/8] fix task_filter --- airflow/models/dagrun.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index c366a70e22142..c1ddc9a808270 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1307,12 +1307,10 @@ def task_filter(task: Operator) -> bool: self.run_type == DagRunType.BACKFILL_JOB or ( task.start_date is None - or (self.logical_date is not None and task.start_date <= self.logical_date) - ) - and ( - task.end_date is None - or (self.logical_date is not None and self.logical_date <= task.end_date) + or self.logical_date is None + or task.start_date <= self.logical_date ) + and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date) ) created_counts: dict[str, int] = defaultdict(int) From 7d6c9f4d094772e904cc5373930049cf5849968d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 20:10:13 +0530 Subject: [PATCH 7/8] fix tests --- tests/models/test_taskinstance.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index ab43b61cb95e9..4487fbf64413a 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2303,7 +2303,13 @@ def test_outlet_assets(self, create_task_instance, testing_dag_bundle): session.flush() run_id = str(uuid4()) - dr = DagRun(dag1.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag1.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag1.get_task("producing_task_1") task.bash_command = "echo 1" # make it go faster @@ -2362,7 +2368,13 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle): dagbag.collect_dags(only_if_updated=False, safe_mode=False) dagbag.sync_to_db("testing", None, session=session) run_id = str(uuid4()) - dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag_with_fail_task.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag_with_fail_task.get_task("fail_task") ti = TaskInstance(task, run_id=run_id) @@ -2421,7 +2433,13 @@ def test_outlet_assets_skipped(self, testing_dag_bundle): session.flush() run_id = str(uuid4()) - dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag_with_skip_task.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag_with_skip_task.get_task("skip_task") ti = TaskInstance(task, run_id=run_id) From 83b62bb7e55121c73995adc91968c11e919efe34 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 4 Feb 2025 09:57:25 +0530 Subject: [PATCH 8/8] implement review comments --- ..._0_0_rename_execution_date_to_logical_date_and_nullable.py | 4 ++-- airflow/models/dag.py | 3 +-- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index b559bd2102485..8a63d8112ac28 100644 --- a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -49,7 +49,7 @@ def upgrade(): "execution_date", new_column_name="logical_date", existing_type=TIMESTAMP(timezone=True), - existing_nullable=True, + nullable=True, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -66,7 +66,7 @@ def downgrade(): "logical_date", new_column_name="execution_date", existing_type=TIMESTAMP(timezone=True), - existing_nullable=False, + nullable=False, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 94da46d61e83a..8caa8a60586a3 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1747,8 +1747,7 @@ def create_dagrun( :meta private: """ - if logical_date is not None: - logical_date = timezone.coerce_datetime(logical_date) + logical_date = timezone.coerce_datetime(logical_date) if data_interval and not isinstance(data_interval, DataInterval): data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 75c8d1f5a5b92..9a124863bdd4a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -c27770b242bc31b945e731d3b82844c02a1a9a4974061e55af1d957985741881 \ No newline at end of file +76818a684a0e05c1fd3ecee6c74b204c9a8d59b22966c62ba08089312fbd6ff4 \ No newline at end of file