Skip to content

Commit 3d2a34a

Browse files
scripts: A class which gates progress on metrics satisifying a condition
1 parent af5b941 commit 3d2a34a

File tree

1 file changed

+129
-1
lines changed

1 file changed

+129
-1
lines changed

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
from abc import ABC, abstractmethod
44
import argparse
55
import json
6+
import signal
7+
import socket
68
import subprocess
79
import sys
810
from enum import Enum
11+
from time import sleep
912
from typing import Any, Callable, Optional
1013

1114
import tempfile
1215
import urllib.error
1316
import urllib.parse
1417
import urllib.request
18+
from prometheus_client.parser import text_string_to_metric_families
1519
import yaml
1620
from difflib import unified_diff
1721

@@ -212,6 +216,130 @@ def validate_arguments(args: argparse.Namespace) -> None:
212216
sys.exit(1)
213217

214218

219+
class MetricConditionGater:
220+
"""Gates progress on a metric satisfying a condition."""
221+
222+
def __init__(
223+
self, metric_name: str, pod: str, metrics_port: int, refresh_interval_seconds: int = 3
224+
):
225+
self.metric_name = metric_name
226+
self.local_port = self._get_free_port()
227+
self.pod = pod
228+
self.metrics_port = metrics_port
229+
self.refresh_interval_seconds = refresh_interval_seconds
230+
231+
def _get_free_port():
232+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
233+
s.bind(("", 0))
234+
return s.getsockname()[1]
235+
236+
def _get_metrics(self) -> str:
237+
while True:
238+
try:
239+
with urllib.request.urlopen(
240+
f"http://localhost:{self.local_port}/monitoring/metrics"
241+
) as response:
242+
if response.status == 200:
243+
return response.read().decode("utf-8")
244+
else:
245+
print_colored(
246+
f"Failed to get metrics from for pod {self.pod}, attempt {attempt + 1}: {response.status}"
247+
)
248+
except urllib.error.URLError as e:
249+
print_colored(
250+
f"Failed to get metrics from for pod {self.pod}, attempt {attempt + 1}: {e}"
251+
)
252+
print_colored(
253+
f"Waiting {self.refresh_interval_seconds} seconds to retry getting metrics...",
254+
Colors.YELLOW,
255+
)
256+
sleep(self.refresh_interval_seconds)
257+
258+
def _poll_condition_met(self, metric_value_condition: Callable[[Any], bool]):
259+
"""Poll metrics until the condition is met for the metric."""
260+
while True:
261+
metrics = self._get_metrics()
262+
assert metrics is not None, f"Failed to get metrics from for pod {self.pod}"
263+
264+
metric_families = text_string_to_metric_families(metrics)
265+
val = None
266+
for metric_family in metric_families:
267+
if metric_family.name == self.metric_name:
268+
val = metric_family.samples[0].value
269+
break
270+
271+
if val is None:
272+
print_colored(
273+
f"Metric '{self.metric_name}' not found in pod {self.pod}. Assuming the node is not ready."
274+
)
275+
else:
276+
if metric_value_condition(val):
277+
print_colored(
278+
f"Metric {self.metric_name} condition met (value={val}) has not reached {storage_required_height} yet, continuing to wait."
279+
)
280+
return
281+
else:
282+
print_colored(
283+
f"Metric {self.metric_name} condition not met (value={val}). Continuing to wait."
284+
)
285+
286+
sleep(self.refresh_interval_seconds)
287+
288+
def gate(self, metric_value_condition: Callable[[Any], bool]):
289+
"""Wait until the nodes metrics satisfy the condition."""
290+
# Start kubectl port forwarding to the node and keep it running in the background so we can access the metrics.
291+
cmd = [
292+
"kubectl",
293+
"port-forward",
294+
f"pod/{pod}",
295+
f"{self.local_port}:{self.metrics_port}",
296+
]
297+
298+
print(cmd)
299+
300+
pf_process = None
301+
try:
302+
pf_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
303+
print("Waiting for forwarding to start")
304+
# Give the forwarding time to start.
305+
# TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
306+
sleep(3)
307+
print("Forwarding started")
308+
309+
# Set up signal handler to ensure forwarding subprocess is terminated on interruption
310+
def signal_handler(signum, frame):
311+
if pf_process and pf_process.poll() is None:
312+
print_colored(
313+
f"Terminating kubectl port-forward process (PID: {pf_process.pid})",
314+
Colors.RED,
315+
)
316+
pf_process.terminate()
317+
try:
318+
pf_process.wait(timeout=5)
319+
except subprocess.TimeoutExpired:
320+
print_colored("Force killing kubectl port-forward process", Colors.RED)
321+
pf_process.kill()
322+
pf_process.wait()
323+
sys.exit(0)
324+
325+
signal.signal(signal.SIGINT, signal_handler)
326+
signal.signal(signal.SIGTERM, signal_handler)
327+
328+
self._poll_condition_met(metric_value_condition)
329+
330+
finally:
331+
# Ensure subprocess is always terminated
332+
if pf_process and pf_process.poll() is None:
333+
print_colored(f"Terminating kubectl port-forward process (PID: {pf_process.pid})")
334+
pf_process.terminate()
335+
try:
336+
pf_process.wait(timeout=5)
337+
except subprocess.TimeoutExpired:
338+
print_colored("Force killing kubectl port-forward process")
339+
pf_process.kill()
340+
pf_process.wait()
341+
342+
215343
class NamespaceAndInstructionArgs:
216344
def __init__(
217345
self,
@@ -381,7 +509,7 @@ def restart_service(self, instance_index: int) -> bool:
381509
# self.namespace_list[instance_index],
382510
# self.service,
383511
# instance_index,
384-
# self.cluster_list[instance_index],
512+
# self.cluster_list[instance_index] if self.cluster_list else None,
385513
# )
386514
instructions = self.namespace_and_instruction_args.get_instruction(instance_index)
387515
print_colored(f"Restarted pod {instance_index}.\n{instructions} ", Colors.YELLOW)

0 commit comments

Comments
 (0)