Skip to content

Commit 4cb5cb6

Browse files
scripts: Add code for waiting until the restarted job is ready and the metric condition is met
1 parent b7d1580 commit 4cb5cb6

File tree

2 files changed

+118
-20
lines changed

2 files changed

+118
-20
lines changed

scripts/prod/update_config_and_restart_nodes.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from update_config_and_restart_nodes_lib import (
99
ApolloArgsParserBuilder,
1010
Colors,
11+
MetricConditionGater,
1112
NamespaceAndInstructionArgs,
1213
Service,
1314
ServiceRestarter,
15+
WaitOnMetrticOneByOneRestarter,
1416
print_colored,
1517
print_error,
1618
update_config_and_restart_nodes,
@@ -155,10 +157,19 @@ def main():
155157
None,
156158
)
157159

158-
restarter = ServiceRestarter.from_restart_strategy(
159-
args.restart_strategy,
160+
# Create the appropriate restarter based on the restart strategy
161+
# restarter = ServiceRestarter.from_restart_strategy(
162+
# args.restart_strategy,
163+
# namespace_and_instruction_args,
164+
# args.service,
165+
# )
166+
167+
restarter = WaitOnMetrticOneByOneRestarter(
160168
namespace_and_instruction_args,
161169
args.service,
170+
"mempool_p2p_propagator_local_msgs_processed",
171+
8082,
172+
MetricConditionGater.MetricCondition(lambda val: val > 4167980, "Greater than 4167980"),
162173
)
163174

164175
update_config_and_restart_nodes(

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 105 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,16 @@ def _poll_until_condition_met(
278278
self, metric_value_condition: "MetricConditionGater.MetricCondition"
279279
):
280280
"""Poll metrics until the condition is met for the metric."""
condition_description = (
    f"({metric_value_condition.condition_description}) "
    if metric_value_condition.condition_description is not None
    else ""
)

287291
while True:
288292
metrics = self._get_metrics_raw_string()
289293
assert metrics is not None, f"Failed to get metrics from for pod {self.pod}"
@@ -433,6 +437,20 @@ def get_context_list_from_args(
433437
]
434438

435439

440+
def _get_pod_names(
    namespace: str, service: Service, index: int, cluster: Optional[str] = None
) -> list[str]:
    """Return the names of the pods belonging to *service* in *namespace*.

    Runs ``kubectl get pods -o name`` (one ``pod/<name>`` string per line),
    keeps only the entries whose name starts with the service's pod-name
    prefix, and strips the leading ``pod/`` resource prefix.

    NOTE(review): ``index`` is currently unused; it is kept because existing
    callers pass it positionally — confirm before removing.
    """
    # Ask kubectl for pod resource names only ("pod/<name>" per line).
    kubectl_args = [
        "get",
        "pods",
        "-o",
        "name",
    ]
    kubectl_args.extend(get_namespace_args(namespace, cluster))
    pods = run_kubectl_command(kubectl_args, capture_output=True).stdout.splitlines()
    # Filter to this service's pods and drop the "pod/" prefix.
    return [pod.split("/")[1] for pod in pods if pod.startswith(f"pod/{service.pod_name}")]
452+
453+
436454
class ServiceRestarter(ABC):
437455
"""Abstract class for restarting service instances."""
438456

@@ -449,18 +467,7 @@ def _restart_pod(
449467
namespace: str, service: Service, index: int, cluster: Optional[str] = None
450468
) -> None:
451469
"""Restart pod by deleting it"""
452-
# Get the list of pods (one string per line).
453-
kubectl_args = [
454-
"get",
455-
"pods",
456-
"-o",
457-
"name",
458-
]
459-
kubectl_args.extend(get_namespace_args(namespace, cluster))
460-
pods = run_kubectl_command(kubectl_args, capture_output=True).stdout.splitlines()
461-
462-
# Filter the list of pods to only include the ones that match the service and extract the pod name.
463-
pods = [pod.split("/")[1] for pod in pods if pod.startswith(f"pod/{service.pod_name}")]
470+
pods = _get_pod_names(namespace, service, index, cluster)
464471

465472
if not pods:
466473
print_error(f"Could not find pods for service {service.pod_name}.")
@@ -534,12 +541,12 @@ def __init__(
534541

535542
def restart_service(self, instance_index: int) -> bool:
536543
"""Restart the instance one by one, running the use code in between each restart."""
537-
self._restart_pod(
538-
self.namespace_and_instruction_args.get_namespace(instance_index),
539-
self.service,
540-
instance_index,
541-
self.namespace_and_instruction_args.get_cluster(instance_index),
542-
)
544+
# self._restart_pod(
545+
# self.namespace_and_instruction_args.get_namespace(instance_index),
546+
# self.service,
547+
# instance_index,
548+
# self.namespace_and_instruction_args.get_cluster(instance_index),
549+
# )
543550
instructions = self.namespace_and_instruction_args.get_instruction(instance_index)
544551
print_colored(
545552
f"Restarted pod {instance_index}.\n{instructions if instructions is not None else ''} ",
@@ -548,6 +555,86 @@ def restart_service(self, instance_index: int) -> bool:
548555
return self.check_between_restarts(instance_index)
549556

550557

558+
class WaitOnMetrticOneByOneRestarter(ChecksBetweenRestarts):
    """Restart service instances one by one, gated on a metric condition.

    After each restart, waits for the service's pods to become ready and then
    polls ``metric_name`` on each pod (via :class:`MetricConditionGater`)
    until ``metric_value_condition`` is satisfied, before asking the operator
    whether to proceed to the next instance.

    TODO(review): the class name misspells "Metric" ("Metrtic"); rename
    together with all call sites.
    """

    def __init__(
        self,
        namespace_and_instruction_args: NamespaceAndInstructionArgs,
        service: Service,
        metric_name: str,
        metrics_port: int,
        metric_value_condition: "MetricConditionGater.MetricCondition",
    ):
        def _check_between_restarts(instance_index: int) -> bool:
            # This is to prevent the case where we get the pod name of the old
            # pod we just deleted.
            # TODO(guy.f): Verify this is not the name of the old pod some other way.
            sleep(2)
            pod_names = WaitOnMetrticOneByOneRestarter._wait_for_pods_to_be_ready(
                namespace_and_instruction_args.get_namespace(instance_index),
                namespace_and_instruction_args.get_cluster(instance_index),
                service,
            )
            if pod_names is None:
                return False

            # Block until the metric condition holds on every ready pod.
            for pod_name in pod_names:
                metric_condition_gater = MetricConditionGater(
                    metric_name,
                    namespace_and_instruction_args.get_namespace(instance_index),
                    namespace_and_instruction_args.get_cluster(instance_index),
                    pod_name,
                    metrics_port,
                )
                metric_condition_gater.gate(metric_value_condition)
            # Last instance: nothing left to restart, no need to ask.
            if instance_index == namespace_and_instruction_args.size() - 1:
                return True
            # Not an f-string on purpose — the prompt has no placeholders.
            return wait_until_y_or_n("Do you want to restart the next pod?")

        super().__init__(namespace_and_instruction_args, service, _check_between_restarts)

    @staticmethod
    def _wait_for_pods_to_be_ready(
        namespace: str,
        cluster: Optional[str],
        service: Service,
        wait_timeout: int = 180,
        num_retry: int = 3,
        refresh_delay_sec: int = 3,
    ) -> Optional[list[str]]:
        """Wait until all of the service's pods report the Ready condition.

        Makes up to ``num_retry`` attempts, sleeping ``refresh_delay_sec``
        seconds between them. Each attempt lists the pods and runs
        ``kubectl wait --for=condition=ready`` (``wait_timeout`` seconds) on
        every pod. Returns the pod names once all are ready, or ``None`` if
        every attempt fails (no pods found, or a readiness timeout).
        """
        for i in range(num_retry):
            pods = _get_pod_names(namespace, service, 0, cluster)
            if not pods:
                print_colored(
                    f"Could not get pod names for service {service.pod_name}, retrying... (attempt {i + 1}/{num_retry})",
                    Colors.YELLOW,
                )
                sleep(refresh_delay_sec)
                continue

            for pod in pods:
                print_colored(
                    f"Waiting for pod {pod} to be ready... (timeout: {wait_timeout}s)"
                )
                kubectl_args = [
                    "wait",
                    "--for=condition=ready",
                    f"pod/{pod}",
                    "--timeout",
                    f"{wait_timeout}s",
                ]
                kubectl_args.extend(get_namespace_args(namespace, cluster))
                # Output is streamed to the user (capture_output=False), so
                # result.stderr is not captured — don't reference it in messages.
                result = run_kubectl_command(kubectl_args, capture_output=False)

                if result.returncode != 0:
                    print_colored(
                        f"Timed out waiting for pod {pod} to be ready, retrying... (attempt {i + 1}/{num_retry})",
                        Colors.YELLOW,
                    )
                    # BUG FIX: previously this break fell through to
                    # "return pods", returning pods that never became ready.
                    break
            else:
                # No break: every pod reached the Ready condition.
                return pods
            sleep(refresh_delay_sec)

        print_error(f"Pods for service {service.pod_name} are not ready after {num_retry} attempts")
        return None
636+
637+
551638
class NoOpServiceRestarter(ServiceRestarter):
552639
"""No-op service restarter."""
553640

0 commit comments

Comments
 (0)