Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retries and callbacks are not honoured for ending deferred task from trigger #46224

Open
1 of 2 tasks
tirkarthi opened this issue Jan 29, 2025 · 1 comment
Open
1 of 2 tasks
Labels

Comments

@tirkarthi
Copy link
Contributor

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers

On tasks with retry when a TaskFailedEvent is from trigger the task is marked as failed with the retries not executed for the failed tasks. Similar issue with callbacks and also email_on_failure, email_on_retry also not being honored. This happens in main and also 2.10.4 . PR related #40084

What you think should happen instead?

Retries, callbacks and emails on failure should be honored.

How to reproduce

  1. Create following triggers and dag with retries and callback.
  2. Run the dag with TaskFailedEvent emitted from the trigger.
  3. The dag is marked as failed without retries and callbacks executed.
  4. Uncomment yielding TriggerEvent and run the dag again with new code.
  5. Retry happens and the failure callback is also executed.
# plugins/custom_trigger.py
from __future__ import annotations

import asyncio
import logging

from airflow.triggers.base import BaseTrigger, TriggerEvent, TaskSuccessEvent, TaskFailedEvent
from airflow.utils import timezone

class StateTrigger(BaseTrigger):
    def __init__(self, state):
        super().__init__()
        self.state = state

    def serialize(self):
        return ("custom_trigger.StateTrigger", {"state": self.state})

    async def run(self):
        if self.state == "success":
            yield TaskSuccessEvent()
        else:
            yield TaskFailedEvent()

        # yield TriggerEvent(self.state)
# dag_state_test.py

from __future__ import annotations

from datetime import datetime

from custom_trigger import StateTrigger

from airflow import DAG
from airflow.models.baseoperator import BaseOperator


class MultipleDeferTrigger(BaseOperator):
    """Multiple defer trigger."""

    def __init__(self, state=None, *args, **kwargs):
        self.state = state
        super().__init__(*args, **kwargs)

    def execute(self, context):
        self.defer(
            trigger=StateTrigger(self.state),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        raise Exception(event)

with DAG(
    dag_id="state_defer",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule=None,
    default_args = {
        "on_success_callback": lambda context: open("/tmp/on_success_callback", "w+").write(str(datetime.now())),
        "on_failure_callback": lambda context: open("/tmp/on_failure_callback", "w+").write(str(datetime.now())),
        "retries": 1,
        "retry_delay": 5.0
    }
) as dag:
    success = MultipleDeferTrigger(task_id="success", state="success", retry_delay=5.0)
    failed = MultipleDeferTrigger(task_id="failed", state="failed", retry_delay=5.0)

    success
    failed

Operating System

Ubuntu 20.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@tirkarthi tirkarthi added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet area:Triggerer and removed needs-triage label for new issues that we didn't triage yet labels Jan 29, 2025
@tirkarthi
Copy link
Contributor Author

cc: @sunank200

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant