Skip to content

Commit b5bcaf1

Browse files
committed
🐛 Fix addition of process to reconnected run
Uses underlying low level API run object to retrieve name and ID for reconnect. Also ensures run is "running" when attempting to log system metrics or save files."
1 parent d6ce062 commit b5bcaf1

File tree

4 files changed

+81
-45
lines changed

4 files changed

+81
-45
lines changed

simvue/executor.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import multiprocessing
1717
import threading
1818
import os
19+
import shutil
1920
import psutil
2021
import subprocess
2122
import contextlib
@@ -205,6 +206,19 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
205206
"due to function pickling restrictions"
206207
)
207208

209+
# To check the executable provided by the user exists combine with environment
210+
# PATH variable if exists, if not provided use the current environment
211+
_session_path: str | None = (os.environ.copy() | (env or {})).get("PATH", None)
212+
213+
if (
214+
executable
215+
and not pathlib.Path(executable).exists()
216+
and not shutil.which(executable, path=_session_path)
217+
):
218+
raise FileNotFoundError(
219+
f"Executable '{executable}' does not exist, please check the path/environment."
220+
)
221+
208222
if script:
209223
self._runner.save_file(file_path=script, category="code")
210224

@@ -377,7 +391,7 @@ def _update_alerts(self) -> None:
377391
# This is so that if a process incorrectly reports its return code,
378392
# the user can manually set the correct status depending on logs etc.
379393
_alert = UserAlert(identifier=self._alert_ids[proc_id])
380-
_is_set = _alert.get_status(run_id=self._runner._id)
394+
_is_set = _alert.get_status(run_id=self._runner.id)
381395

382396
if process.returncode != 0:
383397
# If the process fails then purge the dispatcher event queue
@@ -404,6 +418,10 @@ def _update_alerts(self) -> None:
404418

405419
def _save_output(self) -> None:
406420
"""Save the output to Simvue"""
421+
if self._runner.status != "running":
422+
logger.debug("Run is not active, skipping output save.")
423+
return
424+
407425
for proc_id in self._processes.keys():
408426
# Only save the file if the contents are not empty
409427
if self.std_err(proc_id):

simvue/run.py

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ def __init__(
147147
```
148148
"""
149149
self._uuid: str = f"{uuid.uuid4()}"
150-
self._name: str | None = None
151150

152151
# monitor duration with respect to retention period
153152
self._timer: float = 0
@@ -161,7 +160,6 @@ def __init__(
161160
self._executor = Executor(self)
162161
self._dispatcher: DispatcherBaseClass | None = None
163162

164-
self._id: str | None = None
165163
self._folder: Folder | None = None
166164
self._term_color: bool = True
167165
self._suppress_errors: bool = False
@@ -260,7 +258,7 @@ def __exit__(
260258
) -> None:
261259
logger.debug(
262260
"Automatically closing run '%s' in status %s",
263-
self._id if self._user_config.run.mode == "online" else "unregistered",
261+
self.id if self._user_config.run.mode == "online" else "unregistered",
264262
self._status,
265263
)
266264

@@ -365,24 +363,25 @@ def _get_internal_metrics(
365363
# Set join on fail to false as if an error is thrown
366364
# join would be called on this thread and a thread cannot
367365
# join itself!
368-
self._add_metrics_to_dispatch(
369-
_current_system_measure.to_dict(),
370-
join_on_fail=False,
371-
step=system_metrics_step,
372-
)
366+
if self.status == "running":
367+
self._add_metrics_to_dispatch(
368+
_current_system_measure.to_dict(),
369+
join_on_fail=False,
370+
step=system_metrics_step,
371+
)
373372

374373
# For the first emissions metrics reading, the time interval to use
375374
# Is the time since the run started, otherwise just use the time between readings
376375
if self._emissions_monitor:
377376
_estimated = self._emissions_monitor.estimate_co2_emissions(
378-
process_id=f"{self._name}",
377+
process_id=f"{self._sv_obj.name}",
379378
cpu_percent=_current_system_measure.cpu_percent,
380379
measure_interval=(time.time() - self._start_time)
381380
if system_metrics_step == 0
382381
else self._system_metrics_interval,
383382
gpu_percent=_current_system_measure.gpu_percent,
384383
)
385-
if _estimated:
384+
if _estimated and self.status == "running":
386385
self._add_metrics_to_dispatch(
387386
self._emissions_monitor.simvue_metrics(),
388387
join_on_fail=False,
@@ -395,7 +394,7 @@ def _create_heartbeat_callback(
395394
"""Defines the callback executed at the heartbeat interval for the Run."""
396395
if (
397396
self._user_config.run.mode == "online"
398-
and (not self._user_config.server.url or not self._id)
397+
and (not self._user_config.server.url or not self.id)
399398
) or not self._heartbeat_termination_trigger:
400399
raise RuntimeError("Could not commence heartbeat, run not initialised")
401400

@@ -460,7 +459,7 @@ def _create_dispatch_callback(
460459
executed on metrics and events objects held in a buffer.
461460
"""
462461

463-
if self._user_config.run.mode == "online" and not self._id:
462+
if self._user_config.run.mode == "online" and not self.id:
464463
raise RuntimeError("Expected identifier for run")
465464

466465
if (
@@ -591,7 +590,6 @@ def _error(self, message: str, join_threads: bool = True) -> None:
591590
# Simvue support now terminated as the instance of Run has entered
592591
# the dormant state due to exception throw so set listing to be 'lost'
593592
if self._status == "running" and self._sv_obj:
594-
self._sv_obj.name = self._name
595593
self._sv_obj.status = "lost"
596594
self._sv_obj.commit()
597595

@@ -702,8 +700,6 @@ def init(
702700
elif not name and self._user_config.run.mode == "offline":
703701
name = randomname.get_name()
704702

705-
self._name = name
706-
707703
self._status = "running" if running else "created"
708704

709705
# Parse the time to live/retention time if specified
@@ -751,28 +747,20 @@ def init(
751747
self._data = self._sv_obj._staging
752748
self._sv_obj.commit()
753749

754-
if self._user_config.run.mode == "online":
755-
name = self._sv_obj.name
756-
757-
self._id = self._sv_obj.id
758-
759-
if not name:
750+
if not self.name:
760751
return False
761752

762-
elif name is not True:
763-
self._name = name
764-
765753
if self._status == "running":
766754
self._start()
767755

768756
if self._user_config.run.mode == "online":
769757
click.secho(
770-
f"[simvue] Run {self._name} created",
758+
f"[simvue] Run {self._sv_obj.name} created",
771759
bold=self._term_color,
772760
fg="green" if self._term_color else None,
773761
)
774762
click.secho(
775-
f"[simvue] Monitor in the UI at {self._user_config.server.url.rsplit('/api', 1)[0]}/dashboard/runs/run/{self._id}",
763+
f"[simvue] Monitor in the UI at {self._user_config.server.url.rsplit('/api', 1)[0]}/dashboard/runs/run/{self.id}",
776764
bold=self._term_color,
777765
fg="green" if self._term_color else None,
778766
)
@@ -952,7 +940,23 @@ def executor(self) -> Executor:
952940
@property
953941
def name(self) -> str | None:
954942
"""Return the name of the run"""
955-
return self._name
943+
if not self._sv_obj:
944+
raise RuntimeError("Run has not been initialised")
945+
return self._sv_obj.name
946+
947+
@property
948+
def status(
949+
self,
950+
) -> (
951+
typing.Literal[
952+
"created", "running", "completed", "failed", "terminated", "lost"
953+
]
954+
| None
955+
):
956+
"""Return the status of the run"""
957+
if not self._sv_obj:
958+
raise RuntimeError("Run has not been initialised")
959+
return self._sv_obj.status
956960

957961
@property
958962
def uid(self) -> str:
@@ -962,7 +966,9 @@ def uid(self) -> str:
962966
@property
963967
def id(self) -> str | None:
964968
"""Return the unique id of the run"""
965-
return self._id
969+
if not self._sv_obj:
970+
raise RuntimeError("Run has not been initialised")
971+
return self._sv_obj.id
966972

967973
@skip_if_failed("_aborted", "_suppress_errors", False)
968974
@pydantic.validate_call
@@ -981,8 +987,7 @@ def reconnect(self, run_id: str) -> bool:
981987
"""
982988
self._status = "running"
983989

984-
self._id = run_id
985-
self._sv_obj = RunObject(identifier=self._id, _read_only=False)
990+
self._sv_obj = RunObject(identifier=run_id, _read_only=False)
986991
self._sv_obj.system = get_system()
987992
self._start()
988993

@@ -1612,7 +1617,7 @@ def _tidy_run(self) -> None:
16121617
and self._status != "created"
16131618
):
16141619
self._user_config.offline.cache.joinpath(
1615-
"runs", f"{self._id}.closed"
1620+
"runs", f"{self.id}.closed"
16161621
).touch()
16171622

16181623
if _non_zero := self.executor.exit_status:
@@ -2086,7 +2091,7 @@ def log_alert(
20862091
)
20872092
return False
20882093
_alert.read_only(False)
2089-
_alert.set_status(run_id=self._id, status=state)
2094+
_alert.set_status(run_id=self.id, status=state)
20902095
_alert.commit()
20912096

20922097
return True

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur
274274
if create_objects:
275275
TEST_DATA["metrics"] = ("metric_counter", "metric_val")
276276

277-
TEST_DATA["run_id"] = run._id
278-
TEST_DATA["run_name"] = run._name
277+
TEST_DATA["run_id"] = run.id
278+
TEST_DATA["run_name"] = run.name
279279
TEST_DATA["url"] = run._user_config.server.url
280280
TEST_DATA["headers"] = run._headers
281281
TEST_DATA["pid"] = run._pid

tests/functional/test_run_class.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:
672672
ids=[f"scenario_{i}" for i in range(1, 6)],
673673
)
674674
def test_save_file_online(
675-
create_plain_run: typing.Tuple[sv_run.Run, dict],
675+
create_plain_run: tuple[sv_run.Run, dict],
676676
valid_mimetype: bool,
677677
preserve_path: bool,
678678
name: str | None,
@@ -741,7 +741,7 @@ def test_save_file_online(
741741
ids=[f"scenario_{i}" for i in range(1, 6)],
742742
)
743743
def test_save_file_offline(
744-
create_plain_run_offline: typing.Tuple[sv_run.Run, dict],
744+
create_plain_run_offline: tuple[sv_run.Run, dict],
745745
preserve_path: bool,
746746
name: str | None,
747747
allow_pickle: bool,
@@ -796,7 +796,7 @@ def test_save_file_offline(
796796

797797
@pytest.mark.run
798798
def test_update_tags_running(
799-
create_plain_run: typing.Tuple[sv_run.Run, dict],
799+
create_plain_run: tuple[sv_run.Run, dict],
800800
request: pytest.FixtureRequest,
801801
) -> None:
802802
simvue_run, _ = create_plain_run
@@ -822,7 +822,7 @@ def test_update_tags_running(
822822

823823
@pytest.mark.run
824824
def test_update_tags_created(
825-
create_pending_run: typing.Tuple[sv_run.Run, dict],
825+
create_pending_run: tuple[sv_run.Run, dict],
826826
request: pytest.FixtureRequest,
827827
) -> None:
828828
simvue_run, _ = create_pending_run
@@ -849,7 +849,7 @@ def test_update_tags_created(
849849
@pytest.mark.offline
850850
@pytest.mark.run
851851
def test_update_tags_offline(
852-
create_plain_run_offline: typing.Tuple[sv_run.Run, dict],
852+
create_plain_run_offline: tuple[sv_run.Run, dict],
853853
) -> None:
854854
simvue_run, _ = create_plain_run_offline
855855
run_name = simvue_run._name
@@ -877,7 +877,7 @@ def test_update_tags_offline(
877877
@pytest.mark.run
878878
@pytest.mark.parametrize("object_type", ("DataFrame", "ndarray"))
879879
def test_save_object(
880-
create_plain_run: typing.Tuple[sv_run.Run, dict], object_type: str
880+
create_plain_run: tuple[sv_run.Run, dict], object_type: str
881881
) -> None:
882882
simvue_run, _ = create_plain_run
883883

@@ -1079,7 +1079,7 @@ def abort_callback(abort_run=trigger) -> None:
10791079

10801080
@pytest.mark.run
10811081
def test_abort_on_alert_python(
1082-
speedy_heartbeat, create_plain_run: typing.Tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture
1082+
speedy_heartbeat, create_plain_run: tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture
10831083
) -> None:
10841084
timeout: int = 20
10851085
interval: int = 0
@@ -1092,7 +1092,7 @@ def test_abort_on_alert_python(
10921092

10931093
@pytest.mark.run
10941094
def test_abort_on_alert_raise(
1095-
create_plain_run: typing.Tuple[sv_run.Run, dict]
1095+
create_plain_run: tuple[sv_run.Run, dict]
10961096
) -> None:
10971097

10981098
run, _ = create_plain_run
@@ -1117,7 +1117,7 @@ def test_abort_on_alert_raise(
11171117

11181118

11191119
@pytest.mark.run
1120-
def test_kill_all_processes(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None:
1120+
def test_kill_all_processes(create_plain_run: tuple[sv_run.Run, dict]) -> None:
11211121
run, _ = create_plain_run
11221122
run.config(system_metrics_interval=1)
11231123
run.add_process(identifier="forever_long_1", executable="bash", c="sleep 10000")
@@ -1148,7 +1148,7 @@ def test_run_created_with_no_timeout() -> None:
11481148

11491149
@pytest.mark.parametrize("mode", ("online", "offline"), ids=("online", "offline"))
11501150
@pytest.mark.run
1151-
def test_reconnect(mode, monkeypatch: pytest.MonkeyPatch) -> None:
1151+
def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
11521152
temp_d: tempfile.TemporaryDirectory | None = None
11531153

11541154
if mode == "offline":
@@ -1188,3 +1188,16 @@ def test_reconnect(mode, monkeypatch: pytest.MonkeyPatch) -> None:
11881188
if temp_d:
11891189
temp_d.cleanup()
11901190

1191+
1192+
def test_reconnect_with_process(create_plain_run: tuple[sv_run.Run, dict]) -> None:
1193+
run, _ = create_plain_run
1194+
run.init(name="test_reconnect_with_process", folder="/simvue_unit_testing", retention_period="2 minutes", running=False)
1195+
run.close()
1196+
1197+
with sv_run.Run() as new_run:
1198+
new_run.reconnect(run.id)
1199+
run.add_process(
1200+
identifier="test_process",
1201+
executable="bash",
1202+
c="echo 'Hello World!'",
1203+
)

0 commit comments

Comments
 (0)