From 42a19c93189bf9abcc5d70fdcf8b78a8657e53b1 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Wed, 29 Jan 2025 21:42:29 +0530 Subject: [PATCH 1/2] update backfill logic after unique constraint on logical_date is restored --- airflow/models/backfill.py | 120 +++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 58 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index a2347e76fdc6e..3c108e74ed190 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -37,6 +37,7 @@ func, select, ) +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import relationship, validates from sqlalchemy_jsonfield import JSONField @@ -46,7 +47,7 @@ from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks +from airflow.utils.sqlalchemy import UtcDateTime, nulls_first from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -263,20 +264,11 @@ def _create_backfill_dag_run( backfill_sort_ordinal, session, ): - with session.begin_nested() as nested: - dr = session.scalar( - with_row_locks( - query=_get_latest_dag_run_row_query(info, session), - session=session, - ), - ) + with session.begin_nested(): + dr = session.scalar(_get_latest_dag_run_row_query(info, session)) if dr: non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) if non_create_reason: - # rolling back here restores to start of this nested tran - # which releases the lock on the latest dag run, since we - # are not creating a new one - nested.rollback() session.add( BackfillDagRun( backfill_id=backfill_id, @@ -288,32 +280,42 @@ def _create_backfill_dag_run( ) return dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) - dr = dag.create_dagrun( - run_id=dag.timetable.generate_run_id( - run_type=DagRunType.BACKFILL_JOB, + try: + dr = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.BACKFILL_JOB, + logical_date=info.logical_date, + data_interval=info.data_interval, + ), logical_date=info.logical_date, data_interval=info.data_interval, - ), - logical_date=info.logical_date, - data_interval=info.data_interval, - run_after=info.run_after, + run_after=info.run_after, conf=dag_run_conf, - run_type=DagRunType.BACKFILL_JOB, - triggered_by=DagRunTriggeredByType.BACKFILL, - dag_version=dag_version, - state=DagRunState.QUEUED, - start_date=timezone.utcnow(), - backfill_id=backfill_id, - session=session, - ) - session.add( - BackfillDagRun( + run_type=DagRunType.BACKFILL_JOB, + triggered_by=DagRunTriggeredByType.BACKFILL, + dag_version=dag_version, + state=DagRunState.QUEUED, + start_date=timezone.utcnow(), backfill_id=backfill_id, - dag_run_id=dr.id, - sort_ordinal=backfill_sort_ordinal, - logical_date=info.logical_date, + session=session, ) - ) + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + sort_ordinal=backfill_sort_ordinal, + logical_date=info.logical_date, + ) + ) + except IntegrityError: + log.info( + "Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)", + dr.dag_id, + dr.id, + info.logical_date, + ) + log.info("Doing session rollback.") + session.rollback() def _get_info_list( @@ -389,34 +391,36 @@ def _create_backfill( dag=dag, ) - log.info("obtaining lock on dag %s", dag_id) - # we obtain a lock on dag model so that nothing else will create - # dag runs at the same time. mainly this is required by non-uniqueness - # of logical_date. otherwise we could just create run in a try-except. - dag_model = session.scalar( - with_row_locks( - select(DagModel).where(DagModel.dag_id == dag_id), - session=session, - ) - ) + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id)) + if not dag_model: raise RuntimeError(f"Dag {dag_id} not found") for info in dagrun_info_list: backfill_sort_ordinal += 1 - _create_backfill_dag_run( - dag=dag, - info=info, - backfill_id=br.id, - dag_run_conf=br.dag_run_conf, - reprocess_behavior=br.reprocess_behavior, - backfill_sort_ordinal=backfill_sort_ordinal, - session=session, - ) - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - br.id, - info, - ) + try: + _create_backfill_dag_run( + dag=dag, + info=info, + backfill_id=br.id, + dag_run_conf=br.dag_run_conf, + reprocess_behavior=br.reprocess_behavior, + backfill_sort_ordinal=backfill_sort_ordinal, + session=session, + ) + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + br.id, + info, + ) + except IntegrityError: + log.info( + "Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)", + dag.dag_id, + br.id, + info.logical_date, + ) + log.info("Doing session rollback.") + session.rollback() return br From 8d3415de0299441171024ed890889b0ff88fc3cb Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Wed, 5 Feb 2025 16:54:19 +0530 Subject: [PATCH 2/2] remove exception handling and add backfill dag run if insert fails --- airflow/models/backfill.py | 88 +++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 3c108e74ed190..735f09f3a3f74 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -254,6 +254,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess return logical_dates +def should_create_backfill_dag_run( + info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session +) -> bool: + """Determine if a backfill DAG run should be created and add a BackfillDagRun if required.""" + dr = session.scalar(_get_latest_dag_run_row_query(info, session)) + if not dr: + return False + non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) + if non_create_reason: + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + exception_reason=non_create_reason, + sort_ordinal=backfill_sort_ordinal, + ) + ) + return True + return False + + def _create_backfill_dag_run( *, dag: DAG, @@ -265,20 +287,12 @@ def _create_backfill_dag_run( session, ): with session.begin_nested(): - dr = session.scalar(_get_latest_dag_run_row_query(info, session)) - if dr: - non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior) - if non_create_reason: - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=None, - logical_date=info.logical_date, - exception_reason=non_create_reason, - sort_ordinal=backfill_sort_ordinal, - ) - ) - return + should_skip_create_backfill = should_create_backfill_dag_run( + info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session + ) + if should_skip_create_backfill: + return + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) try: dr = dag.create_dagrun( @@ -290,7 +304,7 @@ def _create_backfill_dag_run( logical_date=info.logical_date, data_interval=info.data_interval, run_after=info.run_after, - conf=dag_run_conf, + conf=dag_run_conf, run_type=DagRunType.BACKFILL_JOB, triggered_by=DagRunTriggeredByType.BACKFILL, dag_version=dag_version, @@ -317,6 +331,10 @@ def _create_backfill_dag_run( log.info("Doing session rollback.") session.rollback() + should_create_backfill_dag_run( + info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session + ) + def _get_info_list( *, @@ -398,29 +416,19 @@ def _create_backfill( for info in dagrun_info_list: backfill_sort_ordinal += 1 - try: - _create_backfill_dag_run( - dag=dag, - info=info, - backfill_id=br.id, - dag_run_conf=br.dag_run_conf, - reprocess_behavior=br.reprocess_behavior, - backfill_sort_ordinal=backfill_sort_ordinal, - session=session, - ) - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - br.id, - info, - ) - except IntegrityError: - log.info( - "Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)", - dag.dag_id, - br.id, - info.logical_date, - ) - log.info("Doing session rollback.") - session.rollback() + _create_backfill_dag_run( + dag=dag, + info=info, + backfill_id=br.id, + dag_run_conf=br.dag_run_conf, + reprocess_behavior=br.reprocess_behavior, + backfill_sort_ordinal=backfill_sort_ordinal, + session=session, + ) + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + br.id, + info, + ) return br