Skip to content

Commit a9d8a7e

Browse files
committed
Merge branch '782-the-executor-doesnt-work-when-reconnect-is-used' of github.com:simvue-io/python-api into 782-the-executor-doesnt-work-when-reconnect-is-used
2 parents 7eb6b25 + b5bcaf1 commit a9d8a7e

File tree

4 files changed

+81
-44
lines changed

4 files changed

+81
-44
lines changed

simvue/executor.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import multiprocessing
1717
import threading
1818
import os
19+
import shutil
1920
import psutil
2021
import subprocess
2122
import contextlib
@@ -205,6 +206,19 @@ def callback_function(status_code: int, std_out: str, std_err: str) -> None:
205206
"due to function pickling restrictions"
206207
)
207208

209+
# To check the executable provided by the user exists combine with environment
210+
# PATH variable if exists, if not provided use the current environment
211+
_session_path: str | None = (os.environ.copy() | (env or {})).get("PATH", None)
212+
213+
if (
214+
executable
215+
and not pathlib.Path(executable).exists()
216+
and not shutil.which(executable, path=_session_path)
217+
):
218+
raise FileNotFoundError(
219+
f"Executable '{executable}' does not exist, please check the path/environment."
220+
)
221+
208222
if script:
209223
self._runner.save_file(file_path=script, category="code")
210224

@@ -377,7 +391,7 @@ def _update_alerts(self) -> None:
377391
# This is so that if a process incorrectly reports its return code,
378392
# the user can manually set the correct status depending on logs etc.
379393
_alert = UserAlert(identifier=self._alert_ids[proc_id])
380-
_is_set = _alert.get_status(run_id=self._runner._id)
394+
_is_set = _alert.get_status(run_id=self._runner.id)
381395

382396
if process.returncode != 0:
383397
# If the process fails then purge the dispatcher event queue
@@ -404,6 +418,10 @@ def _update_alerts(self) -> None:
404418

405419
def _save_output(self) -> None:
406420
"""Save the output to Simvue"""
421+
if self._runner.status != "running":
422+
logger.debug("Run is not active, skipping output save.")
423+
return
424+
407425
for proc_id in self._processes.keys():
408426
# Only save the file if the contents are not empty
409427
if self.std_err(proc_id):

simvue/run.py

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ def __init__(
146146
```
147147
"""
148148
self._uuid: str = f"{uuid.uuid4()}"
149-
self._name: str | None = None
150149

151150
# monitor duration with respect to retention period
152151
self._timer: float = 0
@@ -160,7 +159,6 @@ def __init__(
160159
self._executor = Executor(self)
161160
self._dispatcher: DispatcherBaseClass | None = None
162161

163-
self._id: str | None = None
164162
self._folder: Folder | None = None
165163
self._term_color: bool = True
166164
self._suppress_errors: bool = False
@@ -259,7 +257,7 @@ def __exit__(
259257
) -> None:
260258
logger.debug(
261259
"Automatically closing run '%s' in status %s",
262-
self._id if self._user_config.run.mode == "online" else "unregistered",
260+
self.id if self._user_config.run.mode == "online" else "unregistered",
263261
self._status,
264262
)
265263

@@ -364,24 +362,25 @@ def _get_internal_metrics(
364362
# Set join on fail to false as if an error is thrown
365363
# join would be called on this thread and a thread cannot
366364
# join itself!
367-
self._add_metrics_to_dispatch(
368-
_current_system_measure.to_dict(),
369-
join_on_fail=False,
370-
step=system_metrics_step,
371-
)
365+
if self.status == "running":
366+
self._add_metrics_to_dispatch(
367+
_current_system_measure.to_dict(),
368+
join_on_fail=False,
369+
step=system_metrics_step,
370+
)
372371

373372
# For the first emissions metrics reading, the time interval to use
374373
# Is the time since the run started, otherwise just use the time between readings
375374
if self._emissions_monitor:
376375
_estimated = self._emissions_monitor.estimate_co2_emissions(
377-
process_id=f"{self._name}",
376+
process_id=f"{self._sv_obj.name}",
378377
cpu_percent=_current_system_measure.cpu_percent,
379378
measure_interval=(time.time() - self._start_time)
380379
if system_metrics_step == 0
381380
else self._system_metrics_interval,
382381
gpu_percent=_current_system_measure.gpu_percent,
383382
)
384-
if _estimated:
383+
if _estimated and self.status == "running":
385384
self._add_metrics_to_dispatch(
386385
self._emissions_monitor.simvue_metrics(),
387386
join_on_fail=False,
@@ -394,7 +393,7 @@ def _create_heartbeat_callback(
394393
"""Defines the callback executed at the heartbeat interval for the Run."""
395394
if (
396395
self._user_config.run.mode == "online"
397-
and (not self._user_config.server.url or not self._id)
396+
and (not self._user_config.server.url or not self.id)
398397
) or not self._heartbeat_termination_trigger:
399398
raise RuntimeError("Could not commence heartbeat, run not initialised")
400399

@@ -459,7 +458,7 @@ def _create_dispatch_callback(
459458
executed on metrics and events objects held in a buffer.
460459
"""
461460

462-
if self._user_config.run.mode == "online" and not self._id:
461+
if self._user_config.run.mode == "online" and not self.id:
463462
raise RuntimeError("Expected identifier for run")
464463

465464
if (
@@ -590,7 +589,6 @@ def _error(self, message: str, join_threads: bool = True) -> None:
590589
# Simvue support now terminated as the instance of Run has entered
591590
# the dormant state due to exception throw so set listing to be 'lost'
592591
if self._status == "running" and self._sv_obj:
593-
self._sv_obj.name = self._name
594592
self._sv_obj.status = "lost"
595593
self._sv_obj.commit()
596594

@@ -701,8 +699,6 @@ def init(
701699
elif not name and self._user_config.run.mode == "offline":
702700
name = randomname.get_name()
703701

704-
self._name = name
705-
706702
self._status = "running" if running else "created"
707703

708704
# Parse the time to live/retention time if specified
@@ -750,28 +746,20 @@ def init(
750746
self._data = self._sv_obj._staging
751747
self._sv_obj.commit()
752748

753-
if self._user_config.run.mode == "online":
754-
name = self._sv_obj.name
755-
756-
self._id = self._sv_obj.id
757-
758-
if not name:
749+
if not self.name:
759750
return False
760751

761-
elif name is not True:
762-
self._name = name
763-
764752
if self._status == "running":
765753
self._start()
766754

767755
if self._user_config.run.mode == "online":
768756
click.secho(
769-
f"[simvue] Run {self._name} created",
757+
f"[simvue] Run {self._sv_obj.name} created",
770758
bold=self._term_color,
771759
fg="green" if self._term_color else None,
772760
)
773761
click.secho(
774-
f"[simvue] Monitor in the UI at {self._user_config.server.url.rsplit('/api', 1)[0]}/dashboard/runs/run/{self._id}",
762+
f"[simvue] Monitor in the UI at {self._user_config.server.url.rsplit('/api', 1)[0]}/dashboard/runs/run/{self.id}",
775763
bold=self._term_color,
776764
fg="green" if self._term_color else None,
777765
)
@@ -951,7 +939,23 @@ def executor(self) -> Executor:
951939
@property
952940
def name(self) -> str | None:
953941
"""Return the name of the run"""
954-
return self._name
942+
if not self._sv_obj:
943+
raise RuntimeError("Run has not been initialised")
944+
return self._sv_obj.name
945+
946+
@property
947+
def status(
948+
self,
949+
) -> (
950+
typing.Literal[
951+
"created", "running", "completed", "failed", "terminated", "lost"
952+
]
953+
| None
954+
):
955+
"""Return the status of the run"""
956+
if not self._sv_obj:
957+
raise RuntimeError("Run has not been initialised")
958+
return self._sv_obj.status
955959

956960
@property
957961
def uid(self) -> str:
@@ -961,7 +965,9 @@ def uid(self) -> str:
961965
@property
962966
def id(self) -> str | None:
963967
"""Return the unique id of the run"""
964-
return self._id
968+
if not self._sv_obj:
969+
raise RuntimeError("Run has not been initialised")
970+
return self._sv_obj.id
965971

966972
@skip_if_failed("_aborted", "_suppress_errors", False)
967973
@pydantic.validate_call
@@ -981,7 +987,7 @@ def reconnect(self, run_id: str) -> bool:
981987
self._status = "running"
982988

983989
self._id = run_id
984-
self._sv_obj = RunObject(identifier=self._id, _read_only=False)
990+
self._sv_obj = RunObject(identifier=run_id, _read_only=False)
985991
self._name = self._sv_obj.name
986992
self._sv_obj.status = self._status
987993
self._sv_obj.system = get_system()
@@ -1614,7 +1620,7 @@ def _tidy_run(self) -> None:
16141620
and self._status != "created"
16151621
):
16161622
self._user_config.offline.cache.joinpath(
1617-
"runs", f"{self._id}.closed"
1623+
"runs", f"{self.id}.closed"
16181624
).touch()
16191625

16201626
if _non_zero := self.executor.exit_status:
@@ -2088,7 +2094,7 @@ def log_alert(
20882094
)
20892095
return False
20902096
_alert.read_only(False)
2091-
_alert.set_status(run_id=self._id, status=state)
2097+
_alert.set_status(run_id=self.id, status=state)
20922098
_alert.commit()
20932099

20942100
return True

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur
279279
if create_objects:
280280
TEST_DATA["metrics"] = ("metric_counter", "metric_val")
281281

282-
TEST_DATA["run_id"] = run._id
283-
TEST_DATA["run_name"] = run._name
282+
TEST_DATA["run_id"] = run.id
283+
TEST_DATA["run_name"] = run.name
284284
TEST_DATA["url"] = run._user_config.server.url
285285
TEST_DATA["headers"] = run._headers
286286
TEST_DATA["pid"] = run._pid

tests/functional/test_run_class.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:
674674
ids=[f"scenario_{i}" for i in range(1, 6)],
675675
)
676676
def test_save_file_online(
677-
create_plain_run: typing.Tuple[sv_run.Run, dict],
677+
create_plain_run: tuple[sv_run.Run, dict],
678678
valid_mimetype: bool,
679679
preserve_path: bool,
680680
name: str | None,
@@ -743,7 +743,7 @@ def test_save_file_online(
743743
ids=[f"scenario_{i}" for i in range(1, 6)],
744744
)
745745
def test_save_file_offline(
746-
create_plain_run_offline: typing.Tuple[sv_run.Run, dict],
746+
create_plain_run_offline: tuple[sv_run.Run, dict],
747747
preserve_path: bool,
748748
name: str | None,
749749
allow_pickle: bool,
@@ -798,7 +798,7 @@ def test_save_file_offline(
798798

799799
@pytest.mark.run
800800
def test_update_tags_running(
801-
create_plain_run: typing.Tuple[sv_run.Run, dict],
801+
create_plain_run: tuple[sv_run.Run, dict],
802802
request: pytest.FixtureRequest,
803803
) -> None:
804804
simvue_run, _ = create_plain_run
@@ -824,7 +824,7 @@ def test_update_tags_running(
824824

825825
@pytest.mark.run
826826
def test_update_tags_created(
827-
create_pending_run: typing.Tuple[sv_run.Run, dict],
827+
create_pending_run: tuple[sv_run.Run, dict],
828828
request: pytest.FixtureRequest,
829829
) -> None:
830830
simvue_run, _ = create_pending_run
@@ -851,7 +851,7 @@ def test_update_tags_created(
851851
@pytest.mark.offline
852852
@pytest.mark.run
853853
def test_update_tags_offline(
854-
create_plain_run_offline: typing.Tuple[sv_run.Run, dict],
854+
create_plain_run_offline: tuple[sv_run.Run, dict],
855855
) -> None:
856856
simvue_run, _ = create_plain_run_offline
857857
run_name = simvue_run._name
@@ -879,7 +879,7 @@ def test_update_tags_offline(
879879
@pytest.mark.run
880880
@pytest.mark.parametrize("object_type", ("DataFrame", "ndarray"))
881881
def test_save_object(
882-
create_plain_run: typing.Tuple[sv_run.Run, dict], object_type: str
882+
create_plain_run: tuple[sv_run.Run, dict], object_type: str
883883
) -> None:
884884
simvue_run, _ = create_plain_run
885885

@@ -1081,7 +1081,7 @@ def abort_callback(abort_run=trigger) -> None:
10811081

10821082
@pytest.mark.run
10831083
def test_abort_on_alert_python(
1084-
speedy_heartbeat, create_plain_run: typing.Tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture
1084+
speedy_heartbeat, create_plain_run: tuple[sv_run.Run, dict], mocker: pytest_mock.MockerFixture
10851085
) -> None:
10861086
timeout: int = 20
10871087
interval: int = 0
@@ -1094,7 +1094,7 @@ def test_abort_on_alert_python(
10941094

10951095
@pytest.mark.run
10961096
def test_abort_on_alert_raise(
1097-
create_plain_run: typing.Tuple[sv_run.Run, dict]
1097+
create_plain_run: tuple[sv_run.Run, dict]
10981098
) -> None:
10991099

11001100
run, _ = create_plain_run
@@ -1119,7 +1119,7 @@ def test_abort_on_alert_raise(
11191119

11201120

11211121
@pytest.mark.run
1122-
def test_kill_all_processes(create_plain_run: typing.Tuple[sv_run.Run, dict]) -> None:
1122+
def test_kill_all_processes(create_plain_run: tuple[sv_run.Run, dict]) -> None:
11231123
run, _ = create_plain_run
11241124
run.config(system_metrics_interval=1)
11251125
run.add_process(identifier="forever_long_1", executable="bash", c="sleep 10000")
@@ -1150,7 +1150,7 @@ def test_run_created_with_no_timeout() -> None:
11501150

11511151
@pytest.mark.parametrize("mode", ("online", "offline"), ids=("online", "offline"))
11521152
@pytest.mark.run
1153-
def test_reconnect(mode, monkeypatch: pytest.MonkeyPatch) -> None:
1153+
def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None:
11541154
temp_d: tempfile.TemporaryDirectory | None = None
11551155

11561156
if mode == "offline":
@@ -1191,3 +1191,16 @@ def test_reconnect(mode, monkeypatch: pytest.MonkeyPatch) -> None:
11911191
if temp_d:
11921192
temp_d.cleanup()
11931193

1194+
1195+
def test_reconnect_with_process(create_plain_run: tuple[sv_run.Run, dict]) -> None:
1196+
run, _ = create_plain_run
1197+
run.init(name="test_reconnect_with_process", folder="/simvue_unit_testing", retention_period="2 minutes", running=False)
1198+
run.close()
1199+
1200+
with sv_run.Run() as new_run:
1201+
new_run.reconnect(run.id)
1202+
run.add_process(
1203+
identifier="test_process",
1204+
executable="bash",
1205+
c="echo 'Hello World!'",
1206+
)

0 commit comments

Comments
 (0)