Skip to content

Commit 3c013fc

Browse files
scripts: Add a new restarter that gates on metrics (#9925)
1 parent e11b1de commit 3c013fc

File tree

2 files changed

+126
-15
lines changed

2 files changed

+126
-15
lines changed

scripts/prod/metrics_lib.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,43 +9,42 @@
99
import socket
1010
import urllib.error
1111
import urllib.request
12+
from common_lib import Colors, get_namespace_args, print_colored, print_error
1213
from prometheus_client.parser import text_string_to_metric_families
1314

14-
from scripts.prod.common_lib import Colors, get_namespace_args, print_colored, print_error
15-
1615

1716
class MetricConditionGater:
1817
"""Gates progress on a metric satisfying a condition.
1918
2019
This class was meant to be used with counter/gauge metrics. It may not work properly with histogram metrics.
2120
"""
2221

23-
class MetricCondition:
22+
class Metric:
2423
def __init__(
2524
self,
25+
name: str,
2626
value_condition: Callable[[Any], bool],
2727
condition_description: Optional[str] = None,
2828
):
29+
self.name = name
2930
self.value_condition = value_condition
3031
self.condition_description = condition_description
3132

3233
def __init__(
3334
self,
34-
metric_name: str,
35+
metric: "MetricConditionGater.Metric",
3536
namespace: str,
3637
cluster: Optional[str],
3738
pod: str,
3839
metrics_port: int,
39-
metric_value_condition: "MetricConditionGater.MetricCondition",
4040
refresh_interval_seconds: int = 3,
4141
):
42-
self.metric_name = metric_name
42+
self.metric = metric
4343
self.local_port = self._get_free_port()
4444
self.namespace = namespace
4545
self.cluster = cluster
4646
self.pod = pod
4747
self.metrics_port = metrics_port
48-
self.metric_value_condition = metric_value_condition
4948
self.refresh_interval_seconds = refresh_interval_seconds
5049

5150
@staticmethod
@@ -77,8 +76,8 @@ def _get_metrics_raw_string(self) -> str:
7776
def _poll_until_condition_met(self):
7877
"""Poll metrics until the condition is met for the metric."""
7978
condition_description = (
80-
f"({self.metric_value_condition.condition_description}) "
81-
if self.metric_value_condition.condition_description is not None
79+
f"({self.metric.condition_description}) "
80+
if self.metric.condition_description is not None
8281
else ""
8382
)
8483

@@ -89,26 +88,26 @@ def _poll_until_condition_met(self):
8988
metric_families = text_string_to_metric_families(metrics)
9089
val = None
9190
for metric_family in metric_families:
92-
if metric_family.name == self.metric_name:
91+
if metric_family.name == self.metric.name:
9392
if len(metric_family.samples) > 1:
9493
print_error(
95-
f"Multiple samples found for metric {self.metric_name}. Using the first one.",
94+
f"Multiple samples found for metric {self.metric.name}. Using the first one.",
9695
)
9796
val = metric_family.samples[0].value
9897
break
9998

10099
if val is None:
101100
print_colored(
102-
f"Metric '{self.metric_name}' not found in pod {self.pod}. Assuming the node is not ready."
101+
f"Metric '{self.metric.name}' not found in pod {self.pod}. Assuming the node is not ready."
103102
)
104-
elif self.metric_value_condition.value_condition(val):
103+
elif self.metric.value_condition(val):
105104
print_colored(
106-
f"Metric {self.metric_name} condition {condition_description}met (value={val})."
105+
f"Metric {self.metric.name} condition {condition_description}met (value={val})."
107106
)
108107
return
109108
else:
110109
print_colored(
111-
f"Metric {self.metric_name} condition {condition_description}not met (value={val}). Continuing to wait."
110+
f"Metric {self.metric.name} condition {condition_description}not met (value={val}). Continuing to wait."
112111
)
113112

114113
sleep(self.refresh_interval_seconds)

scripts/prod/restarter_lib.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import sys
44
from abc import ABC, abstractmethod
5+
from time import sleep
56
from typing import Callable, Optional
67

78
from common_lib import (
@@ -15,6 +16,7 @@
1516
run_kubectl_command,
1617
wait_until_y_or_n,
1718
)
19+
from metrics_lib import MetricConditionGater
1820

1921

2022
def _get_pod_names(
@@ -142,3 +144,113 @@ def restart_service(self, instance_index: int) -> bool:
142144
"""No-op."""
143145
print_colored("\nSkipping pod restart.")
144146
return True
147+
148+
149+
class WaitOnMetricRestarter(ChecksBetweenRestarts):
    """Restarter whose between-restart check gates on pod metrics.

    After a restart, the pods of the relevant instance are first awaited with
    ``kubectl wait --for=condition=ready``; then, for every pod and every configured
    metric, a ``MetricConditionGater`` is run.
    """

    def __init__(
        self,
        namespace_and_instruction_args: NamespaceAndInstructionArgs,
        service: Service,
        metrics: list["MetricConditionGater.Metric"],
        metrics_port: int,
        restart_strategy: RestartStrategy,
    ):
        """Select the check function matching the restart strategy.

        Args:
            namespace_and_instruction_args: Per-instance namespace/cluster information.
            service: The service whose pods are restarted.
            metrics: Metrics (with their conditions) every pod must satisfy.
            metrics_port: Pod port that is passed to ``MetricConditionGater``.
            restart_strategy: ``ONE_BY_ONE`` checks after each restart and then prompts the
                user; ``ALL_AT_ONCE`` checks all instances only after the last restart.
                Any other strategy prints an error and exits with status 1.
        """
        self.metrics = metrics
        self.metrics_port = metrics_port
        if restart_strategy == RestartStrategy.ONE_BY_ONE:
            check_function = self._check_between_each_restart
        elif restart_strategy == RestartStrategy.ALL_AT_ONCE:
            check_function = self._check_all_only_after_last_restart
        else:
            print_error(f"Invalid restart strategy: {restart_strategy} for WaitOnMetricRestarter.")
            sys.exit(1)

        super().__init__(namespace_and_instruction_args, service, check_function)

    def _check_between_each_restart(self, instance_index: int) -> bool:
        """Gate on the just-restarted instance, then ask whether to continue.

        Returns False if the instance's pods never became ready or if the user declines
        to continue; True otherwise.
        """
        if not self._wait_for_pod_to_satisfy_condition(instance_index):
            # The readiness failure was already reported; do not report success.
            return False
        if instance_index == self.namespace_and_instruction_args.size() - 1:
            # Last instance, no need to prompt the user about the next restart.
            return True
        return wait_until_y_or_n("Do you want to restart the next pod?")

    def _check_all_only_after_last_restart(self, instance_index: int) -> bool:
        """Let every restart but the last pass unchecked, then gate on all instances.

        Returns True for every non-final instance. For the final instance, returns True
        only if the pods of all instances became ready (and their metric gates ran).
        """
        num_instances = self.namespace_and_instruction_args.size()

        # Restart all nodes without waiting for confirmation.
        if instance_index < num_instances - 1:
            return True

        # After the last node has been restarted, wait for all pods to satisfy the condition.
        # Every instance is checked even if an earlier one failed, so all failures get reported.
        all_satisfied = True
        for index in range(num_instances):
            if not self._wait_for_pod_to_satisfy_condition(index):
                all_satisfied = False
        return all_satisfied

    def _wait_for_pod_to_satisfy_condition(self, instance_index: int) -> bool:
        """Wait for the instance's pods to be ready and then gate on every metric.

        Returns:
            False if the pods were not ready after all retries, True once every metric
            gate has completed for every pod.
        """
        # The sleep is to prevent the case where we get the pod name of the old pod we just deleted
        # instead of the new one.
        # TODO(guy.f): Verify this is not the name of the old pod some other way.
        sleep(2)

        namespace = self.namespace_and_instruction_args.get_namespace(instance_index)
        cluster = self.namespace_and_instruction_args.get_cluster(instance_index)

        pod_names = WaitOnMetricRestarter._wait_for_pods_to_be_ready(
            namespace,
            cluster,
            self.service,
        )
        if pod_names is None:
            return False

        for pod_name in pod_names:
            for metric in self.metrics:
                metric_condition_gater = MetricConditionGater(
                    metric,
                    namespace,
                    cluster,
                    pod_name,
                    self.metrics_port,
                )
                metric_condition_gater.gate()
        return True

    @staticmethod
    def _wait_for_pods_to_be_ready(
        namespace: str,
        cluster: Optional[str],
        service: Service,
        wait_timeout: int = 180,
        num_retry: int = 3,
        refresh_delay_sec: int = 3,
    ) -> Optional[list[str]]:
        """
        Wait for pods to be in ready mode as reported by Kubernetes.

        Args:
            namespace: Namespace passed to ``get_namespace_args`` and ``_get_pod_names``.
            cluster: Optional cluster passed to ``get_namespace_args`` and ``_get_pod_names``.
            service: Service whose pod names are looked up.
            wait_timeout: Per-pod ``kubectl wait`` timeout, in seconds.
            num_retry: Number of attempts (pod lookup + wait on every pod).
            refresh_delay_sec: Delay between attempts, in seconds.

        Returns:
            The pod names once ``kubectl wait`` succeeded for all of them in a single
            attempt, or None if that did not happen within ``num_retry`` attempts.
        """

        for attempt in range(num_retry):
            pods = _get_pod_names(namespace, service, 0, cluster)
            if pods:
                for pod in pods:
                    print_colored(
                        f"Waiting for pod {pod} to be ready... (timeout set to {wait_timeout}s)"
                    )
                    kubectl_args = [
                        "wait",
                        "--for=condition=ready",
                        f"pod/{pod}",
                        "--timeout",
                        f"{wait_timeout}s",
                    ]
                    kubectl_args.extend(get_namespace_args(namespace, cluster))
                    result = run_kubectl_command(kubectl_args, capture_output=False)

                    if result.returncode != 0:
                        # NOTE(review): capture_output=False is passed above, so result.stderr
                        # may be empty here -- confirm against run_kubectl_command.
                        print_colored(
                            f"Timed out waiting for pod {pod} to be ready: {result.stderr}, retrying... (attempt {attempt + 1}/{num_retry})",
                            Colors.YELLOW,
                        )
                        break
                else:
                    # Reached only when no pod timed out: every pod is ready.
                    return pods
            else:
                print_colored(
                    f"Could not get pod names for service {service.pod_name}, retrying... (attempt {attempt + 1}/{num_retry})",
                    Colors.YELLOW,
                )

            # No point in delaying after the final attempt.
            if attempt < num_retry - 1:
                sleep(refresh_delay_sec)

        print_error(f"Pods for service {service.pod_name} are not ready after {num_retry} attempts")
        return None

0 commit comments

Comments
 (0)