Skip to content

Commit ed1ca61

Browse files
committed
Removed threads from executor and added child process termination
1 parent 5e1f84c commit ed1ca61

File tree

1 file changed

+48
-51
lines changed

1 file changed

+48
-51
lines changed

simvue/executor.py

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import sys
1616
import multiprocessing
1717
import os
18+
import psutil
1819
import subprocess
1920
import pathlib
2021
import time
@@ -30,13 +31,8 @@ def _execute_process(
3031
proc_id: str,
3132
command: typing.List[str],
3233
runner_name: str,
33-
exit_status_dict: typing.Dict[str, int],
34-
std_err: typing.Dict[str, str],
35-
std_out: typing.Dict[str, str],
36-
run_on_exit: typing.Optional[typing.Callable[[int, int, str], None]],
37-
trigger: typing.Optional[multiprocessing.synchronize.Event],
3834
environment: typing.Optional[typing.Dict[str, str]],
39-
) -> None:
35+
) -> subprocess.Popen:
4036
with open(f"{runner_name}_{proc_id}.err", "w") as err:
4137
with open(f"{runner_name}_{proc_id}.out", "w") as out:
4238
_result = subprocess.Popen(
@@ -47,24 +43,7 @@ def _execute_process(
4743
env=environment,
4844
)
4945

50-
_status_code = _result.wait()
51-
with open(f"{runner_name}_{proc_id}.err") as err:
52-
std_err[proc_id] = err.read()
53-
54-
with open(f"{runner_name}_{proc_id}.out") as out:
55-
std_out[proc_id] = out.read()
56-
57-
exit_status_dict[proc_id] = _status_code
58-
59-
if run_on_exit:
60-
run_on_exit(
61-
status_code=exit_status_dict[proc_id],
62-
std_out=std_out[proc_id],
63-
std_err=std_err[proc_id],
64-
)
65-
66-
if trigger:
67-
trigger.set()
46+
return _result
6847

6948

7049
class Executor:
@@ -88,13 +67,14 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
8867
"""
8968
self._runner = simvue_runner
9069
self._keep_logs = keep_logs
91-
self._manager = multiprocessing.Manager()
92-
self._exit_codes = self._manager.dict()
93-
self._std_err = self._manager.dict()
94-
self._std_out = self._manager.dict()
70+
self._completion_callbacks = {}
71+
self._completion_triggers = {}
72+
self._exit_codes = {}
73+
self._std_err = {}
74+
self._std_out = {}
9575
self._alert_ids: dict[str, str] = {}
96-
self._command_str: typing.Dict[str, str] = {}
97-
self._processes: typing.Dict[str, multiprocessing.Process] = {}
76+
self._command_str: dict[str, str] = {}
77+
self._processes: dict[str, subprocess.Popen] = {}
9878

9979
def add_process(
10080
self,
@@ -207,26 +187,16 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
207187
_command += _pos_args
208188

209189
self._command_str[identifier] = " ".join(_command)
190+
self._completion_callbacks[identifier] = completion_callback
191+
self._completion_triggers[identifier] = completion_trigger
210192

211-
self._processes[identifier] = multiprocessing.Process(
212-
target=_execute_process,
213-
args=(
214-
identifier,
215-
_command,
216-
self._runner.name,
217-
self._exit_codes,
218-
self._std_err,
219-
self._std_out,
220-
completion_callback,
221-
completion_trigger,
222-
env,
223-
),
193+
self._processes[identifier] = _execute_process(
194+
identifier, _command, self._runner.name, env
224195
)
196+
225197
self._alert_ids[identifier] = self._runner.create_alert(
226198
name=f"{identifier}_exit_status", source="user"
227199
)
228-
logger.debug(f"Executing process: {' '.join(_command)}")
229-
self._processes[identifier].start()
230200

231201
@property
232202
def success(self) -> int:
@@ -324,12 +294,22 @@ def kill_process(self, process_id: str) -> None:
324294
f"Failed to terminate process '{process_id}', no such identifier."
325295
)
326296
return
327-
_process.kill()
297+
298+
_parent = psutil.Process(_process.pid)
299+
300+
for child in _parent.children(recursive=True):
301+
logger.debug(f"Terminating child process {child.pid}: {child.name()}")
302+
child.kill()
303+
304+
logger.debug(f"Terminating child process {_process.pid}: {_process.args}")
305+
_process.terminate()
306+
307+
self._execute_callback(process_id)
328308

329309
def kill_all(self) -> None:
330310
"""Kill all running processes"""
331-
for process in self._processes.values():
332-
process.kill()
311+
for process in self._processes.keys():
312+
self.kill_process(process)
333313

334314
def _clear_cache_files(self) -> None:
335315
"""Clear local log files if required"""
@@ -338,11 +318,28 @@ def _clear_cache_files(self) -> None:
338318
os.remove(f"{self._runner.name}_{proc_id}.err")
339319
os.remove(f"{self._runner.name}_{proc_id}.out")
340320

321+
def _execute_callback(self, identifier: str) -> None:
322+
with open(f"{self._runner.name}_{identifier}.err") as err:
323+
std_err = err.read()
324+
325+
with open(f"{self._runner.name}_{identifier}.out") as out:
326+
std_out = out.read()
327+
328+
if self._completion_callbacks[identifier]:
329+
self._completion_callbacks[identifier](
330+
status_code=self._processes[identifier].returncode,
331+
std_out=std_out,
332+
std_err=std_err,
333+
)
334+
if self._completion_triggers[identifier]:
335+
self._completion_triggers[identifier].set()
336+
341337
def wait_for_completion(self) -> None:
342338
"""Wait for all processes to finish then perform tidy up and upload"""
343-
for process in self._processes.values():
344-
if process.is_alive():
345-
process.join()
339+
for identifier, process in self._processes.items():
340+
process.wait()
341+
self._execute_callback(identifier)
342+
346343
self._update_alerts()
347344
self._save_output()
348345

0 commit comments

Comments
 (0)