Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove subdir command from task commands #47837

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

ambika-garg
Copy link
Contributor

related: #45647

@ambika-garg
Copy link
Contributor Author

Hey @jedcunningham, should we remove the get_dags function in task_clear, since the task_clear command only accepts a single dag_id?

@@ -231,7 +231,13 @@ def task_failed_deps(args) -> None:
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks
to have succeeded, but found 1 non-success(es).
"""
dag = get_dag(args.subdir, args.dag_id)
dag = _parse_and_get_dag(args.dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just use serialized dag from the db, not reparse it on the fly like this.

@@ -255,7 +261,13 @@ def task_state(args) -> None:
>>> airflow tasks state tutorial sleep 2015-01-01
success
"""
dag = get_dag(args.subdir, args.dag_id)
dag = _parse_and_get_dag(args.dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

@@ -266,7 +278,13 @@ def task_state(args) -> None:
@providers_configuration_loaded
def task_list(args, dag: DAG | None = None) -> None:
"""List the tasks within a DAG at the command line."""
dag = dag or get_dag(args.subdir, args.dag_id)
dag = dag or _parse_and_get_dag(args.dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

@@ -421,8 +444,13 @@ def task_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> N
@providers_configuration_loaded
def task_render(args, dag: DAG | None = None) -> None:
"""Render and displays templated fields for a given task."""
dag = dag or _parse_and_get_dag(args.dag_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

@jedcunningham
Copy link
Member

should we remove the get_dags function in task_clear, since the task_clear command only accepts a single dag_id?

Not sure without digging in more. Might be some leftover cruft from subdags possibly?

@ambika-garg
Copy link
Contributor Author

should we remove the get_dags function in task_clear, since the task_clear command only accepts a single dag_id?

Not sure without digging in more. Might be some leftover cruft from subdags possibly?

I was referring to this function.
https://github.com/apache/airflow/blob/main/airflow/cli/commands/remote_commands/task_command.py#L446

@@ -447,12 +456,11 @@ def task_clear(args) -> None:
"""Clear all task instances or only those matched by regex for a DAG(s)."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)

if args.dag_id and not args.subdir and not args.dag_regex and not args.task_regex:
if args.dag_id and not args.dag_regex and not args.task_regex:
dags = [get_dag_by_file_location(args.dag_id)]
else:
# todo clear command only accepts a single dag_id. no reason for get_dags with 's' except regex?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should be maintained because if the user wants to use regex, dag_id is treated as a regex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I made the changes accordingly.

@ambika-garg ambika-garg force-pushed the remove-subarg-task-command branch from 0a9c459 to 6ac6d36 Compare March 23, 2025 18:29
@ambika-garg
Copy link
Contributor Author

Hey @jedcunningham, can you please take a look and help me understand why the test case is failing?

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roughly tracing all the way down, it seems that the task_to_execute we pass to ExecutionCallableRunner

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L610

is a BaseOperator instead of a PythonOperator (or another Python-based operator inheriting from BaseOperator). Compared to the example_python_operator DAG

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_python_operator.py

this results in a NotImplementedError being raised from BaseOperator.execute

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/baseoperator.py#L441-L449

#
# No code was selected, so we can't improve anything.#
Copy link
Member

@jason810496 jason810496 Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Seems like your colleague has left here xD

@ambika-garg
Copy link
Contributor Author

Roughly tracing all the way down, it seems that the task_to_execute we pass to ExecutionCallableRunner

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L610

is a BaseOperator instead of a PythonOperator (or another Python-based operator inheriting from BaseOperator). Compared to the example_python_operator DAG

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/example_dags/example_python_operator.py

this results in a NotImplementedError being raised from BaseOperator.execute

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/baseoperator.py#L441-L449

Thanks for the amazing insights, @jason810496! I’m wondering why these are failing since nothing related to them is being changed in the PR.

@jason810496
Copy link
Member

Thanks for the amazing insights, @jason810496! I’m wondering why these are failing since nothing related to them is being changed in the PR.

I'm not sure at the moment, so we need to dig along the path I described to find out.

@ambika-garg ambika-garg force-pushed the remove-subarg-task-command branch from 7833efd to 0e644d7 Compare March 26, 2025 14:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants