Skip to content

Use alerts for executor process reporting and stdout if stderr not available #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions simvue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
self._exit_codes = self._manager.dict()
self._std_err = self._manager.dict()
self._std_out = self._manager.dict()
self._alert_ids: dict[str, str] = {}
self._command_str: typing.Dict[str, str] = {}
self._processes: typing.Dict[str, multiprocessing.Process] = {}

Expand Down Expand Up @@ -221,6 +222,9 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
env,
),
)
self._alert_ids[identifier] = self._runner.create_alert(
name=f"{identifier}_exit_status", source="user"
)
logger.debug(f"Executing process: {' '.join(_command)}")
self._processes[identifier].start()

Expand All @@ -239,6 +243,14 @@ def exit_status(self) -> int:

return 0

def get_error_summary(self) -> dict[str, typing.Optional[str]]:
"""Returns the summary messages of all errors"""
return {
identifier: self._get_error_status(identifier)
for identifier, value in self._exit_codes.items()
if value
}

def get_command(self, process_id: str) -> str:
"""Returns the command executed within the given process.

Expand All @@ -256,7 +268,19 @@ def get_command(self, process_id: str) -> str:
raise KeyError(f"Failed to retrieve '{process_id}', no such process")
return self._command_str[process_id]

def _log_events(self) -> None:
def _get_error_status(self, process_id: str) -> typing.Optional[str]:
err_msg: typing.Optional[str] = None

# Return last 10 lines of stdout if stderr empty
if not (err_msg := self._std_err[process_id]) and (
std_out := self._std_out[process_id]
):
err_msg = " Tail STDOUT:\n\n"
start_index = -10 if len(lines := std_out.split("\n")) > 10 else 0
err_msg += "\n".join(lines[start_index:])
return err_msg

def _update_alerts(self) -> None:
"""Send log events for the result of each process"""
for proc_id, code in self._exit_codes.items():
if code != 0:
Expand All @@ -265,11 +289,9 @@ def _log_events(self) -> None:
if self._runner._dispatcher:
self._runner._dispatcher.purge()

_err = self._std_err[proc_id]
_msg = f"Process {proc_id} returned non-zero exit status {code} with:\n{_err}"
self._runner.log_alert(self._alert_ids[proc_id], "critical")
else:
_msg = f"Process {proc_id} completed successfully."
self._runner.log_event(_msg)
self._runner.log_alert(self._alert_ids[proc_id], "ok")

# Wait for the dispatcher to send the latest information before
# allowing the executor to finish (and as such the run instance to exit)
Expand Down Expand Up @@ -321,7 +343,7 @@ def wait_for_completion(self) -> None:
for process in self._processes.values():
if process.is_alive():
process.join()
self._log_events()
self._update_alerts()
self._save_output()

if not self.success:
Expand Down
30 changes: 26 additions & 4 deletions simvue/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,19 @@ def __exit__(
self._dispatcher.join()

if _non_zero := self.executor.exit_status:
logger.error(
f"Simvue process executor terminated with non-zero exit status {_non_zero}"
_error_msgs: dict[str, typing.Optional[str]] = (
self.executor.get_error_summary()
)
_error_msg = "\n".join(
f"{identifier}:\n{msg}" for identifier, msg in _error_msgs.items()
)
if _error_msg:
_error_msg = f":\n{_error_msg}"
click.secho(
"Simvue process executor terminated with non-zero exit status "
f"{_non_zero}{_error_msg}",
fg="red",
bold=True,
)
sys.exit(_non_zero)

Expand Down Expand Up @@ -1375,8 +1386,19 @@ def close(self) -> bool:
self._dispatcher.join()

if _non_zero := self.executor.exit_status:
logger.error(
f"Simvue process executor terminated with non-zero exit status {_non_zero}"
_error_msgs: dict[str, typing.Optional[str]] = (
self.executor.get_error_summary()
)
_error_msg = "\n".join(
f"{identifier}:\n{msg}" for identifier, msg in _error_msgs.items()
)
if _error_msg:
_error_msg = f":\n{_error_msg}"
click.secho(
"Simvue process executor terminated with non-zero exit status "
f"{_non_zero}{_error_msg}",
fg="red",
bold=True,
)
sys.exit(_non_zero)

Expand Down
Loading