Skip to content

Commit

Permalink
remove backfill related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vatsrahul1001 committed Feb 7, 2025
1 parent 15dfbf9 commit c8b8577
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 36 deletions.
26 changes: 2 additions & 24 deletions airflow/models/backfill.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -140,7 +141,6 @@ class BackfillDagRunExceptionReason(str, Enum):
IN_FLIGHT = "in flight"
ALREADY_EXISTS = "already exists"
UNKNOWN = "unknown"
CLEARED_RUN = "cleared existing run"


class BackfillDagRun(Base):
Expand Down Expand Up @@ -194,11 +194,7 @@ def _get_latest_dag_run_row_query(info, session):
def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None:
non_create_reason = None
if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
if dr.clear_number == 0:
non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
else:
non_create_reason = BackfillDagRunExceptionReason.CLEARED_RUN

non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
elif reprocess_behavior is ReprocessBehavior.NONE:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
elif reprocess_behavior is ReprocessBehavior.FAILED:
Expand Down Expand Up @@ -266,23 +262,7 @@ def _create_backfill_dag_run(
dag_run_conf,
backfill_sort_ordinal,
session,
from_date,
to_date,
):
from airflow.models import DAG

dr = session.scalar(_get_latest_dag_run_row_query(info, session))
if (
dr
and dr.state not in {DagRunState.RUNNING}
and reprocess_behavior in {ReprocessBehavior.COMPLETED, ReprocessBehavior.FAILED}
):
DAG.clear_dags(
[dag],
start_date=from_date,
end_date=to_date,
dag_run_state=DagRunState.QUEUED,
)
with session.begin_nested() as nested:
dr = session.scalar(
with_row_locks(
Expand Down Expand Up @@ -432,8 +412,6 @@ def _create_backfill(
reprocess_behavior=br.reprocess_behavior,
backfill_sort_ordinal=backfill_sort_ordinal,
session=session,
from_date=from_date,
to_date=to_date,
)
log.info(
"created backfill dag run dag_id=%s backfill_id=%s, info=%s",
Expand Down
24 changes: 12 additions & 12 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,32 +164,28 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
ReprocessBehavior.NONE,
{
"2021-01-01": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
(
ReprocessBehavior.FAILED,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
(
ReprocessBehavior.COMPLETED,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-05": 1,
"2021-01-06": 1,
"2021-01-07": 1,
"2021-01-09": 1,
},
),
Expand All @@ -214,8 +210,12 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session):
# whether a dag run is created for backfill depends on
# the last run for a logical date
("2021-01-02", ["failed"]),
("2021-01-05", ["success"]),
("2021-01-08", ["running"]),
("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed"
("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success"
("2021-01-05", ["success", "success"]),
("2021-01-06", ["failed", "failed"]),
("2021-01-07", ["running", "running"]),
("2021-01-08", ["failed", "running"]),
]
for state in states
]
Expand Down Expand Up @@ -271,12 +271,12 @@ def _get_bdr(date):

# 2021-01-04 is "failed" so it may or may not be reprocessed depending
# on the configuration
bdr = _get_bdr("2021-01-05")
bdr = _get_bdr("2021-01-04")
actual_reason = bdr.exception_reason
if reprocess_behavior is ReprocessBehavior.FAILED:
assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS
elif reprocess_behavior is ReprocessBehavior.COMPLETED:
assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN
assert actual_reason is None
elif reprocess_behavior is ReprocessBehavior.NONE:
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS

Expand Down

0 comments on commit c8b8577

Please sign in to comment.