Skip to content

Commit 64ff8a5

Browse files
authored
Merge pull request #740 from simvue-io/wk9874/2.0.0_a3
Bug fixes for 2.0.0a3
2 parents ac71d0c + 2c4b749 commit 64ff8a5

File tree

5 files changed

+101
-26
lines changed

5 files changed

+101
-26
lines changed

simvue/api/objects/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def __init__(
171171
{
172172
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}",
173173
"User-Agent": _user_agent or f"Simvue Python client {__version__}",
174+
"Accept-Encoding": "gzip",
174175
}
175176
if not self._offline
176177
else {}

simvue/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ def __init__(
8484
logger.warning(f"No {label} specified")
8585

8686
self._headers: dict[str, str] = {
87-
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}"
87+
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}",
88+
"Accept-Encoding": "gzip",
8889
}
8990

9091
@prettify_pydantic

simvue/executor.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import pathlib
2323
import time
2424
import typing
25+
from simvue.api.objects.alert.user import UserAlert
2526

2627
if typing.TYPE_CHECKING:
2728
import simvue
@@ -113,6 +114,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
113114
self._alert_ids: dict[str, str] = {}
114115
self.command_str: dict[str, str] = {}
115116
self._processes: dict[str, subprocess.Popen] = {}
117+
self._all_processes: list[psutil.Process] = []
116118

117119
def std_out(self, process_id: str) -> str | None:
118120
if not os.path.exists(out_file := f"{self._runner.name}_{process_id}.out"):
@@ -270,19 +272,48 @@ def processes(self) -> list[psutil.Process]:
270272
if not self._processes:
271273
return []
272274

273-
_all_processes: list[psutil.Process] = []
275+
_current_processes: list[psutil.Process] = []
274276

275277
for process in self._processes.values():
276278
with contextlib.suppress(psutil.NoSuchProcess):
277-
_all_processes.append(psutil.Process(process.pid))
279+
_current_processes.append(psutil.Process(process.pid))
278280

279281
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
280-
for process in _all_processes:
282+
for process in _current_processes:
281283
for child in process.children(recursive=True):
282-
if child not in _all_processes:
283-
_all_processes.append(child)
284+
if child not in _current_processes:
285+
_current_processes.append(child)
284286

285-
return list(set(_all_processes))
287+
_current_pids = set([_process.pid for _process in _current_processes])
288+
_previous_pids = set([_process.pid for _process in self._all_processes])
289+
290+
# Find processes which used to exist, which are no longer running
291+
_expired_process_pids = _previous_pids - _current_pids
292+
293+
# Remove these processes from list of all processes
294+
self._all_processes = [
295+
_process
296+
for _process in self._all_processes
297+
if _process.pid not in _expired_process_pids
298+
]
299+
300+
# Find new processes
301+
_new_process_pids = _current_pids - _previous_pids
302+
_new_processes = [
303+
_process
304+
for _process in _current_processes
305+
if _process.pid in _new_process_pids
306+
]
307+
308+
# Prime psutil's CPU counters for each new process (the first cpu_percent() call has no baseline and returns a meaningless 0.0), so that the next measurement taken by the heartbeat is accurate
309+
if _new_processes:
310+
[_process.cpu_percent() for _process in _new_processes]
311+
time.sleep(0.1)
312+
313+
# Add these to the list of all processes
314+
self._all_processes += _new_processes
315+
316+
return self._all_processes
286317

287318
@property
288319
def success(self) -> int:
@@ -342,17 +373,26 @@ def _update_alerts(self) -> None:
342373
# allowing the executor to finish (and as such the run instance to exit)
343374
_wait_limit: float = 1
344375
for proc_id, process in self._processes.items():
376+
# We don't want to override the user's setting for the alert status
377+
# This is so that if a process incorrectly reports its return code,
378+
# the user can manually set the correct status depending on logs etc.
379+
_alert = UserAlert(identifier=self._alert_ids[proc_id])
380+
_is_set = _alert.get_status(run_id=self._runner._id)
381+
345382
if process.returncode != 0:
346383
# If the process fails then purge the dispatcher event queue
347384
# and ensure that the stderr event is sent before the run closes
348385
if self._runner._dispatcher:
349386
self._runner._dispatcher.purge()
350-
351-
self._runner.log_alert(
352-
identifier=self._alert_ids[proc_id], state="critical"
353-
)
387+
if not _is_set:
388+
self._runner.log_alert(
389+
identifier=self._alert_ids[proc_id], state="critical"
390+
)
354391
else:
355-
self._runner.log_alert(identifier=self._alert_ids[proc_id], state="ok")
392+
if not _is_set:
393+
self._runner.log_alert(
394+
identifier=self._alert_ids[proc_id], state="ok"
395+
)
356396

357397
_current_time: float = 0
358398
while (

simvue/run.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ def __init__(
186186
)
187187
self._headers: dict[str, str] = (
188188
{
189-
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}"
189+
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}",
190+
"Accept-Encoding": "gzip",
190191
}
191192
if mode != "offline"
192193
else {}
@@ -313,16 +314,11 @@ def processes(self) -> list[psutil.Process]:
313314
return process_list
314315

315316
process_list += [self._parent_process]
316-
317-
# Attach child processes relating to the process set by set_pid
318-
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
319-
for child in self._parent_process.children(recursive=True):
320-
if child not in process_list:
321-
process_list.append(child)
317+
process_list += self._child_processes
322318

323319
return list(set(process_list))
324320

325-
def _get_sysinfo(self) -> dict[str, typing.Any]:
321+
def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]:
326322
"""Retrieve system administration
327323
328324
Parameters
@@ -336,7 +332,7 @@ def _get_sysinfo(self) -> dict[str, typing.Any]:
336332
retrieved system specifications
337333
"""
338334
processes = self.processes
339-
cpu = get_process_cpu(processes, interval=0.1)
335+
cpu = get_process_cpu(processes, interval=interval)
340336
memory = get_process_memory(processes)
341337
gpu = get_gpu_metrics(processes)
342338
data: dict[str, typing.Any] = {}
@@ -375,7 +371,10 @@ def _heartbeat(
375371
last_res_metric_call = time.time()
376372

377373
if self._resources_metrics_interval:
378-
self._add_metrics_to_dispatch(self._get_sysinfo(), join_on_fail=False)
374+
self._add_metrics_to_dispatch(
375+
self._get_sysinfo(interval=1), join_on_fail=False, step=0
376+
)
377+
res_step = 1
379378

380379
while not heartbeat_trigger.is_set():
381380
time.sleep(0.1)
@@ -390,9 +389,10 @@ def _heartbeat(
390389
# join would be called on this thread and a thread cannot
391390
# join itself!
392391
self._add_metrics_to_dispatch(
393-
self._get_sysinfo(), join_on_fail=False
392+
self._get_sysinfo(), join_on_fail=False, step=res_step
394393
)
395394
last_res_metric_call = res_time
395+
res_step += 1
396396

397397
if time.time() - last_heartbeat < self._heartbeat_interval:
398398
continue
@@ -506,6 +506,9 @@ def _start(self, reconnect: bool = False) -> bool:
506506
self._pid = os.getpid()
507507

508508
self._parent_process = psutil.Process(self._pid) if self._pid else None
509+
self._child_processes = (
510+
self._get_child_processes() if self._parent_process else None
511+
)
509512

510513
self._shutdown_event = threading.Event()
511514
self._heartbeat_termination_trigger = threading.Event()
@@ -920,6 +923,16 @@ def kill_all_processes(self) -> None:
920923
)
921924
self._executor.kill_all()
922925

926+
def _get_child_processes(self) -> list[psutil.Process]:
927+
_process_list = []
928+
# Collect every descendant (recursive children) of the current parent process
929+
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
930+
for child in self._parent_process.children(recursive=True):
931+
if child not in _process_list:
932+
_process_list.append(child)
933+
934+
return list(set(_process_list))
935+
923936
@property
924937
def executor(self) -> Executor:
925938
"""Return the executor for this run"""
@@ -975,6 +988,13 @@ def set_pid(self, pid: int) -> None:
975988
"""
976989
self._pid = pid
977990
self._parent_process = psutil.Process(self._pid)
991+
self._child_processes = self._get_child_processes()
992+
# Prime psutil's CPU counters for the parent process and its children (the first cpu_percent() call has no baseline and returns a meaningless 0.0), so that the next measurement taken by the heartbeat is accurate
993+
[
994+
_process.cpu_percent()
995+
for _process in self._child_processes + [self._parent_process]
996+
]
997+
time.sleep(0.1)
978998

979999
@skip_if_failed("_aborted", "_suppress_errors", False)
9801000
@pydantic.validate_call
@@ -1686,6 +1706,11 @@ def add_alerts(
16861706
names = names or []
16871707

16881708
if names and not ids:
1709+
if self._user_config.run.mode == "offline":
1710+
self._error(
1711+
"Cannot retrieve alerts based on names in offline mode - please use IDs instead."
1712+
)
1713+
return False
16891714
try:
16901715
if alerts := Alert.get(offline=self._user_config.run.mode == "offline"):
16911716
ids += [id for id, alert in alerts if alert.name in names]
@@ -1986,6 +2011,12 @@ def log_alert(
19862011
self._error("Please specify alert to update either by ID or by name.")
19872012
return False
19882013

2014+
if name and self._user_config.run.mode == "offline":
2015+
self._error(
2016+
"Cannot retrieve alerts based on names in offline mode - please use IDs instead."
2017+
)
2018+
return False
2019+
19892020
if name:
19902021
try:
19912022
if alerts := Alert.get(offline=self._user_config.run.mode == "offline"):

simvue/sender.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from concurrent.futures import ThreadPoolExecutor
1212
import threading
1313
import requests
14+
import psutil
1415
from simvue.config.user import SimvueConfiguration
1516

1617
import simvue.api.objects
@@ -167,13 +168,14 @@ def sender(
167168
cache_dir = cache_dir or _user_config.offline.cache
168169

169170
cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True)
171+
_lock_path = cache_dir.joinpath("sender.lock")
170172

171173
# Check that no other sender is currently running (a lock file whose recorded PID is no longer alive is treated as stale and ignored)...
172-
if cache_dir.joinpath("sender.lock").exists():
174+
if _lock_path.exists() and psutil.pid_exists(int(_lock_path.read_text())):
173175
raise RuntimeError("A sender is already running for this cache!")
174176

175177
# Create lock file containing this process's PID to prevent other senders running while this one isn't finished
176-
cache_dir.joinpath("sender.lock").touch()
178+
_lock_path.write_text(str(psutil.Process().pid))
177179

178180
_id_mapping: dict[str, str] = {
179181
file_path.name.split(".")[0]: file_path.read_text()
@@ -233,5 +235,5 @@ def sender(
233235
_heartbeat_files,
234236
)
235237
# Remove lock file to allow another sender to start in the future
236-
cache_dir.joinpath("sender.lock").unlink()
238+
_lock_path.unlink()
237239
return _id_mapping

0 commit comments

Comments
 (0)