Fix k8s flaky test - test_integration_run_dag_with_scheduler_failure (apache#46502)

* Fix k8s flaky test with test_integration_run_dag_with_scheduler_failure

* Fix criteria for _is_deployed_with_same_executor
jason810496 authored and insomnes committed Feb 6, 2025
1 parent 8b4e914 commit c83fadd
Showing 3 changed files with 57 additions and 6 deletions.
50 changes: 48 additions & 2 deletions dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
@@ -42,7 +42,12 @@
option_verbose,
)
from airflow_breeze.commands.production_image_commands import run_build_production_image
from airflow_breeze.global_constants import ALLOWED_EXECUTORS, ALLOWED_KUBERNETES_VERSIONS
from airflow_breeze.global_constants import (
ALLOWED_EXECUTORS,
ALLOWED_KUBERNETES_VERSIONS,
CELERY_EXECUTOR,
KUBERNETES_EXECUTOR,
)
from airflow_breeze.params.build_prod_params import BuildProdParams
from airflow_breeze.utils.ci_group import ci_group
from airflow_breeze.utils.click_utils import BreezeGroup
@@ -782,6 +787,12 @@ def upload_k8s_image(
if return_code == 0:
get_console().print("\n[warning]NEXT STEP:[/][info] You might now deploy airflow by:\n")
get_console().print("\nbreeze k8s deploy-airflow\n")
get_console().print(
"\n[warning]Note:[/]\nIf you want to run tests with [info]--executor KubernetesExecutor[/], you should deploy airflow with [info]--multi-namespace-mode --executor KubernetesExecutor[/] flag.\n"
)
get_console().print(
"\nbreeze k8s deploy-airflow --multi-namespace-mode --executor KubernetesExecutor\n"
)
sys.exit(return_code)


@@ -1406,6 +1417,31 @@ def _get_parallel_test_args(
return combo_titles, combos, pytest_args, short_combo_titles


def _is_deployed_with_same_executor(python: str, kubernetes_version: str, executor: str) -> bool:
"""Check if the current cluster is deployed with the same executor that the current tests are using.
This is especially useful when running tests with executors like KubernetesExecutor, CeleryExecutor, etc.
It verifies by checking the label of the airflow-scheduler deployment.
"""
result = run_command_with_k8s_env(
[
"kubectl",
"get",
"deployment",
"-n",
"airflow",
"airflow-scheduler",
"-o",
"jsonpath='{.metadata.labels.executor}'",
],
python=python,
kubernetes_version=kubernetes_version,
capture_output=True,
check=False,
)
return executor == result.stdout.decode().strip().replace("'", "")
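
For illustration, here is a minimal standalone sketch of the same check outside Breeze (a sketch under assumptions: kubectl is on PATH, the chart is installed in the airflow namespace, and the scheduler deployment carries an executor label, as the helper above expects). The jsonpath template is wrapped in single quotes, so kubectl echoes those quotes back around the value, which is presumably why the helper strips them before comparing:

import subprocess

def scheduler_executor_label(namespace: str = "airflow") -> str:
    # Read the "executor" label from the airflow-scheduler deployment.
    result = subprocess.run(
        [
            "kubectl", "get", "deployment", "-n", namespace, "airflow-scheduler",
            "-o", "jsonpath='{.metadata.labels.executor}'",
        ],
        capture_output=True,
        check=False,
    )
    # Output looks like 'KubernetesExecutor' (the quotes come from the template),
    # so strip whitespace and quotes before comparing against the requested executor.
    return result.stdout.decode().strip().replace("'", "")

# e.g. scheduler_executor_label() == "KubernetesExecutor"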


def _run_tests(
python: str,
kubernetes_version: str,
@@ -1422,7 +1458,17 @@ def _run_tests(
extra_shell_args.append("--no-rcs")
elif shell_binary.endswith("bash"):
extra_shell_args.extend(["--norc", "--noprofile"])
the_tests: list[str] = ["kubernetes_tests/"]
if (
executor == KUBERNETES_EXECUTOR or executor == CELERY_EXECUTOR
) and not _is_deployed_with_same_executor(python, kubernetes_version, executor):
get_console(output=output).print(
f"[warning]{executor} not deployed. Please deploy airflow with {executor} first."
)
get_console(output=output).print(
f"[info]You can deploy airflow with {executor} by running:[/]\nbreeze k8s configure-cluster\nbreeze k8s deploy-airflow --multi-namespace-mode --executor {executor}"
)
return 1, f"Tests {kubectl_cluster_name}"
the_tests: list[str] = ["kubernetes_tests/test_kubernetes_executor.py::TestKubernetesExecutor"]
command_to_run = " ".join([quote(arg) for arg in ["uv", "run", "pytest", *the_tests, *test_args]])
get_console(output).print(f"[info] Command to run:[/] {command_to_run}")
result = run_command(
8 changes: 8 additions & 0 deletions kubernetes_tests/test_base.py
@@ -196,6 +196,14 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state,
print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
assert state == expected_final_state

@staticmethod
def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"):
"""Watch the deployment until it is healthy."""
deployment_rollout_status = check_output(
["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"]
).decode()
assert "successfully rolled out" in deployment_rollout_status

def ensure_dag_expected_state(self, host, logical_date, dag_id, expected_final_state, timeout):
tries = 0
state = ""
5 changes: 1 addition & 4 deletions kubernetes_tests/test_kubernetes_executor.py
@@ -16,8 +16,6 @@
# under the License.
from __future__ import annotations

import time

import pytest

from kubernetes_tests.test_base import (
@@ -59,8 +57,7 @@ def test_integration_run_dag_with_scheduler_failure(self):
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)

self._delete_airflow_pod("scheduler")

time.sleep(10) # give time for pod to restart
self.ensure_deployment_health("airflow-scheduler")

# Wait some time for the operator to complete
self.monitor_task(
