From 84f0717bdd905e55f2f1af60ac77424653519aba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= <64790965+kzscisoft@users.noreply.github.com> Date: Tue, 4 Jun 2024 19:23:10 +0100 Subject: [PATCH 01/12] Start addition of auto-abort functionality --- simvue/factory/proxy/base.py | 4 +++ simvue/factory/proxy/offline.py | 6 ++++ simvue/factory/proxy/remote.py | 23 ++++++++++++++++ simvue/run.py | 47 ++++++++++++++++++++++++++------ tests/refactor/test_run_class.py | 29 ++++++++++++++++++++ 5 files changed, 100 insertions(+), 9 deletions(-) diff --git a/simvue/factory/proxy/base.py b/simvue/factory/proxy/base.py index 6f95efa9..1ca9060c 100644 --- a/simvue/factory/proxy/base.py +++ b/simvue/factory/proxy/base.py @@ -89,3 +89,7 @@ def send_heartbeat(self) -> typing.Optional[dict[str, typing.Any]]: @abc.abstractmethod def check_token(self) -> bool: pass + + @abc.abstractmethod + def get_abort_status(self) -> bool: + pass diff --git a/simvue/factory/proxy/offline.py b/simvue/factory/proxy/offline.py index add39c14..8460bc5e 100644 --- a/simvue/factory/proxy/offline.py +++ b/simvue/factory/proxy/offline.py @@ -171,9 +171,15 @@ def set_alert_state( @skip_if_failed("_aborted", "_suppress_errors", []) def list_tags(self) -> list[dict[str, typing.Any]]: + #TODO: Tag retrieval not implemented for offline running raise NotImplementedError( "Retrieval of current tags is not implemented for offline running" ) + + @skip_if_failed("_aborted", "_suppress_errors", True) + def get_abort_status(self) -> bool: + #TODO: Abort on failure not implemented for offline running + return True @skip_if_failed("_aborted", "_suppress_errors", []) def list_alerts(self) -> list[dict[str, typing.Any]]: diff --git a/simvue/factory/proxy/remote.py b/simvue/factory/proxy/remote.py index 612b4a09..cb00cb6d 100644 --- a/simvue/factory/proxy/remote.py +++ b/simvue/factory/proxy/remote.py @@ -467,3 +467,26 @@ def check_token(self) -> bool: self._error("Token has expired") return False return True + + @skip_if_failed("_aborted", "_suppress_errors", False) + def get_abort_status(self) -> bool: + logger.debug("Retrieving alert status") + + try: + response = get( + f"{self._url}/api/runs/{self._id}/abort", self._headers_mp + ) + except Exception as err: + self._error(f"Exception retrieving abort status: {str(err)}") + return False + + logger.debug("Got status code %d when checking abort status", response.status_code) + + if response.status_code == 200: + if (status := response.json().get("status")) is None: + self._error(f"Expected key 'status' when retrieving abort status {response.json()}") + return False + return status + + self._error(f"Got status code {response.status_code} when checking abort status") + return False diff --git a/simvue/run.py b/simvue/run.py index 78376811..b6b7cd11 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -101,6 +101,7 @@ def __init__( self._uuid: str = f"{uuid.uuid4()}" self._mode: typing.Literal["online", "offline", "disabled"] = mode self._name: typing.Optional[str] = None + self._abort_on_fail: bool = True self._dispatch_mode: typing.Literal["direct", "queued"] = "queued" self._executor = Executor(self) self._dispatcher: typing.Optional[DispatcherBaseClass] = None @@ -124,8 +125,10 @@ def __init__( self._shutdown_event: typing.Optional[threading.Event] = None self._configuration_lock = threading.Lock() self._heartbeat_termination_trigger: typing.Optional[threading.Event] = None + self._alert_raised_trigger: typing.Optional[threading.Event] = None self._storage_id: typing.Optional[str] = None self._heartbeat_thread: typing.Optional[threading.Thread] = None + self._heartbeat_interval: int = HEARTBEAT_INTERVAL def __enter__(self) -> "Run": return self @@ -138,9 +141,6 @@ def __exit__( typing.Union[typing.Type[BaseException], BaseException] ], ) -> None: - # Wait for the executor to finish with currently running processes - self._executor.wait_for_completion() - identifier = self._id logger.debug( "Automatically closing run '%s' in status %s", @@ -153,6 +153,18 @@ def __exit__( self._heartbeat_termination_trigger.set() self._heartbeat_thread.join() + # Wait for the executor to finish with currently running processes + if ( + self._abort_on_fail + and self._alert_raised_trigger + and self._alert_raised_trigger.is_set() + ): + if self._shutdown_event: + self._shutdown_event.set() + self.kill_all_processes() + + self._executor.wait_for_completion() + # Handle case where run is aborted by user KeyboardInterrupt if (self._id or self._mode == "offline") and self._status == "running": if not exc_type: @@ -273,11 +285,20 @@ def _heartbeat( ) last_res_metric_call = res_time - if time.time() - last_heartbeat < HEARTBEAT_INTERVAL: + if time.time() - last_heartbeat < self._heartbeat_interval: continue last_heartbeat = time.time() + with self._configuration_lock: + if ( + self._simvue + and self._simvue.get_abort_status() + and self._alert_raised_trigger + ): + self._alert_raised_trigger.set() + break + if self._simvue: self._simvue.send_heartbeat() @@ -393,6 +414,7 @@ def _start(self, reconnect: bool = False) -> bool: self._shutdown_event = threading.Event() self._heartbeat_termination_trigger = threading.Event() + self._alert_raised_trigger = threading.Event() try: self._dispatcher = Dispatcher( @@ -438,6 +460,12 @@ def _error(self, message: str, join_threads: bool = True) -> None: self._heartbeat_termination_trigger.set() if join_threads: self._heartbeat_thread.join() + if ( + self._abort_on_fail + and self._alert_raised_trigger + and self._alert_raised_trigger.is_set() + ): + self.kill_all_processes() # Finish stopping all threads if self._shutdown_event: @@ -474,7 +502,6 @@ def init( folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)] = "/", running: bool = True, retention_period: typing.Optional[str] = None, - resources_metrics_interval: typing.Optional[int] = HEARTBEAT_INTERVAL, visibility: typing.Union[ typing.Literal["public", "tenant"], list[str], None ] = None, @@ -500,8 +527,6 @@ def init( retention_period : str, optional describer for time period to retain run, the default of None removes this constraint. - resources_metrics_interval : int, optional - how often to publish resource metrics, if None these will not be published visibility : Literal['public', 'tenant'] | list[str], optional set visibility options for this run, either: * public - run viewable to all. @@ -536,8 +561,6 @@ def init( self._error("specified name is invalid") return False - self._resources_metrics_interval = resources_metrics_interval - self._name = name self._status = "running" if running else "created" @@ -806,6 +829,7 @@ def config( resources_metrics_interval: typing.Optional[int] = None, disable_resources_metrics: typing.Optional[bool] = None, storage_id: typing.Optional[str] = None, + abort_on_fail: typing.Optional[bool] = None, ) -> bool: """Optional configuration @@ -822,6 +846,8 @@ def config( disable monitoring of resource metrics storage_id : str, optional identifier of storage to use, by default None + abort_on_fail : bool, optional + whether to abort the run if an alert is triggered Returns ------- @@ -849,6 +875,9 @@ def config( if resources_metrics_interval: self._resources_metrics_interval = resources_metrics_interval + if abort_on_fail: + self._abort_on_fail = abort_on_fail + if storage_id: self._storage_id = storage_id diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index d7bc0ca9..fdbb4ec7 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -459,3 +459,32 @@ def test_save_object( pytest.skip("Numpy is not installed") save_obj = array([1, 2, 3, 4]) simvue_run.save_object(save_obj, "input", f"test_object_{object_type}") + + +@pytest.mark.run +def test_abort_on_fail(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: + run, _ = create_plain_run + alert_id = run.create_alert("forever_fails", source="user") + run.config(resources_metrics_interval=1) + run._heartbeat_interval = 1 + run.add_process( + identifier="forever_long", + executable="bash", + c="sleep 10000" + ) + time.sleep(2) + run.log_alert(alert_id, "critical") + time.sleep(4) + + +@pytest.mark.run +def test_kill_all_processes(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: + run, _ = create_plain_run + run.config(resources_metrics_interval=1) + run.add_process( + identifier="forever_long", + executable="bash", + c="sleep 10000" + ) + time.sleep(2) + run.kill_all_processes() From 395a8e0b2c50d886bc71caf131dceeb2a0198a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 5 Jun 2024 09:12:32 +0100 Subject: [PATCH 02/12] Handle alert fail abort in Python script --- simvue/run.py | 27 ++++++++++++++++---------- tests/refactor/test_run_class.py | 33 ++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/simvue/run.py b/simvue/run.py index b6b7cd11..f27359ad 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -159,6 +159,7 @@ def __exit__( and self._alert_raised_trigger and self._alert_raised_trigger.is_set() ): + self._aborted = True if self._shutdown_event: self._shutdown_event.set() self.kill_all_processes() @@ -297,6 +298,7 @@ def _heartbeat( and self._alert_raised_trigger ): self._alert_raised_trigger.set() + self.close(False) break if self._simvue: @@ -465,6 +467,7 @@ def _error(self, message: str, join_threads: bool = True) -> None: and self._alert_raised_trigger and self._alert_raised_trigger.is_set() ): + self._aborted = True self.kill_all_processes() # Finish stopping all threads @@ -1378,15 +1381,7 @@ def set_status( return False - @skip_if_failed("_aborted", "_suppress_errors", False) - def close(self) -> bool: - """Close the run - - Returns - ------- - bool - whether close was successful - """ + def _close_session(self, join_heartbeat: bool) -> bool: self._executor.wait_for_completion() if self._mode == "disabled": return True @@ -1401,7 +1396,8 @@ def close(self) -> bool: if self._heartbeat_thread and self._heartbeat_termination_trigger: self._heartbeat_termination_trigger.set() - self._heartbeat_thread.join() + if join_heartbeat: + self._heartbeat_thread.join() if self._shutdown_event: self._shutdown_event.set() @@ -1433,6 +1429,17 @@ def close(self) -> bool: return True + @skip_if_failed("_aborted", "_suppress_errors", False) + def close(self) -> bool: + """Close the run + + Returns + ------- + bool + whether close was successful + """ + return self._close_session(True) + @skip_if_failed("_aborted", "_suppress_errors", False) @check_run_initialised @pydantic.validate_call diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index fdbb4ec7..c50e2325 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -462,29 +462,34 @@ def test_save_object( @pytest.mark.run -def test_abort_on_fail(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: +def test_abort_on_alert(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run - alert_id = run.create_alert("forever_fails", source="user") + run.create_alert( + "forever_fails", + source="metrics", + metric="x", + frequency=1, + window=1, + rule="is below", + threshold=0, + ) run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 - run.add_process( - identifier="forever_long", - executable="bash", - c="sleep 10000" - ) + run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") time.sleep(2) - run.log_alert(alert_id, "critical") + run.log_metrics({"x": -1}) + time.sleep(1) + run.log_metrics({"x": -1}) time.sleep(4) - + if not run._aborted: + run.kill_all_processes() + raise AssertionError("Run was not aborted") + @pytest.mark.run def test_kill_all_processes(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run run.config(resources_metrics_interval=1) - run.add_process( - identifier="forever_long", - executable="bash", - c="sleep 10000" - ) + run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") time.sleep(2) run.kill_all_processes() From dff7b4bcdd70aa0f68ae898a7050fbb6edeeb248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 5 Jun 2024 09:14:00 +0100 Subject: [PATCH 03/12] Use user alert in test --- tests/refactor/test_run_class.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index c50e2325..b6a449c0 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -464,22 +464,15 @@ def test_save_object( @pytest.mark.run def test_abort_on_alert(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run - run.create_alert( + alert_id = run.create_alert( "forever_fails", - source="metrics", - metric="x", - frequency=1, - window=1, - rule="is below", - threshold=0, + source="user" ) run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") time.sleep(2) - run.log_metrics({"x": -1}) - time.sleep(1) - run.log_metrics({"x": -1}) + run.log_alert(alert_id, "critical") time.sleep(4) if not run._aborted: run.kill_all_processes() From 753c9b2049948186cb6302a1a5cacbb3891221c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 5 Jun 2024 14:29:26 +0100 Subject: [PATCH 04/12] Fix wrong function call --- simvue/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 9acda71e..a9e2089c 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -298,7 +298,7 @@ def _heartbeat( and self._alert_raised_trigger ): self._alert_raised_trigger.set() - self.close(False) + self._close_session(False) break if self._simvue: From 213e19a75c20bd4f46845ef0ba4f0f7e26f89a2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Wed, 5 Jun 2024 16:30:55 +0100 Subject: [PATCH 05/12] Fix haning on abort --- simvue/run.py | 49 ++++++++++++++++++++----------------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/simvue/run.py b/simvue/run.py index a9e2089c..a969b7c0 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -148,24 +148,13 @@ def __exit__( self._status, ) + self._executor.wait_for_completion() + # Stop the run heartbeat if self._heartbeat_thread and self._heartbeat_termination_trigger: self._heartbeat_termination_trigger.set() self._heartbeat_thread.join() - # Wait for the executor to finish with currently running processes - if ( - self._abort_on_fail - and self._alert_raised_trigger - and self._alert_raised_trigger.is_set() - ): - self._aborted = True - if self._shutdown_event: - self._shutdown_event.set() - self.kill_all_processes() - - self._executor.wait_for_completion() - # Handle case where run is aborted by user KeyboardInterrupt if (self._id or self._mode == "offline") and self._status == "running": if not exc_type: @@ -291,6 +280,7 @@ def _heartbeat( last_heartbeat = time.time() + # Check if the user has aborted the run with self._configuration_lock: if ( self._simvue @@ -298,8 +288,13 @@ def _heartbeat( and self._alert_raised_trigger ): self._alert_raised_trigger.set() - self._close_session(False) - break + self.kill_all_processes() + if self._dispatcher and self._shutdown_event: + self._shutdown_event.set() + self._dispatcher.purge() + self._dispatcher.join() + self.set_status("terminated") + raise RuntimeError("Run was aborted") if self._simvue: self._simvue.send_heartbeat() @@ -1379,7 +1374,15 @@ def set_status( return False - def _close_session(self, join_heartbeat: bool) -> bool: + @skip_if_failed("_aborted", "_suppress_errors", False) + def close(self) -> bool: + """Close the run + + Returns + ------- + bool + whether close was successful + """ self._executor.wait_for_completion() if self._mode == "disabled": return True @@ -1394,8 +1397,7 @@ def _close_session(self, join_heartbeat: bool) -> bool: if self._heartbeat_thread and self._heartbeat_termination_trigger: self._heartbeat_termination_trigger.set() - if join_heartbeat: - self._heartbeat_thread.join() + self._heartbeat_thread.join() if self._shutdown_event: self._shutdown_event.set() @@ -1427,17 +1429,6 @@ def _close_session(self, join_heartbeat: bool) -> bool: return True - @skip_if_failed("_aborted", "_suppress_errors", False) - def close(self) -> bool: - """Close the run - - Returns - ------- - bool - whether close was successful - """ - return self._close_session(True) - @skip_if_failed("_aborted", "_suppress_errors", False) @check_run_initialised @pydantic.validate_call From 7a16c3a346e738d0839ceb371ba08ae8cdd13d1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Fri, 7 Jun 2024 12:08:45 +0100 Subject: [PATCH 06/12] Added abort_run method to client, and fixed test of abort --- simvue/client.py | 39 ++++++++++++++++++++++++++++++++ simvue/executor.py | 8 +++---- tests/refactor/test_run_class.py | 12 ++++------ 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/simvue/client.py b/simvue/client.py index bebb7b7a..cf1f0be3 100644 --- a/simvue/client.py +++ b/simvue/client.py @@ -603,6 +603,45 @@ def _retrieve_artifact_from_server( return json_response + @prettify_pydantic + @pydantic.validate_call + def abort_run(self, run_id: str, reason: str) -> typing.Union[dict, list]: + """Abort a currently active run on the server + + Parameters + ---------- + run_id : str + the unique identifier for the run + reason : str + reason for abort + + Returns + ------- + dict | list + response from server + """ + body: dict[str, str | None] = {"id": run_id, "reason": reason} + + response = requests.put( + f"{self._url}/api/runs/abort", + headers=self._headers, + json=body, + ) + + json_response = self._get_json_from_response( + expected_status=[200, 400], + scenario=f"Abort of run '{run_id}'", + response=response, + ) + + if not isinstance(json_response, dict): + raise RuntimeError( + "Expected list from JSON response during retrieval of " + f"artifact but got '{type(json_response)}'" + ) + + return json_response + @prettify_pydantic @pydantic.validate_call def get_artifact( diff --git a/simvue/executor.py b/simvue/executor.py index f4710137..08374ed2 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -271,8 +271,8 @@ def _get_error_status(self, process_id: str) -> typing.Optional[str]: err_msg: typing.Optional[str] = None # Return last 10 lines of stdout if stderr empty - if not (err_msg := self._std_err[process_id]) and ( - std_out := self._std_out[process_id] + if not (err_msg := self._std_err.get(process_id)) and ( + std_out := self._std_out.get(process_id) ): err_msg = " Tail STDOUT:\n\n" start_index = -10 if len(lines := std_out.split("\n")) > 10 else 0 @@ -307,11 +307,11 @@ def _save_output(self) -> None: """Save the output to Simvue""" for proc_id in self._exit_codes.keys(): # Only save the file if the contents are not empty - if self._std_err[proc_id]: + if self._std_err.get(proc_id): self._runner.save_file( f"{self._runner.name}_{proc_id}.err", category="output" ) - if self._std_out[proc_id]: + if self._std_out.get(proc_id): self._runner.save_file( f"{self._runner.name}_{proc_id}.out", category="output" ) diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index b6a449c0..5bfb5acb 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -8,7 +8,6 @@ import pathlib import concurrent.futures import random -import inspect import simvue.run as sv_run import simvue.client as sv_cl @@ -464,19 +463,16 @@ def test_save_object( @pytest.mark.run def test_abort_on_alert(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run - alert_id = run.create_alert( - "forever_fails", - source="user" - ) run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") time.sleep(2) - run.log_alert(alert_id, "critical") + client = sv_cl.Client() + client.abort_run(run._id, reason="testing abort") time.sleep(4) - if not run._aborted: + if not run._status == "terminated": run.kill_all_processes() - raise AssertionError("Run was not aborted") + raise AssertionError("Run was not terminated") @pytest.mark.run From f1d6f491d1c0b1a91497ff9ff4ea68ec536715fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Fri, 7 Jun 2024 12:29:52 +0100 Subject: [PATCH 07/12] Added python based script test Used sys._exit instead of exception raise to exit both scenarios Updated click echo to always have [simvue] --- simvue/run.py | 9 +++++---- tests/refactor/test_run_class.py | 22 +++++++++++++++++++++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/simvue/run.py b/simvue/run.py index a969b7c0..d9c7ec45 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -189,7 +189,7 @@ def __exit__( if _error_msg: _error_msg = f":\n{_error_msg}" click.secho( - "Simvue process executor terminated with non-zero exit status " + "[simvue] Process executor terminated with non-zero exit status " f"{_non_zero}{_error_msg}", fg="red", bold=True, @@ -294,7 +294,8 @@ def _heartbeat( self._dispatcher.purge() self._dispatcher.join() self.set_status("terminated") - raise RuntimeError("Run was aborted") + click.secho("[simvue] Run was aborted.", fg="red", bold=True) + os._exit(1) if self._simvue: self._simvue.send_heartbeat() @@ -1232,7 +1233,7 @@ def save_file( if not file_size: click.secho( - "WARNING: saving zero-sized files not currently supported", + "[simvue] WARNING: saving zero-sized files not currently supported", bold=True, fg="yellow", ) @@ -1420,7 +1421,7 @@ def close(self) -> bool: if _error_msg: _error_msg = f":\n{_error_msg}" click.secho( - "Simvue process executor terminated with non-zero exit status " + "[simvue] Process executor terminated with non-zero exit status " f"{_non_zero}{_error_msg}", fg="red", bold=True, diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index 5bfb5acb..dfa966fe 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -461,7 +461,7 @@ def test_save_object( @pytest.mark.run -def test_abort_on_alert(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: +def test_abort_on_alert_process(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 @@ -473,6 +473,26 @@ def test_abort_on_alert(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> Non if not run._status == "terminated": run.kill_all_processes() raise AssertionError("Run was not terminated") + + +@pytest.mark.run +def test_abort_on_alert_python(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: + run, _ = create_plain_run + run.config(resources_metrics_interval=1) + run._heartbeat_interval = 1 + client = sv_cl.Client() + i = 0 + + while True: + time.sleep(1) + if i == 4: + client.abort_run(run._id, reason="testing abort") + i += 1 + if i == 10: + break + + assert i == 4 + assert run._status == "terminated" @pytest.mark.run From 352b4ed6af067a88842437030414dea8d77d293a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Fri, 7 Jun 2024 12:51:10 +0100 Subject: [PATCH 08/12] update for term color option --- simvue/run.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/simvue/run.py b/simvue/run.py index 9fd7e487..f9986039 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -295,7 +295,11 @@ def _heartbeat( self._dispatcher.purge() self._dispatcher.join() self.set_status("terminated") - click.secho("[simvue] Run was aborted.", fg="red", bold=True) + click.secho( + "[simvue] Run was aborted.", + fg="red" if self._term_color else None, + bold=self._term_color, + ) os._exit(1) if self._simvue: From 730d17bd4f642c88dca0ba1a0827b38d36cade3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Fri, 7 Jun 2024 13:03:15 +0100 Subject: [PATCH 09/12] Remove unused trigger variable for alerts --- simvue/run.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/simvue/run.py b/simvue/run.py index f9986039..3b74b5a9 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -101,7 +101,7 @@ def __init__( self._uuid: str = f"{uuid.uuid4()}" self._mode: typing.Literal["online", "offline", "disabled"] = mode self._name: typing.Optional[str] = None - self._abort_on_fail: bool = True + self._abort_on_alert: bool = True self._dispatch_mode: typing.Literal["direct", "queued"] = "queued" self._executor = Executor(self) self._dispatcher: typing.Optional[DispatcherBaseClass] = None @@ -126,7 +126,6 @@ def __init__( self._shutdown_event: typing.Optional[threading.Event] = None self._configuration_lock = threading.Lock() self._heartbeat_termination_trigger: typing.Optional[threading.Event] = None - self._alert_raised_trigger: typing.Optional[threading.Event] = None self._storage_id: typing.Optional[str] = None self._heartbeat_thread: typing.Optional[threading.Thread] = None self._heartbeat_interval: int = HEARTBEAT_INTERVAL @@ -285,8 +284,8 @@ def _heartbeat( with self._configuration_lock: if ( self._simvue + and self._abort_on_alert and self._simvue.get_abort_status() - and self._alert_raised_trigger ): self._alert_raised_trigger.set() self.kill_all_processes() @@ -463,13 +462,6 @@ def _error(self, message: str, join_threads: bool = True) -> None: self._heartbeat_termination_trigger.set() if join_threads: self._heartbeat_thread.join() - if ( - self._abort_on_fail - and self._alert_raised_trigger - and self._alert_raised_trigger.is_set() - ): - self._aborted = True - self.kill_all_processes() # Finish stopping all threads if self._shutdown_event: @@ -842,7 +834,7 @@ def config( resources_metrics_interval: typing.Optional[int] = None, disable_resources_metrics: typing.Optional[bool] = None, storage_id: typing.Optional[str] = None, - abort_on_fail: typing.Optional[bool] = None, + abort_on_alert: typing.Optional[bool] = None, ) -> bool: """Optional configuration @@ -859,7 +851,7 @@ def config( disable monitoring of resource metrics storage_id : str, optional identifier of storage to use, by default None - abort_on_fail : bool, optional + abort_on_alert : bool, optional whether to abort the run if an alert is triggered Returns @@ -888,8 +880,8 @@ def config( if resources_metrics_interval: self._resources_metrics_interval = resources_metrics_interval - if abort_on_fail: - self._abort_on_fail = abort_on_fail + if abort_on_alert: + self._abort_on_alert = abort_on_alert if storage_id: self._storage_id = storage_id From ed1ca61df7cb34894e4aaf1ef6613db3271ad3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Fri, 7 Jun 2024 18:40:28 +0100 Subject: [PATCH 10/12] Removed threads from executor and add child process termination --- simvue/executor.py | 99 ++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/simvue/executor.py b/simvue/executor.py index 7689fd7d..d3b05b38 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -15,6 +15,7 @@ import sys import multiprocessing import os +import psutil import subprocess import pathlib import time @@ -30,13 +31,8 @@ def _execute_process( proc_id: str, command: typing.List[str], runner_name: str, - exit_status_dict: typing.Dict[str, int], - std_err: typing.Dict[str, str], - std_out: typing.Dict[str, str], - run_on_exit: typing.Optional[typing.Callable[[int, int, str], None]], - trigger: typing.Optional[multiprocessing.synchronize.Event], environment: typing.Optional[typing.Dict[str, str]], -) -> None: +) -> subprocess.Popen: with open(f"{runner_name}_{proc_id}.err", "w") as err: with open(f"{runner_name}_{proc_id}.out", "w") as out: _result = subprocess.Popen( @@ -47,24 +43,7 @@ def _execute_process( env=environment, ) - _status_code = _result.wait() - with open(f"{runner_name}_{proc_id}.err") as err: - std_err[proc_id] = err.read() - - with open(f"{runner_name}_{proc_id}.out") as out: - std_out[proc_id] = out.read() - - exit_status_dict[proc_id] = _status_code - - if run_on_exit: - run_on_exit( - status_code=exit_status_dict[proc_id], - std_out=std_out[proc_id], - std_err=std_err[proc_id], - ) - - if trigger: - trigger.set() + return _result class Executor: @@ -88,13 +67,14 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None: """ self._runner = simvue_runner self._keep_logs = keep_logs - self._manager = multiprocessing.Manager() - self._exit_codes = self._manager.dict() - self._std_err = self._manager.dict() - self._std_out = self._manager.dict() + self._completion_callbacks = {} + self._completion_triggers = {} + self._exit_codes = {} + self._std_err = {} + self._std_out = {} self._alert_ids: dict[str, str] = {} - self._command_str: typing.Dict[str, str] = {} - self._processes: typing.Dict[str, multiprocessing.Process] = {} + self._command_str: dict[str, str] = {} + self._processes: dict[str, subprocess.Popen] = {} def add_process( self, @@ -207,26 +187,16 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None: _command += _pos_args self._command_str[identifier] = " ".join(_command) + self._completion_callbacks[identifier] = completion_callback + self._completion_triggers[identifier] = completion_trigger - self._processes[identifier] = multiprocessing.Process( - target=_execute_process, - args=( - identifier, - _command, - self._runner.name, - self._exit_codes, - self._std_err, - self._std_out, - completion_callback, - completion_trigger, - env, - ), + self._processes[identifier] = _execute_process( + identifier, _command, self._runner.name, env ) + self._alert_ids[identifier] = self._runner.create_alert( name=f"{identifier}_exit_status", source="user" ) - logger.debug(f"Executing process: {' '.join(_command)}") - self._processes[identifier].start() @property def success(self) -> int: @@ -324,12 +294,22 @@ def kill_process(self, process_id: str) -> None: f"Failed to terminate process '{process_id}', no such identifier." ) return - _process.kill() + + _parent = psutil.Process(_process.pid) + + for child in _parent.children(recursive=True): + logger.debug(f"Terminating child process {child.pid}: {child.name()}") + child.kill() + + logger.debug(f"Terminating child process {_process.pid}: {_process.args}") + _process.terminate() + + self._execute_callback(process_id) def kill_all(self) -> None: """Kill all running processes""" - for process in self._processes.values(): - process.kill() + for process in self._processes.keys(): + self.kill_process(process) def _clear_cache_files(self) -> None: """Clear local log files if required""" @@ -338,11 +318,28 @@ def _clear_cache_files(self) -> None: os.remove(f"{self._runner.name}_{proc_id}.err") os.remove(f"{self._runner.name}_{proc_id}.out") + def _execute_callback(self, identifier: str) -> None: + with open(f"{self._runner.name}_{identifier}.err") as err: + std_err = err.read() + + with open(f"{self._runner.name}_{identifier}.out") as out: + std_out = out.read() + + if self._completion_callbacks[identifier]: + self._completion_callbacks[identifier]( + status_code=self._processes[identifier].returncode, + std_out=std_out, + std_err=std_err, + ) + if self._completion_triggers[identifier]: + self._completion_triggers[identifier].set() + def wait_for_completion(self) -> None: """Wait for all processes to finish then perform tidy up and upload""" - for process in self._processes.values(): - if process.is_alive(): - process.join() + for identifier, process in self._processes.items(): + process.wait() + self._execute_callback(identifier) + self._update_alerts() self._save_output() From 20d64669233bc8e7ca568f0afea1fe723d3003e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Mon, 10 Jun 2024 10:34:31 +0100 Subject: [PATCH 11/12] Update typing issues and fix tests for checking scenarios for abort --- simvue/api.py | 2 +- simvue/executor.py | 41 +++++++++++++++++++------------- simvue/run.py | 27 ++++++++++++++------- simvue/types.py | 1 + tests/refactor/test_run_class.py | 28 ++++++++++++++++++---- 5 files changed, 68 insertions(+), 31 deletions(-) diff --git a/simvue/api.py b/simvue/api.py index 626961df..022aaa95 100644 --- a/simvue/api.py +++ b/simvue/api.py @@ -44,7 +44,7 @@ def set_json_header(headers: dict[str, str]) -> dict[str, str]: reraise=True, ) def post( - url: str, headers: dict[str, str], data: dict[str, typing.Any], is_json: bool = True + url: str, headers: dict[str, str], data: typing.Any, is_json: bool = True ) -> requests.Response: """HTTP POST with retries diff --git a/simvue/executor.py b/simvue/executor.py index 776e293f..cd657659 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -27,6 +27,10 @@ logger = logging.getLogger(__name__) +class CompletionCallback(typing.Protocol): + def __call__(self, *, status_code: int, std_out: str, std_err: str) -> None: ... + + def _execute_process( proc_id: str, command: typing.List[str], @@ -67,11 +71,13 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None: """ self._runner = simvue_runner self._keep_logs = keep_logs - self._completion_callbacks = {} - self._completion_triggers = {} - self._exit_codes = {} - self._std_err = {} - self._std_out = {} + self._completion_callbacks: dict[str, typing.Optional[CompletionCallback]] = {} + self._completion_triggers: dict[ + str, typing.Optional[multiprocessing.synchronize.Event] + ] = {} + self._exit_codes: dict[str, int] = {} + self._std_err: dict[str, str] = {} + self._std_out: dict[str, str] = {} self._alert_ids: dict[str, str] = {} self._command_str: dict[str, str] = {} self._processes: dict[str, subprocess.Popen] = {} @@ -84,9 +90,7 @@ def add_process( script: typing.Optional[pathlib.Path] = None, input_file: typing.Optional[pathlib.Path] = None, env: typing.Optional[typing.Dict[str, str]] = None, - completion_callback: typing.Optional[ - typing.Callable[[int, str, str], None] - ] = None, + completion_callback: typing.Optional[CompletionCallback] = None, completion_trigger: typing.Optional[multiprocessing.synchronize.Event] = None, **kwargs, ) -> None: @@ -141,6 +145,9 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None: """ _pos_args = list(args) + if not self._runner.name: + raise RuntimeError("Cannot add process, expected Run instance to have name") + if sys.platform == "win32" and completion_callback: logger.warning( "Completion callback for 'add_process' may fail on Windows due to " @@ -289,20 +296,20 @@ def _save_output(self) -> None: def kill_process(self, process_id: str) -> None: """Kill a running process by ID""" - if not (_process := self._processes.get(process_id)): + if not (process := self._processes.get(process_id)): logger.error( f"Failed to terminate process '{process_id}', no such identifier." ) return - _parent = psutil.Process(_process.pid) + parent = psutil.Process(process.pid) - for child in _parent.children(recursive=True): + for child in parent.children(recursive=True): logger.debug(f"Terminating child process {child.pid}: {child.name()}") child.kill() - logger.debug(f"Terminating child process {_process.pid}: {_process.args}") - _process.terminate() + logger.debug(f"Terminating child process {process.pid}: {process.args}") + process.terminate() self._execute_callback(process_id) @@ -325,14 +332,14 @@ def _execute_callback(self, identifier: str) -> None: with open(f"{self._runner.name}_{identifier}.out") as out: std_out = out.read() - if self._completion_callbacks[identifier]: - self._completion_callbacks[identifier]( + if callback := self._completion_callbacks.get(identifier): + callback( status_code=self._processes[identifier].returncode, std_out=std_out, std_err=std_err, ) - if self._completion_triggers[identifier]: - self._completion_triggers[identifier].set() + if completion_trigger := self._completion_triggers.get(identifier): + completion_trigger.set() def wait_for_completion(self) -> None: """Wait for all processes to finish then perform tidy up and upload""" diff --git a/simvue/run.py b/simvue/run.py index cfa242fe..5d835c3b 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -10,6 +10,7 @@ import datetime import json import logging +import pathlib import mimetypes import multiprocessing.synchronize import threading @@ -50,6 +51,7 @@ validate_timestamp, ) + if typing.TYPE_CHECKING: from .factory.proxy import SimvueBaseClass from .factory.dispatch import DispatcherBaseClass @@ -101,6 +103,7 @@ def __init__( self._uuid: str = f"{uuid.uuid4()}" self._mode: typing.Literal["online", "offline", "disabled"] = mode self._name: typing.Optional[str] = None + self._testing: bool = False self._abort_on_alert: bool = True self._dispatch_mode: typing.Literal["direct", "queued"] = "queued" self._executor = Executor(self) @@ -246,7 +249,7 @@ def _get_sysinfo(self) -> dict[str, typing.Any]: def _create_heartbeat_callback( self, - ) -> typing.Callable[[str, dict, str, bool], None]: + ) -> typing.Callable[[threading.Event], None]: if ( self._mode == "online" and (not self._url or not self._id) ) or not self._heartbeat_termination_trigger: @@ -354,7 +357,7 @@ def _online_dispatch_callback( buffer: list[typing.Any], category: str, url: str = self._url, - run_id: str = self._id, + run_id: typing.Optional[str] = self._id, headers: dict[str, str] = self._headers, ) -> None: if not buffer: @@ -636,7 +639,7 @@ def add_process( self, identifier: str, *cmd_args, - executable: typing.Optional[typing.Union[str]] = None, + executable: typing.Optional[typing.Union[str, pathlib.Path]] = None, script: typing.Optional[pydantic.FilePath] = None, input_file: typing.Optional[pydantic.FilePath] = None, completion_callback: typing.Optional[ @@ -708,12 +711,20 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None: "due to function pickling restrictions for multiprocessing" ) + if isinstance(executable, pathlib.Path): + if not executable.is_file(): + raise FileNotFoundError( + f"Executable '{executable}' is not a valid file" + ) + + executable_str = f"{executable}" + _cmd_list: typing.List[str] = [] _pos_args = list(cmd_args) # Assemble the command for saving to metadata as string if executable: - _cmd_list += [executable] + _cmd_list += [executable_str] else: _cmd_list += [_pos_args[0]] executable = _pos_args[0] @@ -742,10 +753,10 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None: self._executor.add_process( identifier, *_pos_args, - executable=executable, + executable=executable_str, script=script, input_file=input_file, - completion_callback=completion_callback, + completion_callback=completion_callback, # type: ignore completion_trigger=completion_trigger, env=env, **cmd_kwargs, @@ -1368,14 +1379,14 @@ def set_status( if self._mode == "disabled": return True - if not self._active: + if not self._active or not self._name: self._error("Run is not active") return False data: dict[str, str] = {"name": self._name, "status": status} self._status = status - if self._simvue.update(data): + if self._simvue and self._simvue.update(data): return True return False diff --git a/simvue/types.py b/simvue/types.py index ec194d73..95b3c46c 100644 --- a/simvue/types.py +++ b/simvue/types.py @@ -5,6 +5,7 @@ except ImportError: from typing_extensions import TypeAlias + if typing.TYPE_CHECKING: from numpy import ndarray from pandas import DataFrame diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index dfa966fe..7b10d942 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -1,10 +1,13 @@ import pytest +import pytest_mock import time import typing import contextlib import inspect import tempfile +import threading import uuid +import psutil import pathlib import concurrent.futures import random @@ -461,22 +464,37 @@ def test_save_object( @pytest.mark.run -def test_abort_on_alert_process(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: +def test_abort_on_alert_process(create_plain_run: typing.Tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture) -> None: + def testing_exit(status: int) -> None: + raise SystemExit(status) + mocker.patch("os._exit", testing_exit) + N_PROCESSES: int = 3 run, _ = create_plain_run run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 - run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") + run._testing = True + run.add_process(identifier="forever_long", executable="bash", c="&".join(["sleep 10"] * N_PROCESSES)) + process_id = list(run._executor._processes.values())[0].pid + process = psutil.Process(process_id) + assert len(child_processes := process.children(recursive=True)) == 3 time.sleep(2) client = sv_cl.Client() client.abort_run(run._id, reason="testing abort") time.sleep(4) + for child in child_processes: + assert not child.is_running() if not run._status == "terminated": run.kill_all_processes() raise AssertionError("Run was not terminated") @pytest.mark.run -def test_abort_on_alert_python(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: +def test_abort_on_alert_python(create_plain_run: typing.Tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture) -> None: + abort_set = threading.Event() + def testing_exit(status: int) -> None: + abort_set.set() + raise SystemExit(status) + mocker.patch("os._exit", testing_exit) run, _ = create_plain_run run.config(resources_metrics_interval=1) run._heartbeat_interval = 1 @@ -488,10 +506,10 @@ def test_abort_on_alert_python(create_plain_run: typing.Tuple[sv_run.Run, dict]) if i == 4: client.abort_run(run._id, reason="testing abort") i += 1 - if i == 10: + if abort_set.is_set() or i > 9: break - assert i == 4 + assert i < 7 assert run._status == "terminated" From ef5f2fe6367b50028cda7edd2f915a7dc93bcfa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Zar=C4=99bski?= Date: Mon, 10 Jun 2024 10:47:58 +0100 Subject: [PATCH 12/12] Use sigkill to prevent zombies --- simvue/executor.py | 6 +++++- tests/refactor/test_run_class.py | 11 ++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/simvue/executor.py b/simvue/executor.py index cd657659..b9682454 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -308,8 +308,12 @@ def kill_process(self, process_id: str) -> None: logger.debug(f"Terminating child process {child.pid}: {child.name()}") child.kill() + for child in parent.children(recursive=True): + child.wait() + logger.debug(f"Terminating child process {process.pid}: {process.args}") - process.terminate() + process.kill() + process.wait() self._execute_callback(process_id) diff --git a/tests/refactor/test_run_class.py b/tests/refactor/test_run_class.py index 7b10d942..16fad8ad 100644 --- a/tests/refactor/test_run_class.py +++ b/tests/refactor/test_run_class.py @@ -517,6 +517,15 @@ def testing_exit(status: int) -> None: def test_kill_all_processes(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None: run, _ = create_plain_run run.config(resources_metrics_interval=1) - run.add_process(identifier="forever_long", executable="bash", c="sleep 10000") + run.add_process(identifier="forever_long_1", executable="bash", c="sleep 10000") + run.add_process(identifier="forever_long_2", executable="bash", c="sleep 10000") + processes = [ + psutil.Process(process.pid) + for process in run._executor._processes.values() + ] time.sleep(2) run.kill_all_processes() + time.sleep(4) + for process in processes: + assert not process.is_running() + assert all(not child.is_running() for child in process.children(recursive=True))