Skip to content

Commit b485651

Browse files
authored
Merge pull request #343 from simvue-io/hotfix/fix-join-infinite-loop
Fix infinite thread join bug and resource metrics disabling
2 parents 0d5b5fa + 0620196 commit b485651

File tree

3 files changed

+64
-48
lines changed

3 files changed

+64
-48
lines changed

simvue/run.py

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def __init__(self, mode: typing.Literal["online", "offline"] = "online") -> None
101101
self._simvue: typing.Optional[SimvueBaseClass] = None
102102
self._pid: typing.Optional[int] = 0
103103
self._shutdown_event: typing.Optional[threading.Event] = None
104+
self._configuration_lock = threading.Lock()
104105
self._heartbeat_termination_trigger: typing.Optional[threading.Event] = None
105106
self._storage_id: typing.Optional[str] = None
106107
self._heartbeat_thread: typing.Optional[threading.Thread] = None
@@ -218,27 +219,27 @@ def _create_heartbeat_callback(
218219
raise RuntimeError("Could not commence heartbeat, run not initialised")
219220

220221
def _heartbeat(
221-
url: typing.Optional[str] = self._url,
222-
headers: dict[str, str] = self._headers,
223-
run_id: typing.Optional[str] = self._id,
224-
online: bool = self._mode == "online",
225222
heartbeat_trigger: threading.Event = self._heartbeat_termination_trigger,
226223
) -> None:
227224
last_heartbeat = time.time()
228225
last_res_metric_call = time.time()
229226

230-
self._add_metrics_to_dispatch(self._get_sysinfo())
231-
232227
while not heartbeat_trigger.is_set():
233228
time.sleep(0.1)
234229

235-
if (
236-
self._resources_metrics_interval
237-
and (res_time := time.time()) - last_res_metric_call
238-
> self._resources_metrics_interval
239-
):
240-
self._add_metrics_to_dispatch(self._get_sysinfo())
241-
last_res_metric_call = res_time
230+
with self._configuration_lock:
231+
if (
232+
self._resources_metrics_interval
233+
and (res_time := time.time()) - last_res_metric_call
234+
> self._resources_metrics_interval
235+
):
236+
# Set join on fail to false as if an error is thrown
237+
# join would be called on this thread and a thread cannot
238+
# join itself!
239+
self._add_metrics_to_dispatch(
240+
self._get_sysinfo(), join_on_fail=False
241+
)
242+
last_res_metric_call = res_time
242243

243244
if time.time() - last_heartbeat < HEARTBEAT_INTERVAL:
244245
continue
@@ -377,20 +378,23 @@ def _start(self, reconnect: bool = False) -> bool:
377378
self._error(e.args[0])
378379
return False
379380

381+
self._active = True
382+
380383
self._dispatcher.start()
381384
self._heartbeat_thread.start()
382385

383-
self._active = True
384-
385386
return True
386387

387-
def _error(self, message: str) -> None:
388+
def _error(self, message: str, join_threads: bool = True) -> None:
388389
"""Raise an exception if necessary and log error
389390
390391
Parameters
391392
----------
392393
message : str
393394
message to display in exception or logger message
395+
join_threads : bool
396+
whether to join the threads on failure. This option exists to
397+
prevent join being called in nested thread calls to this function.
394398
395399
Raises
396400
------
@@ -400,7 +404,8 @@ def _error(self, message: str) -> None:
400404
# Stop heartbeat
401405
if self._heartbeat_termination_trigger and self._heartbeat_thread:
402406
self._heartbeat_termination_trigger.set()
403-
self._heartbeat_thread.join()
407+
if join_threads:
408+
self._heartbeat_thread.join()
404409

405410
# Finish stopping all threads
406411
if self._shutdown_event:
@@ -409,7 +414,8 @@ def _error(self, message: str) -> None:
409414
# Purge the queue as we can no longer send metrics
410415
if self._dispatcher and self._dispatcher.is_alive():
411416
self._dispatcher.purge()
412-
self._dispatcher.join()
417+
if join_threads:
418+
self._dispatcher.join()
413419

414420
if not self._suppress_errors:
415421
raise RuntimeError(message)
@@ -793,27 +799,28 @@ def config(
793799
_description_
794800
"""
795801

796-
if suppress_errors is not None:
797-
self._suppress_errors = suppress_errors
802+
with self._configuration_lock:
803+
if suppress_errors is not None:
804+
self._suppress_errors = suppress_errors
798805

799-
if queue_blocking is not None:
800-
self._queue_blocking = queue_blocking
806+
if queue_blocking is not None:
807+
self._queue_blocking = queue_blocking
801808

802-
if resources_metrics_interval and disable_resources_metrics:
803-
self._error(
804-
"Setting of resource metric interval and disabling resource metrics is ambiguous"
805-
)
806-
return False
809+
if resources_metrics_interval and disable_resources_metrics:
810+
self._error(
811+
"Setting of resource metric interval and disabling resource metrics is ambiguous"
812+
)
813+
return False
807814

808-
if disable_resources_metrics:
809-
self._pid = None
810-
self._resources_metrics_interval = None
815+
if disable_resources_metrics:
816+
self._pid = None
817+
self._resources_metrics_interval = None
811818

812-
if resources_metrics_interval:
813-
self._resources_metrics_interval = resources_metrics_interval
819+
if resources_metrics_interval:
820+
self._resources_metrics_interval = resources_metrics_interval
814821

815-
if storage_id:
816-
self._storage_id = storage_id
822+
if storage_id:
823+
self._storage_id = storage_id
817824

818825
return True
819826

@@ -900,6 +907,7 @@ def _add_metrics_to_dispatch(
900907
step: typing.Optional[int] = None,
901908
time: typing.Optional[int] = None,
902909
timestamp: typing.Optional[str] = None,
910+
join_on_fail: bool = True,
903911
) -> bool:
904912
if self._mode == "disabled":
905913
return True
@@ -909,19 +917,21 @@ def _add_metrics_to_dispatch(
909917
return True
910918

911919
if not self._simvue or not self._dispatcher:
912-
self._error("Cannot log metrics, run not initialised")
920+
self._error("Cannot log metrics, run not initialised", join_on_fail)
913921
return False
914922

915923
if not self._active:
916-
self._error("Run is not active")
924+
self._error("Run is not active", join_on_fail)
917925
return False
918926

919927
if self._status != "running":
920-
self._error("Cannot log metrics when not in the running state")
928+
self._error(
929+
"Cannot log metrics when not in the running state", join_on_fail
930+
)
921931
return False
922932

923933
if timestamp and not validate_timestamp(timestamp):
924-
self._error("Invalid timestamp format")
934+
self._error("Invalid timestamp format", join_on_fail)
925935
return False
926936

927937
_data: dict[str, typing.Any] = {

tests/refactor/test_run_class.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ def test_log_metrics(
3838
name=f"test_run_{str(uuid.uuid4()).split('-', 1)[0]}",
3939
tags=["simvue_client_unit_tests"],
4040
folder="/simvue_unit_testing",
41-
ttl=60 * 60,
42-
visibility=visibility
41+
retention_period="1 hour",
42+
visibility=visibility,
43+
resources_metrics_interval=1
4344
)
4445
return
4546

@@ -48,6 +49,7 @@ def test_log_metrics(
4849
tags=["simvue_client_unit_tests"],
4950
folder="/simvue_unit_testing",
5051
visibility=visibility,
52+
resources_metrics_interval=1,
5153
retention_period="1 hour",
5254
)
5355

tests/refactor/test_scenarios.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
@pytest.mark.scenario
1111
def test_time_multi_run_create_threshold() -> None:
1212
start = time.time()
13-
for i in range(20):
14-
with simvue.Run() as run:
15-
run.init(
16-
f"test run {i}",
17-
tags=["test_benchmarking"],
18-
folder="/simvue_benchmark_testing",
19-
retention_period="1 hour"
20-
)
13+
runs: list[simvue.Run] = []
14+
for i in range(10):
15+
run = simvue.Run()
16+
run.init(
17+
f"test run {i}",
18+
tags=["test_benchmarking"],
19+
folder="/simvue_benchmark_testing",
20+
retention_period="1 hour"
21+
)
22+
runs.append(run)
23+
for run in runs:
24+
run.close()
2125
end = time.time()
2226
client = simvue.Client()
2327
with contextlib.suppress(RuntimeError):

0 commit comments

Comments
 (0)