diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py
similarity index 84%
rename from airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py
rename to airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py
index 399cc8aff91f3..8a63d8112ac28 100644
--- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py
+++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py
@@ -17,9 +17,9 @@
 # under the License.
 
 """
-Drop ``execution_date`` unique constraint on DagRun.
+Make logical_date nullable.
 
-The column has also been renamed to logical_date, although the Python model is
+The column has been renamed to logical_date, although the Python model is
 not changed. This allows us to not need to fix all the Python code at once, but
 still do the two changes in one migration instead of two.
 
@@ -49,10 +49,15 @@ def upgrade():
         "execution_date",
         new_column_name="logical_date",
         existing_type=TIMESTAMP(timezone=True),
-        existing_nullable=False,
+        nullable=True,
     )
+    with op.batch_alter_table("dag_run", schema=None) as batch_op:
         batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique")
+        batch_op.create_unique_constraint(
+            "dag_run_dag_id_logical_date_key",
+            columns=["dag_id", "logical_date"],
+        )
 
 
 def downgrade():
@@ -61,9 +66,11 @@ def downgrade():
         "logical_date",
         new_column_name="execution_date",
         existing_type=TIMESTAMP(timezone=True),
-        existing_nullable=False,
+        nullable=False,
     )
+    with op.batch_alter_table("dag_run", schema=None) as batch_op:
+        batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique")
         batch_op.create_unique_constraint(
             "dag_run_dag_id_execution_date_key",
             columns=["dag_id", "execution_date"],
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8fae1604ed761..8caa8a60586a3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1722,7 +1722,7 @@ def create_dagrun(
         self,
         *,
         run_id: str,
-        logical_date: datetime,
+        logical_date: datetime | None,
         data_interval: tuple[datetime, datetime],
         run_after: datetime,
         conf: dict | None = None,
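Reviewer note on the migration above: downgrade() restores NOT NULL on the column, which will fail outright if any dag_run rows were created with a NULL logical_date in the meantime. A minimal sketch of a guard that could run at the top of downgrade(), assuming run_after is an acceptable fallback value (the helper name and the fallback choice are illustrative, not part of this PR):

import sqlalchemy as sa
from alembic import op


def _backfill_null_logical_dates() -> None:
    # Hypothetical cleanup: give NULL logical_date rows a concrete value
    # so the NOT NULL constraint can be restored without erroring.
    dag_run = sa.table(
        "dag_run",
        sa.column("logical_date", sa.TIMESTAMP(timezone=True)),
        sa.column("run_after", sa.TIMESTAMP(timezone=True)),
    )
    op.execute(
        dag_run.update()
        .where(dag_run.c.logical_date.is_(None))
        .values(logical_date=dag_run.c.run_after)  # run_after as fallback is an assumption
    )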
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 22757c972e953..619c03cc68a4e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -135,7 +135,7 @@ class DagRun(Base, LoggingMixin):
     id = Column(Integer, primary_key=True)
     dag_id = Column(StringID(), nullable=False)
     queued_at = Column(UtcDateTime)
-    logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
+    logical_date = Column(UtcDateTime, nullable=True)
     start_date = Column(UtcDateTime)
     end_date = Column(UtcDateTime)
     _state = Column("state", String(50), default=DagRunState.QUEUED)
@@ -186,6 +186,7 @@ class DagRun(Base, LoggingMixin):
     __table_args__ = (
         Index("dag_id_state", dag_id, _state),
         UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
+        UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
         Index("idx_dag_run_dag_id", dag_id),
         Index("idx_dag_run_run_after", run_after),
         Index(
@@ -1315,8 +1316,12 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
         def task_filter(task: Operator) -> bool:
             return task.task_id not in task_ids and (
                 self.run_type == DagRunType.BACKFILL_JOB
-                or (task.start_date is None or task.start_date <= self.logical_date)
-                and (task.end_date is None or self.logical_date <= task.end_date)
+                or (
+                    task.start_date is None
+                    or self.logical_date is None
+                    or task.start_date <= self.logical_date
+                )
+                and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date)
             )
 
         created_counts: dict[str, int] = defaultdict(int)
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index e3606c3476353..9a124863bdd4a 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-232f2f252ce0d3889fa5a9ceb00c88788e12083a6ea0c155c74d3fe61ad02412
\ No newline at end of file
+76818a684a0e05c1fd3ecee6c74b204c9a8d59b22966c62ba08089312fbd6ff4
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 7efa5c1512485..94a49bb5dba18 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1864,7 +1864,6 @@
 logical_date
 [TIMESTAMP]
-NOT NULL
 
 queued_at
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index c39bcfb3a55f7..027327806f316 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -92,7 +92,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``522625f6d606``        | ``1cdc775ca98f`` | ``3.0.0``         | Add tables for backfill.                                     |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
-| ``1cdc775ca98f``        | ``a2c32e6c7729`` | ``3.0.0``         | Drop ``execution_date`` unique constraint on DagRun.         |
+| ``1cdc775ca98f``        | ``a2c32e6c7729`` | ``3.0.0``         | Make logical_date nullable.                                  |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``a2c32e6c7729``        | ``0bfc26bc256e`` | ``3.0.0``         | Add triggered_by field to DagRun.                            |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
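Reviewer note on the task_filter change above: the expression relies on `and` binding tighter than `or`, so the two date-window checks still gate only non-backfill runs, and a run whose logical_date is None (now possible) passes both windows. A standalone restatement of the predicate for eyeballing the truth table (function and parameter names are local to this sketch, not part of the PR):

from datetime import datetime


def within_task_window(
    start_date: datetime | None,
    end_date: datetime | None,
    logical_date: datetime | None,
    is_backfill: bool,
) -> bool:
    # Mirrors the date logic in DagRun.verify_integrity's task_filter.
    if is_backfill:
        return True  # backfill runs skip the window check entirely
    if logical_date is None:
        return True  # a missing logical date satisfies both window checks
    return (start_date is None or start_date <= logical_date) and (
        end_date is None or logical_date <= end_date
    )


# Example: a run with no logical date is always inside the window.
assert within_task_window(datetime(2024, 1, 1), None, None, is_backfill=False)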
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 12bccca74b815..2d889a791caf2 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -351,10 +351,11 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):
             run_id="test_dagrun_no_deadlock_1",
             start_date=DEFAULT_DATE,
         )
-        dr2 = dag_maker.create_dagrun_after(
-            dr,
+        next_date = DEFAULT_DATE + datetime.timedelta(days=1)
+        dr2 = dag_maker.create_dagrun(
             run_id="test_dagrun_no_deadlock_2",
             start_date=DEFAULT_DATE + datetime.timedelta(days=1),
+            logical_date=next_date,
         )
         ti1_op1 = dr.get_task_instance(task_id="dop")
         dr2.get_task_instance(task_id="dop")
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index ab43b61cb95e9..4487fbf64413a 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2303,7 +2303,13 @@ def test_outlet_assets(self, create_task_instance, testing_dag_bundle):
         session.flush()
 
         run_id = str(uuid4())
-        dr = DagRun(dag1.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
+        dr = DagRun(
+            dag1.dag_id,
+            run_id=run_id,
+            run_type="manual",
+            state=DagRunState.RUNNING,
+            logical_date=timezone.utcnow(),
+        )
         session.merge(dr)
         task = dag1.get_task("producing_task_1")
         task.bash_command = "echo 1"  # make it go faster
@@ -2362,7 +2368,13 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle):
         dagbag.collect_dags(only_if_updated=False, safe_mode=False)
         dagbag.sync_to_db("testing", None, session=session)
         run_id = str(uuid4())
-        dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
+        dr = DagRun(
+            dag_with_fail_task.dag_id,
+            run_id=run_id,
+            run_type="manual",
+            state=DagRunState.RUNNING,
+            logical_date=timezone.utcnow(),
+        )
         session.merge(dr)
         task = dag_with_fail_task.get_task("fail_task")
         ti = TaskInstance(task, run_id=run_id)
@@ -2421,7 +2433,13 @@ def test_outlet_assets_skipped(self, testing_dag_bundle):
         session.flush()
 
         run_id = str(uuid4())
-        dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
+        dr = DagRun(
+            dag_with_skip_task.dag_id,
+            run_id=run_id,
+            run_type="manual",
+            state=DagRunState.RUNNING,
+            logical_date=timezone.utcnow(),
+        )
         session.merge(dr)
         task = dag_with_skip_task.get_task("skip_task")
         ti = TaskInstance(task, run_id=run_id)
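Reviewer note on the test changes: DagRun no longer defaults logical_date, so these tests now pass it explicitly. One consequence of the new dag_run_dag_id_logical_date_key constraint that is not covered here: two runs of the same DAG with the same concrete logical_date should now fail at flush time, while any number of NULL logical dates can coexist, since the supported backends do not treat NULLs as equal in unique constraints. A hedged pytest-style sketch of such a check (assumes a session fixture behaving as in the tests above):

import pytest
from sqlalchemy.exc import IntegrityError

from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState


def test_logical_date_unique_per_dag(session):
    when = timezone.utcnow()
    session.add(DagRun("dag1", run_id="r1", run_type="manual", state=DagRunState.RUNNING, logical_date=when))
    session.flush()
    # Same dag_id + same logical_date should trip dag_run_dag_id_logical_date_key.
    session.add(DagRun("dag1", run_id="r2", run_type="manual", state=DagRunState.RUNNING, logical_date=when))
    with pytest.raises(IntegrityError):
        session.flush()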