Skip to content

Commit 8f797c1

Browse files
committed
Fix .processes so that it reuses the same objects
1 parent 1ddfb21 commit 8f797c1

File tree

2 files changed

+65
-18
lines changed

2 files changed

+65
-18
lines changed

simvue/executor.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import pathlib
2323
import time
2424
import typing
25-
from simvue.api.objects.alert.fetch import Alert
25+
from simvue.api.objects.alert.user import UserAlert
2626

2727
if typing.TYPE_CHECKING:
2828
import simvue
@@ -114,6 +114,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
114114
self._alert_ids: dict[str, str] = {}
115115
self.command_str: dict[str, str] = {}
116116
self._processes: dict[str, subprocess.Popen] = {}
117+
self._all_processes: list[psutil.Process] = []
117118

118119
def std_out(self, process_id: str) -> str | None:
119120
if not os.path.exists(out_file := f"{self._runner.name}_{process_id}.out"):
@@ -271,19 +272,48 @@ def processes(self) -> list[psutil.Process]:
271272
if not self._processes:
272273
return []
273274

274-
_all_processes: list[psutil.Process] = []
275+
_current_processes: list[psutil.Process] = []
275276

276277
for process in self._processes.values():
277278
with contextlib.suppress(psutil.NoSuchProcess):
278-
_all_processes.append(psutil.Process(process.pid))
279+
_current_processes.append(psutil.Process(process.pid))
279280

280281
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
281-
for process in _all_processes:
282+
for process in _current_processes:
282283
for child in process.children(recursive=True):
283-
if child not in _all_processes:
284-
_all_processes.append(child)
284+
if child not in _current_processes:
285+
_current_processes.append(child)
285286

286-
return list(set(_all_processes))
287+
_current_pids = set([_process.pid for _process in _current_processes])
288+
_previous_pids = set([_process.pid for _process in self._all_processes])
289+
290+
# Find processes which used to exist, which are no longer running
291+
_expired_process_pids = _previous_pids - _current_pids
292+
293+
# Remove these processes from list of all processes
294+
self._all_processes = [
295+
_process
296+
for _process in self._all_processes
297+
if _process.pid not in _expired_process_pids
298+
]
299+
300+
# Find new processes
301+
_new_process_pids = _current_pids - _previous_pids
302+
_new_processes = [
303+
_process
304+
for _process in _current_processes
305+
if _process.pid in _new_process_pids
306+
]
307+
308+
# Get CPU usage stats for each of those new processes, so that next time it's measured by the heartbeat the value is accurate
309+
if _new_processes:
310+
[_process.cpu_percent() for _process in _new_processes]
311+
time.sleep(0.1)
312+
313+
# Add these to the list of all processes
314+
self._all_processes += _new_processes
315+
316+
return self._all_processes
287317

288318
@property
289319
def success(self) -> int:
@@ -346,7 +376,7 @@ def _update_alerts(self) -> None:
346376
# We don't want to override the user's setting for the alert status
347377
# This is so that if a process incorrectly reports its return code,
348378
# the user can manually set the correct status depending on logs etc.
349-
_alert = Alert(identifier=self._alert_ids[proc_id])
379+
_alert = UserAlert(identifier=self._alert_ids[proc_id])
350380
_is_set = _alert.get_status(run_id=self._runner._id)
351381

352382
if process.returncode != 0:

simvue/run.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,11 @@ def processes(self) -> list[psutil.Process]:
297297
return process_list
298298

299299
process_list += [self._parent_process]
300-
301-
# Attach child processes relating to the process set by set_pid
302-
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
303-
for child in self._parent_process.children(recursive=True):
304-
if child not in process_list:
305-
process_list.append(child)
300+
process_list += self._child_processes
306301

307302
return list(set(process_list))
308303

309-
def _get_sysinfo(self) -> dict[str, typing.Any]:
304+
def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]:
310305
"""Retrieve system administration
311306
312307
Parameters
@@ -320,7 +315,7 @@ def _get_sysinfo(self) -> dict[str, typing.Any]:
320315
retrieved system specifications
321316
"""
322317
processes = self.processes
323-
cpu = get_process_cpu(processes, interval=0.1)
318+
cpu = get_process_cpu(processes, interval=interval)
324319
memory = get_process_memory(processes)
325320
gpu = get_gpu_metrics(processes)
326321
data: dict[str, typing.Any] = {}
@@ -359,7 +354,9 @@ def _heartbeat(
359354
last_res_metric_call = time.time()
360355

361356
if self._resources_metrics_interval:
362-
self._add_metrics_to_dispatch(self._get_sysinfo(), join_on_fail=False)
357+
self._add_metrics_to_dispatch(
358+
self._get_sysinfo(interval=1), join_on_fail=False
359+
)
363360

364361
while not heartbeat_trigger.is_set():
365362
time.sleep(0.1)
@@ -490,6 +487,9 @@ def _start(self, reconnect: bool = False) -> bool:
490487
self._pid = os.getpid()
491488

492489
self._parent_process = psutil.Process(self._pid) if self._pid else None
490+
self._child_processes = (
491+
self._get_child_processes() if self._parent_process else None
492+
)
493493

494494
self._shutdown_event = threading.Event()
495495
self._heartbeat_termination_trigger = threading.Event()
@@ -904,6 +904,16 @@ def kill_all_processes(self) -> None:
904904
)
905905
self._executor.kill_all()
906906

907+
def _get_child_processes(self) -> list[psutil.Process]:
908+
_process_list = []
909+
# Attach child processes relating to the process set by set_pid
910+
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
911+
for child in self._parent_process.children(recursive=True):
912+
if child not in _process_list:
913+
_process_list.append(child)
914+
915+
return list(set(_process_list))
916+
907917
@property
908918
def executor(self) -> Executor:
909919
"""Return the executor for this run"""
@@ -959,6 +969,13 @@ def set_pid(self, pid: int) -> None:
959969
"""
960970
self._pid = pid
961971
self._parent_process = psutil.Process(self._pid)
972+
self._child_processes = self._get_child_processes()
973+
# Get CPU usage stats for each of those new processes, so that next time it's measured by the heartbeat the value is accurate
974+
[
975+
_process.cpu_percent()
976+
for _process in self._child_processes + [self._parent_process]
977+
]
978+
time.sleep(0.1)
962979

963980
@skip_if_failed("_aborted", "_suppress_errors", False)
964981
@pydantic.validate_call
@@ -1962,7 +1979,7 @@ def log_alert(
19621979
self._error("Please specify alert to update either by ID or by name.")
19631980
return False
19641981

1965-
if self._user_config.run.mode == "offline":
1982+
if name and self._user_config.run.mode == "offline":
19661983
self._error(
19671984
"Cannot retrieve alerts based on names in offline mode - please use IDs instead."
19681985
)

0 commit comments

Comments
 (0)