Skip to content

Commit 700bfb0

Browse files
committed
Fix trigger not being set at the correct time during process execution
1 parent 1109c58 commit 700bfb0

File tree

2 files changed

+62
-72
lines changed

2 files changed

+62
-72
lines changed

simvue/executor.py

Lines changed: 60 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
__date__ = "2023-11-15"
1212

1313
import logging
14-
import contextlib
1514
import multiprocessing.synchronize
1615
import sys
1716
import multiprocessing
@@ -36,8 +35,11 @@ def _execute_process(
3635
proc_id: str,
3736
command: typing.List[str],
3837
runner_name: str,
39-
environment: typing.Optional[typing.Dict[str, str]],
40-
) -> subprocess.Popen:
38+
completion_trigger: typing.Optional[multiprocessing.synchronize.Event] = None,
39+
environment: typing.Optional[typing.Dict[str, str]] = None,
40+
) -> tuple[subprocess.Popen, typing.Optional[multiprocessing.Process]]:
41+
process = None
42+
4143
with open(f"{runner_name}_{proc_id}.err", "w") as err:
4244
with open(f"{runner_name}_{proc_id}.out", "w") as out:
4345
_result = subprocess.Popen(
@@ -48,7 +50,24 @@ def _execute_process(
4850
env=environment,
4951
)
5052

51-
return _result
53+
# If the user has requested a completion trigger we need to create a
54+
# process which checks for when it has completed and then sets the trigger
55+
# once polling returns an exit code
56+
if completion_trigger:
57+
58+
def trigger_check(
59+
trigger_to_set: multiprocessing.synchronize.Event, process: subprocess.Popen
60+
) -> None:
61+
while process.poll() is None:
62+
time.sleep(1)
63+
trigger_to_set.set()
64+
65+
process = multiprocessing.Process(
66+
target=trigger_check, args=(completion_trigger, _result)
67+
)
68+
process.start()
69+
70+
return _result, process
5271

5372

5473
class Executor:
@@ -76,7 +95,9 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
7695
self._completion_triggers: dict[
7796
str, typing.Optional[multiprocessing.synchronize.Event]
7897
] = {}
79-
self._exit_codes: dict[str, int] = {}
98+
self._completion_processes: dict[
99+
str, typing.Optional[multiprocessing.Process]
100+
] = {}
80101
self._std_err: dict[str, str] = {}
81102
self._std_out: dict[str, str] = {}
82103
self._alert_ids: dict[str, str] = {}
@@ -198,41 +219,27 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
198219
self._completion_callbacks[identifier] = completion_callback
199220
self._completion_triggers[identifier] = completion_trigger
200221

201-
self._processes[identifier] = _execute_process(
202-
identifier, _command, self._runner.name, env
222+
self._processes[identifier], self._completion_processes[identifier] = (
223+
_execute_process(
224+
identifier, _command, self._runner.name, completion_trigger, env
225+
)
203226
)
204227

205228
self._alert_ids[identifier] = self._runner.create_alert(
206229
name=f"{identifier}_exit_status", source="user"
207230
)
208231

209-
@property
210-
def processes(self) -> list[psutil.Process]:
211-
"""Create an array containing a list of processes"""
212-
if not self._processes:
213-
return []
214-
215-
_all_processes: list[psutil.Process] = [
216-
psutil.Process(process.pid) for process in self._processes.values()
217-
]
218-
219-
with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
220-
for process in _all_processes:
221-
for child in process.children(recursive=True):
222-
if child not in _all_processes:
223-
_all_processes.append(child)
224-
225-
return list(set(_all_processes))
226-
227232
@property
228233
def success(self) -> int:
229234
"""Return whether all attached processes completed successfully"""
230-
return all(i == 0 for i in self._exit_codes.values())
235+
return all(i.returncode == 0 for i in self._processes.values())
231236

232237
@property
233238
def exit_status(self) -> int:
234239
"""Returns the first non-zero exit status if applicable"""
235-
_non_zero = [i for i in self._exit_codes.values() if i != 0]
240+
_non_zero = [
241+
i.returncode for i in self._processes.values() if i.returncode != 0
242+
]
236243

237244
if _non_zero:
238245
return _non_zero[0]
@@ -243,8 +250,8 @@ def get_error_summary(self) -> dict[str, typing.Optional[str]]:
243250
"""Returns the summary messages of all errors"""
244251
return {
245252
identifier: self._get_error_status(identifier)
246-
for identifier, value in self._exit_codes.items()
247-
if value
253+
for identifier, value in self._processes.items()
254+
if value.returncode
248255
}
249256

250257
def get_command(self, process_id: str) -> str:
@@ -278,8 +285,8 @@ def _get_error_status(self, process_id: str) -> typing.Optional[str]:
278285

279286
def _update_alerts(self) -> None:
280287
"""Send log events for the result of each process"""
281-
for proc_id, code in self._exit_codes.items():
282-
if code != 0:
288+
for proc_id, process in self._processes.items():
289+
if process.returncode != 0:
283290
# If the process fails then purge the dispatcher event queue
284291
# and ensure that the stderr event is sent before the run closes
285292
if self._runner._dispatcher:
@@ -302,7 +309,7 @@ def _update_alerts(self) -> None:
302309

303310
def _save_output(self) -> None:
304311
"""Save the output to Simvue"""
305-
for proc_id in self._exit_codes.keys():
312+
for proc_id in self._processes.keys():
306313
# Only save the file if the contents are not empty
307314
if self._std_err.get(proc_id):
308315
self._runner.save_file(
@@ -313,37 +320,15 @@ def _save_output(self) -> None:
313320
f"{self._runner.name}_{proc_id}.out", category="output"
314321
)
315322

316-
def kill_process(
317-
self, process_id: typing.Union[int, str], kill_children_only: bool = False
318-
) -> None:
319-
"""Kill a running process by ID
320-
321-
If argument is a string this is a process handled by the client,
322-
else it is a PID of a external monitored process
323+
def kill_process(self, process_id: str) -> None:
324+
"""Kill a running process by ID"""
325+
if not (process := self._processes.get(process_id)):
326+
logger.error(
327+
f"Failed to terminate process '{process_id}', no such identifier."
328+
)
329+
return
323330

324-
Parameters
325-
----------
326-
process_id : typing.Union[int, str]
327-
either the identifier for a client created process or the PID
328-
of an external process
329-
kill_children_only : bool, optional
330-
if process_id is an integer, whether to kill only its children
331-
"""
332-
if isinstance(process_id, str):
333-
if not (process := self._processes.get(process_id)):
334-
logger.error(
335-
f"Failed to terminate process '{process_id}', no such identifier."
336-
)
337-
return
338-
try:
339-
parent = psutil.Process(process.pid)
340-
except psutil.NoSuchProcess:
341-
return
342-
elif isinstance(process_id, int):
343-
try:
344-
parent = psutil.Process(process_id)
345-
except psutil.NoSuchProcess:
346-
return
331+
parent = psutil.Process(process.pid)
347332

348333
for child in parent.children(recursive=True):
349334
logger.debug(f"Terminating child process {child.pid}: {child.name()}")
@@ -352,13 +337,17 @@ def kill_process(
352337
for child in parent.children(recursive=True):
353338
child.wait()
354339

355-
if not kill_children_only:
356-
logger.debug(f"Terminating process {process.pid}: {process.args}")
357-
process.kill()
358-
process.wait()
340+
logger.debug(f"Terminating child process {process.pid}: {process.args}")
341+
process.kill()
342+
process.wait()
343+
344+
if trigger := self._completion_triggers.get(process_id):
345+
trigger.set()
346+
347+
if trigger_process := self._completion_processes.get(process_id):
348+
trigger_process.join()
359349

360-
if isinstance(process_id, str):
361-
self._execute_callback(process_id)
350+
self._execute_callback(process_id)
362351

363352
def kill_all(self) -> None:
364353
"""Kill all running processes"""
@@ -368,7 +357,7 @@ def kill_all(self) -> None:
368357
def _clear_cache_files(self) -> None:
369358
"""Clear local log files if required"""
370359
if not self._keep_logs:
371-
for proc_id in self._exit_codes.keys():
360+
for proc_id in self._processes.keys():
372361
os.remove(f"{self._runner.name}_{proc_id}.err")
373362
os.remove(f"{self._runner.name}_{proc_id}.out")
374363

@@ -385,8 +374,8 @@ def _execute_callback(self, identifier: str) -> None:
385374
std_out=std_out,
386375
std_err=std_err,
387376
)
388-
if completion_trigger := self._completion_triggers.get(identifier):
389-
completion_trigger.set()
377+
if completion_process := self._completion_processes.get(identifier):
378+
completion_process.join()
390379

391380
def wait_for_completion(self) -> None:
392381
"""Wait for all processes to finish then perform tidy up and upload"""

tests/refactor/test_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ def test_executor_add_process(
1414
successful: bool,
1515
request: pytest.FixtureRequest
1616
) -> None:
17+
import logging
18+
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
1719
run = simvue.Run()
1820
completion_trigger = multiprocessing.Event()
1921
run.init(
2022
f"test_executor_{'success' if successful else 'fail'}",
2123
tags=["simvue_client_unit_tests", request.node.name.replace("[", "_").replace("]", "_")],
2224
folder="/simvue_unit_testing"
2325
)
24-
2526
run.add_process(
2627
identifier=f"test_add_process_{'success' if successful else 'fail'}",
2728
c=f"exit {0 if successful else 1}",

0 commit comments

Comments
 (0)