Skip to content

Commit 5ea038b

Browse files
scripts: A class which gates progress on metrics satisfying a condition
1 parent 6d0f7dc commit 5ea038b

File tree

1 file changed

+139
-6
lines changed

1 file changed

+139
-6
lines changed

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 139 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
from abc import ABC, abstractmethod
44
import argparse
55
import json
6+
import signal
7+
import socket
68
import subprocess
79
import sys
810
from enum import Enum
11+
from time import sleep
912
from typing import Any, Callable, Optional
1013

1114
import tempfile
1215
import urllib.error
1316
import urllib.parse
1417
import urllib.request
18+
from prometheus_client.parser import text_string_to_metric_families
1519
import yaml
1620
from difflib import unified_diff
1721

@@ -212,6 +216,135 @@ def validate_arguments(args: argparse.Namespace) -> None:
212216
sys.exit(1)
213217

214218

219+
class MetricConditionGater:
    """Gates progress on a metric satisfying a condition.

    Starts a `kubectl port-forward` from a free local port to the pod's metrics
    port, polls `/monitoring/metrics` through it, and blocks until the value of
    `metric_name` satisfies a caller-supplied predicate.
    """

    # Upper bound on a single metrics request so a stalled forward cannot block the gate forever.
    _REQUEST_TIMEOUT_SECONDS = 10
    # Grace period given to `kubectl port-forward` before it is assumed to be up.
    _PORT_FORWARD_STARTUP_SECONDS = 3
    # How long to wait after SIGTERM before force-killing the port-forward process.
    _PORT_FORWARD_SHUTDOWN_SECONDS = 5

    def __init__(
        self,
        metric_name: str,
        namespace: str,
        cluster: Optional[str],
        pod: str,
        metrics_port: int,
        refresh_interval_seconds: int = 3,
    ):
        """Store the target description and reserve a local port for forwarding.

        Args:
            metric_name: Name of the metric family to watch.
            namespace: Namespace passed to `get_namespace_args` for kubectl.
            cluster: Optional cluster passed to `get_namespace_args` for kubectl.
            pod: Name of the pod to forward to.
            metrics_port: Port on the pod that serves the metrics endpoint.
            refresh_interval_seconds: Delay between polls and between retries.
        """
        self.metric_name = metric_name
        self.local_port = self._get_free_port()
        self.namespace = namespace
        self.cluster = cluster
        self.pod = pod
        self.metrics_port = metrics_port
        self.refresh_interval_seconds = refresh_interval_seconds

    @staticmethod
    def _get_free_port() -> int:
        """Return a TCP port number that the OS reported as free.

        The probing socket is closed before returning, so the port is only
        known to have been free at the time of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("", 0))
            return s.getsockname()[1]

    def _get_metrics(self) -> str:
        """Fetch the raw metrics text from the forwarded port, retrying until it succeeds.

        Returns:
            The response body decoded as UTF-8. This method never returns None;
            it keeps retrying every `refresh_interval_seconds` on failure.
        """
        url = f"http://localhost:{self.local_port}/monitoring/metrics"
        while True:
            try:
                with urllib.request.urlopen(
                    url, timeout=self._REQUEST_TIMEOUT_SECONDS
                ) as response:
                    if response.status == 200:
                        return response.read().decode("utf-8")
                    print_colored(f"Failed to get metrics for pod {self.pod}: {response.status}")
            except OSError as e:
                # URLError is an OSError; catching the base class also covers timeouts and
                # connection resets raised while reading the response, which urllib does
                # not wrap in URLError.
                print_colored(f"Failed to get metrics for pod {self.pod}: {e}")
            print_colored(
                f"Waiting {self.refresh_interval_seconds} seconds to retry getting metrics...",
                Colors.YELLOW,
            )
            sleep(self.refresh_interval_seconds)

    def _find_metric_value(self, metric_families) -> Optional[Any]:
        """Return the first sample value of the watched metric, or None if it is absent.

        Args:
            metric_families: Iterable of objects exposing `name` and `samples`
                (each sample exposing `value`), as produced by
                `text_string_to_metric_families`.
        """
        for metric_family in metric_families:
            if metric_family.name != self.metric_name:
                continue
            samples = metric_family.samples
            if not samples:
                # A family without samples carries no value; treat it like a missing metric
                # instead of failing on samples[0].
                return None
            if len(samples) > 1:
                print_error(
                    f"Multiple samples found for metric {self.metric_name}. Using the first one.",
                )
            return samples[0].value
        return None

    def _poll_until_condition_met(self, metric_value_condition: Callable[[Any], bool]) -> None:
        """Poll metrics until the condition is met for the metric.

        Args:
            metric_value_condition: Predicate applied to the metric's value;
                polling stops the first time it returns a truthy result.
        """
        while True:
            metrics = self._get_metrics()
            val = self._find_metric_value(text_string_to_metric_families(metrics))

            if val is None:
                print_colored(
                    f"Metric '{self.metric_name}' not found in pod {self.pod}. Assuming the node is not ready."
                )
            elif metric_value_condition(val):
                print_colored(f"Metric {self.metric_name} condition met (value={val}).")
                return
            else:
                print_colored(
                    f"Metric {self.metric_name} condition not met (value={val}). Continuing to wait."
                )

            sleep(self.refresh_interval_seconds)

    @staticmethod
    def _terminate_port_forward_process(pf_process: Optional[subprocess.Popen]) -> None:
        """Stop the port-forward process if it is still running; no-op otherwise."""
        if pf_process is None or pf_process.poll() is not None:
            return
        print_colored(f"Terminating kubectl port-forward process (PID: {pf_process.pid})")
        pf_process.terminate()
        try:
            pf_process.wait(timeout=MetricConditionGater._PORT_FORWARD_SHUTDOWN_SECONDS)
        except subprocess.TimeoutExpired:
            print_colored("Force killing kubectl port-forward process")
            pf_process.kill()
            pf_process.wait()

    def gate(self, metric_value_condition: Callable[[Any], bool]) -> None:
        """Wait until the node's metrics satisfy the condition.

        Runs `kubectl port-forward` in the background for the duration of the
        wait and always terminates it before returning. While waiting, SIGINT
        and SIGTERM terminate the forwarding process and exit with status 0;
        the handlers that were installed before the call are restored afterwards.

        Args:
            metric_value_condition: Predicate applied to the metric's value.

        Raises:
            AssertionError: If the port-forward process exits during startup.
        """
        # Start kubectl port forwarding to the node and keep it running in the background so we can access the metrics.
        cmd = [
            "kubectl",
            "port-forward",
            f"pod/{self.pod}",
            f"{self.local_port}:{self.metrics_port}",
        ]
        cmd.extend(get_namespace_args(self.namespace, self.cluster))

        pf_process = None
        # Handlers replaced by this call, keyed by signal number, so they can be put back.
        previous_handlers = {}

        # Signal handler to ensure forwarding subprocess is terminated on interruption.
        def signal_handler(signum, frame):
            self._terminate_port_forward_process(pf_process)
            sys.exit(0)

        try:
            pf_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            print("Waiting for forwarding to start")
            # Give the forwarding time to start.
            # TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
            sleep(self._PORT_FORWARD_STARTUP_SECONDS)
            if pf_process.poll() is not None:
                # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
                raise AssertionError(
                    f"Port forwarding process exited with code {pf_process.returncode}"
                )

            print(
                f"Forwarding started (from local port {self.local_port} to {self.pod}:{self.metrics_port})"
            )

            for signum in (signal.SIGINT, signal.SIGTERM):
                previous_handlers[signum] = signal.signal(signum, signal_handler)

            self._poll_until_condition_met(metric_value_condition)

        finally:
            try:
                self._terminate_port_forward_process(pf_process)
            finally:
                # Restore the caller's handlers so a later Ctrl-C is not routed to a handler
                # bound to this (now finished) gate.
                for signum, previous in previous_handlers.items():
                    # signal.signal() returns None for handlers not installed from Python;
                    # those cannot be re-installed, so they are left as they are.
                    if previous is not None:
                        signal.signal(signum, previous)
346+
347+
215348
class NamespaceAndInstructionArgs:
216349
def __init__(
217350
self,
@@ -376,12 +509,12 @@ def __init__(
376509

377510
def restart_service(self, instance_index: int) -> bool:
378511
"""Restart the instance one by one, running the use code in between each restart."""
379-
self._restart_pod(
380-
self.namespace_and_instruction_args.get_namespace(instance_index),
381-
self.service,
382-
instance_index,
383-
self.namespace_and_instruction_args.get_cluster(instance_index),
384-
)
512+
# self._restart_pod(
513+
# self.namespace_and_instruction_args.get_namespace(instance_index),
514+
# self.service,
515+
# instance_index,
516+
# self.namespace_and_instruction_args.get_cluster(instance_index),
517+
# )
385518
instructions = self.namespace_and_instruction_args.get_instruction(instance_index)
386519
print_colored(
387520
f"Restarted pod {instance_index}.\n{instructions if instructions else ''} ",

0 commit comments

Comments
 (0)