Skip to content

Commit b9abd90

Browse files
scripts: A class which gates progress on metrics satisifying a condition
1 parent ec0ac96 commit b9abd90

File tree

1 file changed

+128
-1
lines changed

1 file changed

+128
-1
lines changed

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
from abc import ABC, abstractmethod
44
import argparse
55
import json
6+
import signal
7+
import socket
68
import subprocess
79
import sys
810
from enum import Enum
11+
from time import sleep
912
from typing import Any, Callable, Optional
1013

1114
import tempfile
1215
import urllib.error
1316
import urllib.parse
1417
import urllib.request
18+
from prometheus_client.parser import text_string_to_metric_families
1519
import yaml
1620
from difflib import unified_diff
1721

@@ -212,6 +216,129 @@ def validate_arguments(args: argparse.Namespace) -> None:
212216
sys.exit(1)
213217

214218

219+
class MetricConditionGater:
220+
"""Gates progress on a metric satisfying a condition."""
221+
222+
def __init__(
223+
self,
224+
metric_name: str,
225+
pod: str,
226+
metrics_port: int,
227+
namespace: str,
228+
cluster: Optional[str] = None,
229+
refresh_interval_seconds: int = 3,
230+
):
231+
self.metric_name = metric_name
232+
self.local_port = self._get_free_port()
233+
self.namespace = namespace
234+
self.cluster = cluster
235+
self.pod = pod
236+
self.metrics_port = metrics_port
237+
self.refresh_interval_seconds = refresh_interval_seconds
238+
239+
def _get_free_port():
240+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
241+
s.bind(("", 0))
242+
return s.getsockname()[1]
243+
244+
def _get_metrics(self) -> str:
245+
while True:
246+
try:
247+
with urllib.request.urlopen(
248+
f"http://localhost:{self.local_port}/monitoring/metrics"
249+
) as response:
250+
if response.status == 200:
251+
return response.read().decode("utf-8")
252+
else:
253+
print_colored(
254+
f"Failed to get metrics from for pod {self.pod}, attempt {attempt + 1}: {response.status}"
255+
)
256+
except urllib.error.URLError as e:
257+
print_colored(
258+
f"Failed to get metrics from for pod {self.pod}, attempt {attempt + 1}: {e}"
259+
)
260+
print_colored(
261+
f"Waiting {self.refresh_interval_seconds} seconds to retry getting metrics...",
262+
Colors.YELLOW,
263+
)
264+
sleep(self.refresh_interval_seconds)
265+
266+
def _poll_condition_met(self, metric_value_condition: Callable[[Any], bool]):
267+
"""Poll metrics until the condition is met for the metric."""
268+
while True:
269+
metrics = self._get_metrics()
270+
assert metrics is not None, f"Failed to get metrics from for pod {self.pod}"
271+
272+
metric_families = text_string_to_metric_families(metrics)
273+
val = None
274+
for metric_family in metric_families:
275+
if metric_family.name == self.metric_name:
276+
val = metric_family.samples[0].value
277+
break
278+
279+
if val is None:
280+
print_colored(
281+
f"Metric '{self.metric_name}' not found in pod {self.pod}. Assuming the node is not ready."
282+
)
283+
else:
284+
if metric_value_condition(val):
285+
print_colored(
286+
f"Metric {self.metric_name} condition met (value={val}) has not reached {storage_required_height} yet, continuing to wait."
287+
)
288+
return
289+
else:
290+
print_colored(
291+
f"Metric {self.metric_name} condition not met (value={val}). Continuing to wait."
292+
)
293+
294+
sleep(self.refresh_interval_seconds)
295+
296+
def gate(self, metric_value_condition: Callable[[Any], bool]):
297+
"""Wait until the nodes metrics satisfy the condition."""
298+
# Start kubectl port forwarding to the node and keep it running in the background so we can access the metrics.
299+
cmd = [
300+
"kubectl",
301+
"port-forward",
302+
f"pod/{pod}",
303+
f"{self.local_port}:{self.metrics_port}",
304+
]
305+
cmd.extend(get_namespace_args(self.namespace, self.cluster))
306+
307+
pf_process = None
308+
309+
def _terminate_port_forward_process(pf_process: subprocess.Popen):
310+
if pf_process and pf_process.poll() is None:
311+
print_colored(f"Terminating kubectl port-forward process (PID: {pf_process.pid})")
312+
pf_process.terminate()
313+
try:
314+
pf_process.wait(timeout=5)
315+
except subprocess.TimeoutExpired:
316+
print_colored("Force killing kubectl port-forward process")
317+
pf_process.kill()
318+
pf_process.wait()
319+
320+
try:
321+
pf_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
322+
print("Waiting for forwarding to start")
323+
# Give the forwarding time to start.
324+
# TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
325+
sleep(3)
326+
print("Forwarding started")
327+
328+
# Set up signal handler to ensure forwarding subprocess is terminated on interruption
329+
def signal_handler(signum, frame):
330+
_terminate_port_forward_process(pf_process)
331+
sys.exit(0)
332+
333+
signal.signal(signal.SIGINT, signal_handler)
334+
signal.signal(signal.SIGTERM, signal_handler)
335+
336+
self._poll_until_condition_met(metric_value_condition)
337+
338+
finally:
339+
_terminate_port_forward_process(pf_process)
340+
341+
215342
class NamespaceAndInstructionArgs:
216343
def __init__(
217344
self,
@@ -380,7 +507,7 @@ def restart_service(self, instance_index: int) -> bool:
380507
# self.namespace_list[instance_index],
381508
# self.service,
382509
# instance_index,
383-
# self.cluster_list[instance_index],
510+
# self.cluster_list[instance_index] if self.cluster_list else None,
384511
# )
385512
instructions = self.namespace_and_instruction_args.get_instruction(instance_index)
386513
print_colored(f"Restarted pod {instance_index}.\n{instructions} ", Colors.YELLOW)

0 commit comments

Comments
 (0)