diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index a2347e76fdc6e..735f09f3a3f74 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -37,6 +37,7 @@
     func,
     select,
 )
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import relationship, validates
 from sqlalchemy_jsonfield import JSONField
 
@@ -46,7 +47,7 @@
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
+from airflow.utils.sqlalchemy import UtcDateTime, nulls_first
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -253,6 +254,28 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
     return logical_dates
 
 
+def should_create_backfill_dag_run(
+    info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
+) -> bool:
+    """Determine whether creating the backfill dag run should be skipped, recording a BackfillDagRun with the skip reason if so."""
+    dr = session.scalar(_get_latest_dag_run_row_query(info, session))
+    if not dr:
+        return False
+    non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
+    if non_create_reason:
+        session.add(
+            BackfillDagRun(
+                backfill_id=backfill_id,
+                dag_run_id=None,
+                logical_date=info.logical_date,
+                exception_reason=non_create_reason,
+                sort_ordinal=backfill_sort_ordinal,
+            )
+        )
+        return True
+    return False
+
+
 def _create_backfill_dag_run(
     *,
     dag: DAG,
@@ -263,57 +286,54 @@ def _create_backfill_dag_run(
     backfill_sort_ordinal,
     session,
 ):
-    with session.begin_nested() as nested:
-        dr = session.scalar(
-            with_row_locks(
-                query=_get_latest_dag_run_row_query(info, session),
-                session=session,
-            ),
+    with session.begin_nested():
+        should_skip_create_backfill = should_create_backfill_dag_run(
+            info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
         )
-        if dr:
-            non_create_reason = _get_dag_run_no_create_reason(dr, reprocess_behavior)
-            if non_create_reason:
-                # rolling back here restores to start of this nested tran
-                # which releases the lock on the latest dag run, since we
-                # are not creating a new one
-                nested.rollback()
-                session.add(
-                    BackfillDagRun(
-                        backfill_id=backfill_id,
-                        dag_run_id=None,
-                        logical_date=info.logical_date,
-                        exception_reason=non_create_reason,
-                        sort_ordinal=backfill_sort_ordinal,
-                    )
-                )
-                return
+        if should_skip_create_backfill:
+            return
+
         dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
-        dr = dag.create_dagrun(
-            run_id=dag.timetable.generate_run_id(
-                run_type=DagRunType.BACKFILL_JOB,
+        try:
+            dr = dag.create_dagrun(
+                run_id=dag.timetable.generate_run_id(
+                    run_type=DagRunType.BACKFILL_JOB,
+                    logical_date=info.logical_date,
+                    data_interval=info.data_interval,
+                ),
                 logical_date=info.logical_date,
                 data_interval=info.data_interval,
-            ),
-            logical_date=info.logical_date,
-            data_interval=info.data_interval,
-            run_after=info.run_after,
-            conf=dag_run_conf,
-            run_type=DagRunType.BACKFILL_JOB,
-            triggered_by=DagRunTriggeredByType.BACKFILL,
-            dag_version=dag_version,
-            state=DagRunState.QUEUED,
-            start_date=timezone.utcnow(),
-            backfill_id=backfill_id,
-            session=session,
-        )
-        session.add(
-            BackfillDagRun(
+                run_after=info.run_after,
+                conf=dag_run_conf,
+                run_type=DagRunType.BACKFILL_JOB,
+                triggered_by=DagRunTriggeredByType.BACKFILL,
+                dag_version=dag_version,
+                state=DagRunState.QUEUED,
+                start_date=timezone.utcnow(),
                 backfill_id=backfill_id,
-                dag_run_id=dr.id,
-                sort_ordinal=backfill_sort_ordinal,
-                logical_date=info.logical_date,
+                session=session,
+            )
+            session.add(
+                BackfillDagRun(
+                    backfill_id=backfill_id,
+                    dag_run_id=dr.id,
+                    sort_ordinal=backfill_sort_ordinal,
+                    logical_date=info.logical_date,
+                )
+            )
+        except IntegrityError:
+            log.info(
+                "Skipped creating backfill dag run for dag_id=%s backfill_id=%s, logical_date=%s (already exists)",
+                dag.dag_id,
+                backfill_id,
+                info.logical_date,
+            )
+            log.info("Doing session rollback.")
+            session.rollback()
+
+            should_create_backfill_dag_run(
+                info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
            )
-        )
 
 
 def _get_info_list(
@@ -389,16 +409,8 @@ def _create_backfill(
             dag=dag,
         )
 
-        log.info("obtaining lock on dag %s", dag_id)
-        # we obtain a lock on dag model so that nothing else will create
-        # dag runs at the same time. mainly this is required by non-uniqueness
-        # of logical_date. otherwise we could just create run in a try-except.
-        dag_model = session.scalar(
-            with_row_locks(
-                select(DagModel).where(DagModel.dag_id == dag_id),
-                session=session,
-            )
-        )
+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
+
         if not dag_model:
            raise RuntimeError(f"Dag {dag_id} not found")
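The core change is a switch from pessimistic locking to an optimistic insert: instead of taking row locks before creating a dag run, the run is created directly and a conflicting concurrent insert is detected by catching `IntegrityError` and recording the skip. Below is a minimal, standalone SQLAlchemy sketch of that pattern, not Airflow code: the `ToyDagRun` model, its `(dag_id, logical_date)` unique constraint, and the `create_run` helper are illustrative assumptions.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, UniqueConstraint, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class ToyDagRun(Base):
    """Stand-in for a dag run row; the unique constraint arbitrates concurrent writers."""

    __tablename__ = "toy_dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    logical_date = Column(DateTime, nullable=False)
    __table_args__ = (UniqueConstraint("dag_id", "logical_date", name="uq_toy_dag_run"),)


def create_run(session, dag_id: str, logical_date: datetime) -> bool:
    """Insert optimistically; return False if another writer created the row first."""
    session.add(ToyDagRun(dag_id=dag_id, logical_date=logical_date))
    try:
        # flush() sends the INSERT now, so a duplicate surfaces here as
        # IntegrityError rather than at commit time.
        session.flush()
    except IntegrityError:
        # Discard the conflicting insert; a caller can then record a skip
        # reason, analogous to the BackfillDagRun row with exception_reason.
        session.rollback()
        return False
    return True


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with sessionmaker(bind=engine)() as session:
        when = datetime(2025, 1, 1)
        print(create_run(session, "example_dag", when))  # True: first insert wins
        session.commit()
        print(create_run(session, "example_dag", when))  # False: duplicate detected
```

The trade-off mirrors the comment removed from `_create_backfill`: rather than serializing writers with `with_row_locks`, concurrent writers race to insert and the loser rolls back and records (or ignores) the conflict.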