@@ -222,10 +222,10 @@ class MetricConditionGater:
222222 def __init__ (
223223 self ,
224224 metric_name : str ,
225+ namespace : str ,
226+ cluster : Optional [str ],
225227 pod : str ,
226228 metrics_port : int ,
227- namespace : str ,
228- cluster : Optional [str ] = None ,
229229 refresh_interval_seconds : int = 3 ,
230230 ):
231231 self .metric_name = metric_name
@@ -263,7 +263,7 @@ def _get_metrics(self) -> str:
263263 )
264264 sleep (self .refresh_interval_seconds )
265265
266- def _poll_condition_met (self , metric_value_condition : Callable [[Any ], bool ]):
266+ def _poll_until_condition_met (self , metric_value_condition : Callable [[Any ], bool ]):
267267 """Poll metrics until the condition is met for the metric."""
268268 while True :
269269 metrics = self ._get_metrics ()
@@ -305,6 +305,18 @@ def gate(self, metric_value_condition: Callable[[Any], bool]):
305305 cmd .extend (get_namespace_args (self .namespace , self .cluster ))
306306
307307 pf_process = None
308+
309+ def _terminate_port_forward_process (pf_process : subprocess .Popen ):
310+ if pf_process and pf_process .poll () is None :
311+ print_colored (f"Terminating kubectl port-forward process (PID: { pf_process .pid } )" )
312+ pf_process .terminate ()
313+ try :
314+ pf_process .wait (timeout = 5 )
315+ except subprocess .TimeoutExpired :
316+ print_colored ("Force killing kubectl port-forward process" )
317+ pf_process .kill ()
318+ pf_process .wait ()
319+
308320 try :
309321 pf_process = subprocess .Popen (cmd , stdout = subprocess .DEVNULL , stderr = subprocess .DEVNULL )
310322 print ("Waiting for forwarding to start" )
@@ -315,36 +327,16 @@ def gate(self, metric_value_condition: Callable[[Any], bool]):
315327
316328 # Set up signal handler to ensure forwarding subprocess is terminated on interruption
317329 def signal_handler (signum , frame ):
318- if pf_process and pf_process .poll () is None :
319- print_colored (
320- f"Terminating kubectl port-forward process (PID: { pf_process .pid } )" ,
321- Colors .RED ,
322- )
323- pf_process .terminate ()
324- try :
325- pf_process .wait (timeout = 5 )
326- except subprocess .TimeoutExpired :
327- print_colored ("Force killing kubectl port-forward process" , Colors .RED )
328- pf_process .kill ()
329- pf_process .wait ()
330+ _terminate_port_forward_process (pf_process )
330331 sys .exit (0 )
331332
332333 signal .signal (signal .SIGINT , signal_handler )
333334 signal .signal (signal .SIGTERM , signal_handler )
334335
335- self ._poll_condition_met (metric_value_condition )
336+ self ._poll_until_condition_met (metric_value_condition )
336337
337338 finally :
338- # Ensure subprocess is always terminated
339- if pf_process and pf_process .poll () is None :
340- print_colored (f"Terminating kubectl port-forward process (PID: { pf_process .pid } )" )
341- pf_process .terminate ()
342- try :
343- pf_process .wait (timeout = 5 )
344- except subprocess .TimeoutExpired :
345- print_colored ("Force killing kubectl port-forward process" )
346- pf_process .kill ()
347- pf_process .wait ()
339+ _terminate_port_forward_process (pf_process )
348340
349341
350342class NamespaceAndInstructionArgs :
@@ -409,6 +401,20 @@ def get_context_list_from_args(
409401 ]
410402
411403
404+ def _get_pod_names (
405+ namespace : str , service : Service , index : int , cluster : Optional [str ] = None
406+ ) -> list [str ]:
407+ kubectl_args = [
408+ "get" ,
409+ "pods" ,
410+ "-o" ,
411+ "name" ,
412+ ]
413+ kubectl_args .extend (get_namespace_args (namespace , cluster ))
414+ pods = run_kubectl_command (kubectl_args , capture_output = True ).stdout .splitlines ()
415+ return [pod .split ("/" )[1 ] for pod in pods if pod .startswith (f"pod/{ service .pod_name } " )]
416+
417+
412418class ServiceRestarter (ABC ):
413419 """Abstract class for restarting service instances."""
414420
@@ -425,18 +431,7 @@ def _restart_pod(
425431 namespace : str , service : Service , index : int , cluster : Optional [str ] = None
426432 ) -> None :
427433 """Restart pod by deleting it"""
428- # Get the list of pods (one string per line).
429- kubectl_args = [
430- "get" ,
431- "pods" ,
432- "-o" ,
433- "name" ,
434- ]
435- kubectl_args .extend (get_namespace_args (namespace , cluster ))
436- pods = run_kubectl_command (kubectl_args , capture_output = True ).stdout .splitlines ()
437-
438- # Filter the list of pods to only include the ones that match the service and extract the pod name.
439- pods = [pod .split ("/" )[1 ] for pod in pods if pod .startswith (f"pod/{ service .pod_name } " )]
434+ pods = _get_pod_names (namespace , service , index , cluster )
440435
441436 if not pods :
442437 print_error (f"Could not find pods for service { service .pod_name } ." )
@@ -522,6 +517,66 @@ def restart_service(self, instance_index: int) -> bool:
522517 return self .check_between_restarts (instance_index )
523518
524519
520+ class WaitOnMetrticOneByOneRestarter (ChecksBetweenRestarts ):
521+ def __init__ (
522+ self ,
523+ namespace_and_instruction_args : NamespaceAndInstructionArgs ,
524+ service : Service ,
525+ metric_name : str ,
526+ metrics_port : int ,
527+ metric_value_condition : Callable [[Any ], bool ],
528+ ):
529+ def _check_between_restarts (instance_index : int ) -> bool :
530+ if not WaitOnMetrticOneByOneRestarter ._wait_for_pods_to_be_ready (service ):
531+ return False
532+ metric_condition_gater = MetricConditionGater (
533+ metric_name ,
534+ namespace_and_instruction_args .get_namespace (instance_index ),
535+ namespace_and_instruction_args .get_cluster (instance_index ),
536+ service .pod_name ,
537+ metrics_port ,
538+ )
539+ metric_condition_gater .gate (metric_value_condition )
540+ if instance_index == namespace_and_instruction_args .size () - 1 :
541+ return True
542+ return wait_until_y_or_n (f"Do you want to restart the next pod?" )
543+
544+ super ().__init__ (namespace_and_instruction_args , service , _check_between_restarts )
545+
546+ @staticmethod
547+ def _wait_for_pods_to_be_ready (
548+ service : Service , wait_timeout : int = 180 , num_retry : int = 3 , refresh_delay_sec : int = 3
549+ ) -> bool :
550+ for i in range (num_retry ):
551+ pods = _get_pod_names (service )
552+ if pods :
553+ for pod in pods :
554+ print_colored (
555+ f"Waiting for pod { pod } to be ready... (timeout: { wait_timeout } s)"
556+ )
557+ result = run_kubectl_command (
558+ ["wait" , "--for=condition=ready" , "pod/" , pod , "--timeout" , "{timeout}s" ],
559+ capture_output = False ,
560+ )
561+
562+ if result .returncode != 0 :
563+ print_colored (
564+ f"Timed out waiting for pod { pod } to be ready: { result .stderr } , retrying... (attempt { i + 1 } /{ num_retry } )" ,
565+ Colors .YELLOW ,
566+ )
567+ break
568+ return True
569+ else :
570+ print_colored (
571+ f"Could not get pod names for service { service .pod_name } , retrying... (attempt { i + 1 } /{ num_retry } )" ,
572+ Colors .YELLOW ,
573+ )
574+ sleep (refresh_delay_sec )
575+
576+ print_error (f"Pods for service { service .pod_name } are not ready after { num_retry } attempts" )
577+ return False
578+
579+
525580class NoOpServiceRestarter (ServiceRestarter ):
526581 """No-op service restarter."""
527582
0 commit comments