Skip to content

Commit

Permalink
implement review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vatsrahul1001 committed Feb 7, 2025
1 parent c8b8577 commit 5467412
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 14 deletions.
3 changes: 2 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class TriggerDAGRunPostBody(StrictBaseModel):
logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None

conf: dict = Field(default_factory=dict)
note: str | None = None

Expand All @@ -97,6 +96,8 @@ def check_data_intervals(cls, values):
)
return values

## when logical date is null, the run id should be generated from run_after + random string.
# TODO we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged
@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
Expand Down
18 changes: 5 additions & 13 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.models import DAG, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -352,6 +353,7 @@ def trigger_dag_run(
)

logical_date = pendulum.instance(body.logical_date) if body.logical_date is not None else None
coerced_logical_date = timezone.coerce_datetime(logical_date)

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
Expand All @@ -363,22 +365,12 @@ def trigger_dag_run(
)
else:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=logical_date or pendulum.now("UTC")
run_after=coerced_logical_date or pendulum.now("UTC")
)

run_id = (
body.dag_run_id
if body.dag_run_id
else dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=logical_date or pendulum.now("UTC"),
data_interval=data_interval,
)
)

dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
run_id=cast(str, body.dag_run_id),
logical_date=coerced_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=body.conf,
Expand Down

0 comments on commit 5467412

Please sign in to comment.