Skip to content

Commit f257625

Browse files
scripts: A class which gates progress on metrics satisifying a condition
1 parent 81430ef commit f257625

File tree

2 files changed

+161
-2
lines changed

2 files changed

+161
-2
lines changed

scripts/prod/update_config_and_restart_nodes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from update_config_and_restart_nodes_lib import (
99
ApolloArgsParserBuilder,
1010
Colors,
11-
RestartStrategy,
1211
NamespaceAndInstructionArgs,
1312
Service,
1413
ServiceRestarter,

scripts/prod/update_config_and_restart_nodes_lib.py

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
from abc import ABC, abstractmethod
44
import argparse
55
import json
6+
import signal
7+
import socket
68
import subprocess
79
import sys
810
from enum import Enum
11+
from time import sleep
912
from typing import Any, Callable, Optional
1013

1114
import tempfile
1215
import urllib.error
1316
import urllib.parse
1417
import urllib.request
18+
from prometheus_client.parser import text_string_to_metric_families
1519
import yaml
1620
from difflib import unified_diff
1721

@@ -212,6 +216,163 @@ def validate_arguments(args: argparse.Namespace) -> None:
212216
sys.exit(1)
213217

214218

219+
class MetricConditionGater:
220+
"""Gates progress on a metric satisfying a condition.
221+
222+
This class was meant to be used with counter/gauge metrics. It may not work properly with histogram metrics.
223+
"""
224+
225+
class MetricCondition:
226+
"""Represents a condition to check against metric values."""
227+
228+
def __init__(
229+
self,
230+
value_condition: Callable[[Any], bool],
231+
condition_description: Optional[str] = None,
232+
):
233+
self.value_condition = value_condition
234+
self.condition_description = condition_description
235+
236+
def __init__(
237+
self,
238+
metric_name: str,
239+
namespace: str,
240+
cluster: Optional[str],
241+
pod: str,
242+
metrics_port: int,
243+
refresh_interval_seconds: int = 3,
244+
):
245+
self.metric_name = metric_name
246+
self.local_port = self._get_free_port()
247+
self.namespace = namespace
248+
self.cluster = cluster
249+
self.pod = pod
250+
self.metrics_port = metrics_port
251+
self.refresh_interval_seconds = refresh_interval_seconds
252+
253+
@staticmethod
254+
def _get_free_port():
255+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
256+
s.bind(("", 0))
257+
return s.getsockname()[1]
258+
259+
def _get_metrics_raw_string(self) -> str:
260+
while True:
261+
try:
262+
with urllib.request.urlopen(
263+
f"http://localhost:{self.local_port}/monitoring/metrics"
264+
) as response:
265+
if response.status == 200:
266+
return response.read().decode("utf-8")
267+
else:
268+
print_colored(
269+
f"Failed to get metrics for pod {self.pod}: {response.status}"
270+
)
271+
except urllib.error.URLError as e:
272+
print_colored(f"Failed to get metrics for pod {self.pod}: {e}")
273+
print_colored(
274+
f"Waiting {self.refresh_interval_seconds} seconds to retry getting metrics...",
275+
Colors.YELLOW,
276+
)
277+
sleep(self.refresh_interval_seconds)
278+
279+
def _poll_until_condition_met(
280+
self, metric_value_condition: "MetricConditionGater.MetricCondition"
281+
):
282+
"""Poll metrics until the condition is met for the metric."""
283+
while True:
284+
metrics = self._get_metrics_raw_string()
285+
assert metrics is not None, f"Failed to get metrics from for pod {self.pod}"
286+
287+
metric_families = text_string_to_metric_families(metrics)
288+
val = None
289+
for metric_family in metric_families:
290+
if metric_family.name == self.metric_name:
291+
if len(metric_family.samples) > 1:
292+
print_error(
293+
f"Multiple samples found for metric {self.metric_name}. Using the first one.",
294+
)
295+
val = metric_family.samples[0].value
296+
break
297+
298+
if val is None:
299+
print_colored(
300+
f"Metric '{self.metric_name}' not found in pod {self.pod}. Assuming the node is not ready."
301+
)
302+
elif metric_value_condition.value_condition(val):
303+
condition_desc = (
304+
f"({metric_value_condition.condition_description}) "
305+
if metric_value_condition.condition_description is not None
306+
else ""
307+
)
308+
309+
print_colored(
310+
f"Metric {self.metric_name} condition {condition_desc}met (value={val})."
311+
)
312+
return
313+
else:
314+
print_colored(
315+
f"Metric {self.metric_name} condition {condition_desc}not met (value={val}). Continuing to wait."
316+
)
317+
318+
sleep(self.refresh_interval_seconds)
319+
320+
@staticmethod
321+
def _terminate_port_forward_process(pf_process: subprocess.Popen):
322+
if pf_process and pf_process.poll() is None:
323+
print_colored(f"Terminating kubectl port-forward process (PID: {pf_process.pid})")
324+
pf_process.terminate()
325+
try:
326+
pf_process.wait(timeout=5)
327+
except subprocess.TimeoutExpired:
328+
print_colored("Force killing kubectl port-forward process")
329+
pf_process.kill()
330+
pf_process.wait()
331+
332+
def gate(self, metric_value_condition: "MetricConditionGater.MetricCondition"):
333+
"""Wait until the nodes metrics satisfy the condition."""
334+
# This method:
335+
# 1. Starts kubectl port forwarding to the node and keep it running in the background so we can access the metrics.
336+
# 2. Calls _poll_until_condition_met.
337+
# 3. Terminates the port forwarding process when done or when interrupted.
338+
cmd = [
339+
"kubectl",
340+
"port-forward",
341+
f"pod/{self.pod}",
342+
f"{self.local_port}:{self.metrics_port}",
343+
]
344+
cmd.extend(get_namespace_args(self.namespace, self.cluster))
345+
346+
pf_process = None
347+
348+
try:
349+
pf_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
350+
print("Waiting for forwarding to start")
351+
# Give the forwarding time to start.
352+
# TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
353+
sleep(3)
354+
assert (
355+
pf_process.poll() is None
356+
), f"Port forwarding process exited with code {pf_process.returncode}"
357+
358+
print(
359+
f"Forwarding started (from local port {self.local_port} to {self.pod}:{self.metrics_port})"
360+
)
361+
362+
# Set up signal handler to ensure forwarding subprocess is terminated on interruption
363+
def signal_handler(signum, frame):
364+
self._terminate_port_forward_process(pf_process)
365+
sys.exit(0)
366+
367+
signal.signal(signal.SIGINT, signal_handler)
368+
signal.signal(signal.SIGTERM, signal_handler)
369+
370+
self._poll_until_condition_met(metric_value_condition)
371+
372+
finally:
373+
self._terminate_port_forward_process(pf_process)
374+
375+
215376
class NamespaceAndInstructionArgs:
216377
def __init__(
217378
self,
@@ -326,7 +487,6 @@ def _restart_pod(
326487
@abstractmethod
327488
def restart_service(self, instance_index: int) -> bool:
328489
"""Restart service for a specific instance. If returns False, the restart process should be aborted."""
329-
pass
330490

331491
# from_restart_strategy is a static method that returns the appropriate ServiceRestarter based on the restart strategy.
332492
@staticmethod

0 commit comments

Comments
 (0)