Skip to content

Updating RC with dev #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ if __name__ == "__main__":
...

# Send metrics inside main application loop
run.log({'loss': 0.5, 'density': 34.4})
run.log_metrics({'loss': 0.5, 'density': 34.4})

...

Expand Down
42 changes: 21 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 28 additions & 6 deletions simvue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
self._exit_codes = self._manager.dict()
self._std_err = self._manager.dict()
self._std_out = self._manager.dict()
self._alert_ids: dict[str, str] = {}
self._command_str: typing.Dict[str, str] = {}
self._processes: typing.Dict[str, multiprocessing.Process] = {}

Expand Down Expand Up @@ -221,6 +222,9 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
env,
),
)
self._alert_ids[identifier] = self._runner.create_alert(
name=f"{identifier}_exit_status", source="user"
)
logger.debug(f"Executing process: {' '.join(_command)}")
self._processes[identifier].start()

Expand All @@ -239,6 +243,14 @@ def exit_status(self) -> int:

return 0

def get_error_summary(self) -> dict[str, typing.Optional[str]]:
    """Collect the error message of every process that exited non-zero.

    Returns
    -------
    dict[str, typing.Optional[str]]
        mapping of process identifier to the message produced by
        ``_get_error_status`` for that process. Processes whose recorded
        exit code is falsy are left out.
    """
    _summary: dict[str, typing.Optional[str]] = {}
    for _proc_id, _exit_code in self._exit_codes.items():
        # Only failed processes contribute an entry
        if not _exit_code:
            continue
        _summary[_proc_id] = self._get_error_status(_proc_id)
    return _summary

def get_command(self, process_id: str) -> str:
"""Returns the command executed within the given process.

Expand All @@ -256,7 +268,19 @@ def get_command(self, process_id: str) -> str:
raise KeyError(f"Failed to retrieve '{process_id}', no such process")
return self._command_str[process_id]

def _log_events(self) -> None:
def _get_error_status(self, process_id: str) -> typing.Optional[str]:
    """Build an error message for a process from its captured output.

    The captured stderr is preferred. When it is empty, the final ten
    lines of the captured stdout are returned instead, prefixed with a
    ``Tail STDOUT`` header.

    Parameters
    ----------
    process_id : str
        identifier of the process to summarise

    Returns
    -------
    typing.Optional[str]
        the stderr text if there is any, otherwise the stdout tail. If
        neither stream holds text the (falsy) recorded stderr value is
        returned, which is ``None`` when nothing was recorded at all.
    """
    # Use .get so a process without recorded output yields no message
    # instead of raising KeyError while a failure is being reported
    err_msg: typing.Optional[str] = self._std_err.get(process_id)
    if err_msg:
        return err_msg

    std_out: typing.Optional[str] = self._std_out.get(process_id)
    if not std_out:
        return err_msg

    # Strip trailing newlines before splitting, otherwise the empty string
    # after the final newline is counted as one of the ten lines
    lines = std_out.rstrip("\n").split("\n")

    # A negative slice already copes with fewer than ten lines
    return " Tail STDOUT:\n\n" + "\n".join(lines[-10:])

def _update_alerts(self) -> None:
"""Send log events for the result of each process"""
for proc_id, code in self._exit_codes.items():
if code != 0:
Expand All @@ -265,11 +289,9 @@ def _log_events(self) -> None:
if self._runner._dispatcher:
self._runner._dispatcher.purge()

_err = self._std_err[proc_id]
_msg = f"Process {proc_id} returned non-zero exit status {code} with:\n{_err}"
self._runner.log_alert(self._alert_ids[proc_id], "critical")
else:
_msg = f"Process {proc_id} completed successfully."
self._runner.log_event(_msg)
self._runner.log_alert(self._alert_ids[proc_id], "ok")

# Wait for the dispatcher to send the latest information before
# allowing the executor to finish (and as such the run instance to exit)
Expand Down Expand Up @@ -321,7 +343,7 @@ def wait_for_completion(self) -> None:
for process in self._processes.values():
if process.is_alive():
process.join()
self._log_events()
self._update_alerts()
self._save_output()

if not self.success:
Expand Down
30 changes: 26 additions & 4 deletions simvue/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,19 @@ def __exit__(
self._dispatcher.join()

if _non_zero := self.executor.exit_status:
logger.error(
f"Simvue process executor terminated with non-zero exit status {_non_zero}"
_error_msgs: dict[str, typing.Optional[str]] = (
self.executor.get_error_summary()
)
_error_msg = "\n".join(
f"{identifier}:\n{msg}" for identifier, msg in _error_msgs.items()
)
if _error_msg:
_error_msg = f":\n{_error_msg}"
click.secho(
"Simvue process executor terminated with non-zero exit status "
f"{_non_zero}{_error_msg}",
fg="red",
bold=True,
)
sys.exit(_non_zero)

Expand Down Expand Up @@ -1375,8 +1386,19 @@ def close(self) -> bool:
self._dispatcher.join()

if _non_zero := self.executor.exit_status:
logger.error(
f"Simvue process executor terminated with non-zero exit status {_non_zero}"
_error_msgs: dict[str, typing.Optional[str]] = (
self.executor.get_error_summary()
)
_error_msg = "\n".join(
f"{identifier}:\n{msg}" for identifier, msg in _error_msgs.items()
)
if _error_msg:
_error_msg = f":\n{_error_msg}"
click.secho(
"Simvue process executor terminated with non-zero exit status "
f"{_non_zero}{_error_msg}",
fg="red",
bold=True,
)
sys.exit(_non_zero)

Expand Down
19 changes: 17 additions & 2 deletions simvue/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def _serialize_plotly_figure(data: typing.Any) -> typing.Optional[tuple[str, str
return None
mimetype = "application/vnd.plotly.v1+json"
data = plotly.io.to_json(data, engine="json")
mfile = BytesIO()
mfile.write(data.encode())
mfile.seek(0)
data = mfile.read()
return data, mimetype


Expand All @@ -110,6 +114,10 @@ def _serialize_matplotlib(data: typing.Any) -> typing.Optional[tuple[str, str]]:
return None
mimetype = "application/vnd.plotly.v1+json"
data = plotly.io.to_json(plotly.tools.mpl_to_plotly(data.gcf()), engine="json")
mfile = BytesIO()
mfile.write(data.encode())
mfile.seek(0)
data = mfile.read()
return data, mimetype


Expand All @@ -121,6 +129,10 @@ def _serialize_matplotlib_figure(data: typing.Any) -> typing.Optional[tuple[str,
return None
mimetype = "application/vnd.plotly.v1+json"
data = plotly.io.to_json(plotly.tools.mpl_to_plotly(data), engine="json")
mfile = BytesIO()
mfile.write(data.encode())
mfile.seek(0)
data = mfile.read()
return data, mimetype


Expand Down Expand Up @@ -161,8 +173,11 @@ def _serialize_torch_tensor(data: typing.Any) -> typing.Optional[tuple[str, str]
def _serialize_json(data: typing.Any) -> typing.Optional[tuple[str, str]]:
mimetype = "application/json"
try:
data = json.dumps(data)
except TypeError:
mfile = BytesIO()
mfile.write(json.dumps(data).encode())
mfile.seek(0)
data = mfile.read()
except (TypeError, json.JSONDecodeError):
return None
return data, mimetype

Expand Down
8 changes: 0 additions & 8 deletions tests/refactor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ def test_executor_add_process(
with pytest.raises(SystemExit):
run.close()

time.sleep(1)
client = simvue.Client()
_events = client.get_events(
run._id,
message_contains="successfully" if successful else "non-zero exit",
)
assert len(_events) == 1


@pytest.mark.executor
def test_add_process_command_assembly(request: pytest.FixtureRequest) -> None:
Expand Down
Loading