Skip to content

Commit 1236292

Browse files
scripts: Add code for waiting until the restarted job is ready and the metric condition is met
1 parent 0b3679e commit 1236292

File tree

2 files changed

+109
-14
lines changed

2 files changed

+109
-14
lines changed

scripts/prod/update_config_and_restart_nodes.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from update_config_and_restart_nodes_lib import (
99
ApolloArgsParserBuilder,
1010
Colors,
11+
MetricConditionGater,
1112
NamespaceAndInstructionArgs,
1213
Service,
1314
ServiceRestarter,
15+
WaitOnMetrticOneByOneRestarter,
1416
print_colored,
1517
print_error,
1618
update_config_and_restart_nodes,
@@ -155,10 +157,19 @@ def main():
155157
None,
156158
)
157159

158-
restarter = ServiceRestarter.from_restart_strategy(
159-
args.restart_strategy,
160+
# Create the appropriate restarter based on the restart strategy
161+
# restarter = ServiceRestarter.from_restart_strategy(
162+
# args.restart_strategy,
163+
# namespace_and_instruction_args,
164+
# args.service,
165+
# )
166+
167+
restarter = WaitOnMetrticOneByOneRestarter(
160168
namespace_and_instruction_args,
161169
args.service,
170+
"mempool_p2p_propagator_local_msgs_processed",
171+
8082,
172+
MetricConditionGater.MetricCondition(lambda val: val > 4247916, "Greater than 4247916"),
162173
)
163174

164175
update_config_and_restart_nodes(

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 96 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,20 @@ def get_context_list_from_args(
433433
]
434434

435435

436+
def _get_pod_names(
    namespace: str, service: Service, index: int, cluster: Optional[str] = None
) -> list[str]:
    """Return the names of the pods that belong to the given service.

    Runs `kubectl get pods -o name` with the namespace arguments produced by
    `get_namespace_args(namespace, cluster)` and keeps only the entries that start
    with `pod/<service.pod_name>`, with the `pod/` resource prefix stripped.

    Args:
        namespace: Namespace passed to `get_namespace_args`.
        service: Service whose `pod_name` is used as the name prefix to match.
        index: Not used by this function; kept so the signature matches its callers.
        cluster: Optional cluster passed to `get_namespace_args`.

    Returns:
        The matching pod names (possibly empty), in the order kubectl printed them.
    """
    command = ["get", "pods", "-o", "name", *get_namespace_args(namespace, cluster)]
    listing = run_kubectl_command(command, capture_output=True).stdout

    # `-o name` prints one `pod/<name>` entry per line.
    wanted_prefix = f"pod/{service.pod_name}"
    matching_names = []
    for entry in listing.splitlines():
        if entry.startswith(wanted_prefix):
            matching_names.append(entry.split("/")[1])
    return matching_names
448+
449+
436450
class ServiceRestarter(ABC):
437451
"""Abstract class for restarting service instances."""
438452

@@ -449,18 +463,7 @@ def _restart_pod(
449463
namespace: str, service: Service, index: int, cluster: Optional[str] = None
450464
) -> None:
451465
"""Restart pod by deleting it"""
452-
# Get the list of pods (one string per line).
453-
kubectl_args = [
454-
"get",
455-
"pods",
456-
"-o",
457-
"name",
458-
]
459-
kubectl_args.extend(get_namespace_args(namespace, cluster))
460-
pods = run_kubectl_command(kubectl_args, capture_output=True).stdout.splitlines()
461-
462-
# Filter the list of pods to only include the ones that match the service and extract the pod name.
463-
pods = [pod.split("/")[1] for pod in pods if pod.startswith(f"pod/{service.pod_name}")]
466+
pods = _get_pod_names(namespace, service, index, cluster)
464467

465468
if not pods:
466469
print_error(f"Could not find pods for service {service.pod_name}.")
@@ -548,6 +551,87 @@ def restart_service(self, instance_index: int) -> bool:
548551
return self.check_between_restarts(instance_index)
549552

550553

554+
class WaitOnMetrticOneByOneRestarter(ChecksBetweenRestarts):
    """Restarter that gates each restart on pod readiness and on a metric condition.

    After an instance is restarted, the between-restarts check waits for the service's
    pods to report ready, runs a `MetricConditionGater` for every pod, and then (unless
    this was the last instance) asks for confirmation before moving on.
    """

    def __init__(
        self,
        namespace_and_instruction_args: NamespaceAndInstructionArgs,
        service: Service,
        metric_name: str,
        metrics_port: int,
        metric_value_condition: "MetricConditionGater.MetricCondition",
    ):
        """Build the between-restarts check and hand it to the base class.

        Args:
            namespace_and_instruction_args: Source of the per-instance namespace and
                cluster, and of the number of instances.
            service: Service whose pods are being restarted.
            metric_name: Name of the metric passed to `MetricConditionGater`.
            metrics_port: Port passed to `MetricConditionGater`.
            metric_value_condition: Condition passed to `MetricConditionGater`.
        """

        def _check_between_restarts(instance_index: int) -> bool:
            # This is to prevent the case where we get the pod name of the old pod we just deleted.
            # TODO(guy.f): Verify this is not the name of the old pod some other way.
            sleep(2)

            namespace = namespace_and_instruction_args.get_namespace(instance_index)
            cluster = namespace_and_instruction_args.get_cluster(instance_index)

            pod_names = WaitOnMetrticOneByOneRestarter._wait_for_pods_to_be_ready(
                namespace,
                cluster,
                service,
            )
            if pod_names is None:
                # The pods never became ready; report failure to the caller.
                return False

            for pod_name in pod_names:
                metric_condition_gater = MetricConditionGater(
                    metric_name,
                    namespace,
                    cluster,
                    pod_name,
                    metrics_port,
                    metric_value_condition,
                )
                # NOTE(review): presumably blocks until the metric condition holds - confirm.
                metric_condition_gater.gate()

            # Nothing left to restart after the last instance, so no confirmation is needed.
            if instance_index == namespace_and_instruction_args.size() - 1:
                return True
            return wait_until_y_or_n("Do you want to restart the next pod?")

        super().__init__(namespace_and_instruction_args, service, _check_between_restarts)

    @staticmethod
    def _wait_for_pods_to_be_ready(
        namespace: str,
        cluster: Optional[str],
        service: Service,
        wait_timeout: int = 180,
        num_retry: int = 3,
        refresh_delay_sec: int = 3,
    ) -> Optional[list[str]]:
        """Wait until every pod of the service reports the `ready` condition.

        Each attempt re-lists the pods and runs `kubectl wait --for=condition=ready` on
        each of them. The pod list is returned only if all waits succeed; if no pods are
        found or any wait fails, the attempt is abandoned and retried after a delay.

        Args:
            namespace: Namespace of the pods.
            cluster: Optional cluster of the pods.
            service: Service whose pods are waited on.
            wait_timeout: Per-pod timeout, in seconds, passed to `kubectl wait`.
            num_retry: Maximum number of attempts.
            refresh_delay_sec: Seconds to sleep after a failed attempt.

        Returns:
            The pod names once all of them are ready, or None if every attempt failed.
        """
        for i in range(num_retry):
            pods = _get_pod_names(namespace, service, 0, cluster)
            if not pods:
                print_colored(
                    f"Could not get pod names for service {service.pod_name}, retrying... (attempt {i + 1}/{num_retry})",
                    Colors.YELLOW,
                )
            else:
                for pod in pods:
                    print_colored(
                        f"Waiting for pod {pod} to be ready... (timeout: {wait_timeout}s)"
                    )
                    kubectl_args = [
                        "wait",
                        "--for=condition=ready",
                        f"pod/{pod}",
                        "--timeout",
                        f"{wait_timeout}s",
                    ]
                    kubectl_args.extend(get_namespace_args(namespace, cluster))
                    result = run_kubectl_command(kubectl_args, capture_output=False)

                    if result.returncode != 0:
                        # NOTE(review): with capture_output=False, result.stderr is likely
                        # None (the output went to the terminal) - confirm against
                        # run_kubectl_command.
                        print_colored(
                            f"Timed out waiting for pod {pod} to be ready: {result.stderr}, retrying... (attempt {i + 1}/{num_retry})",
                            Colors.YELLOW,
                        )
                        break
                else:
                    # Reached only when no wait failed, so the list really is ready.
                    # Previously the list was also returned after a timeout, so the
                    # announced retry never happened.
                    return pods
            sleep(refresh_delay_sec)

        print_error(f"Pods for service {service.pod_name} are not ready after {num_retry} attempts")
        return None
633+
634+
551635
class NoOpServiceRestarter(ServiceRestarter):
552636
"""No-op service restarter."""
553637

0 commit comments

Comments
 (0)