Skip to content

Commit ebeedec

Browse files
committed
🐛 Set all threads to be daemon=True
Addresses the issue of tests hanging because threads were still running; this includes the QueuedDispatcher subclass.
1 parent 7b9aed5 commit ebeedec

File tree

10 files changed

+133
-51
lines changed

10 files changed

+133
-51
lines changed

simvue/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,9 @@ def get_artifacts_as_files(
642642
Artifact.from_run(run_id=run_id, category=category)
643643
)
644644

645-
with ThreadPoolExecutor(CONCURRENT_DOWNLOADS) as executor:
645+
with ThreadPoolExecutor(
646+
CONCURRENT_DOWNLOADS, thread_name_prefix=f"get_artifacts_run_{run_id}"
647+
) as executor:
646648
futures = [
647649
executor.submit(_download_artifact_to_file, artifact, output_dir)
648650
for _, artifact in _artifacts

simvue/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def trigger_check(
8181
target=trigger_check,
8282
args=(completion_callback, completion_trigger, _result),
8383
daemon=True,
84+
name=f"{proc_id}_Thread",
8485
)
8586
thread_out.start()
8687

simvue/factory/dispatch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def Dispatcher(
2323
callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None],
2424
object_types: list[str],
2525
termination_trigger: "Event",
26+
name: str | None = None,
2627
**kwargs,
2728
) -> "DispatcherBaseClass":
2829
"""Returns instance of dispatcher based on configuration
@@ -43,6 +44,8 @@ def Dispatcher(
4344
categories, this is mainly used for creation of queues in a QueueDispatcher
4445
termination_trigger : Event
4546
event which triggers termination of the dispatcher
47+
name : str | None, optional
48+
name for the underlying thread, default None
4649
4750
Returns
4851
-------
@@ -63,5 +66,6 @@ def Dispatcher(
6366
callback=callback,
6467
object_types=object_types,
6568
termination_trigger=termination_trigger,
69+
name=name,
6670
**kwargs,
6771
)

simvue/factory/dispatch/queued.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(
3636
callback: typing.Callable[[list[typing.Any], str], None],
3737
object_types: list[str],
3838
termination_trigger: threading.Event,
39+
name: str | None = None,
3940
max_buffer_size: int = MAX_BUFFER_SIZE,
4041
max_read_rate: float = MAX_REQUESTS_PER_SECOND,
4142
) -> None:
@@ -51,6 +52,8 @@ def __init__(
5152
termination_trigger : threading.Event
5253
a threading event which when set declares that the dispatcher
5354
should terminate
55+
name : str | None, optional
56+
name for underlying thread, default None
5457
max_buffer_size : int
5558
maximum number of items allowed in created buffer.
5659
max_read_rate : float
@@ -62,7 +65,7 @@ def __init__(
6265
object_types=object_types,
6366
termination_trigger=termination_trigger,
6467
)
65-
super().__init__()
68+
super().__init__(name=name, daemon=True)
6669

6770
self._termination_trigger = termination_trigger
6871
self._callback = callback

simvue/run.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,9 @@ def _start(self) -> bool:
539539
)
540540

541541
self._heartbeat_thread = threading.Thread(
542-
target=self._create_heartbeat_callback(), daemon=True
542+
target=self._create_heartbeat_callback(),
543+
daemon=True,
544+
name=f"{self.id}_heartbeat",
543545
)
544546

545547
except RuntimeError as e:

simvue/sender.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ def sender(
201201
for file_path in _offline_files:
202202
upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
203203
else:
204-
with ThreadPoolExecutor(max_workers=max_workers) as executor:
204+
with ThreadPoolExecutor(
205+
max_workers=max_workers, thread_name_prefix="sender_session_upload"
206+
) as executor:
205207
_results = executor.map(
206208
lambda file_path: upload_cached_file(
207209
cache_dir=cache_dir,
@@ -230,7 +232,9 @@ def sender(
230232
),
231233
)
232234
else:
233-
with ThreadPoolExecutor(max_workers=max_workers) as executor:
235+
with ThreadPoolExecutor(
236+
max_workers=max_workers, thread_name_prefix="sender_heartbeat"
237+
) as executor:
234238
_results = executor.map(
235239
lambda _heartbeat_file: send_heartbeat(
236240
file_path=_heartbeat_file,

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,4 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur
310310
TEST_DATA["alert_ids"] = _alert_ids
311311

312312
return TEST_DATA
313+

tests/functional/test_dispatch.py

Lines changed: 82 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
from threading import Event, Thread
66
from queue import Queue
7+
from concurrent.futures import ThreadPoolExecutor
78

89

910
from simvue.factory.dispatch.queued import QueuedDispatcher
@@ -12,17 +13,16 @@
1213

1314
# FIXME: Update the layout of these tests
1415

16+
1517
@pytest.mark.dispatch
18+
@pytest.mark.parametrize("overload_buffer", (True, False), ids=("overload", "normal"))
1619
@pytest.mark.parametrize(
17-
"overload_buffer", (True, False),
18-
ids=("overload", "normal")
19-
)
20-
@pytest.mark.parametrize(
21-
"append_during_dispatch", (True, False),
22-
ids=("pre_append", "append")
20+
"append_during_dispatch", (True, False), ids=("pre_append", "append")
2321
)
2422
@pytest.mark.parametrize("multiple", (True, False), ids=("multiple", "single"))
25-
def test_queued_dispatcher(overload_buffer: bool, multiple: bool, append_during_dispatch: bool) -> None:
23+
def test_queued_dispatcher(
24+
overload_buffer: bool, multiple: bool, append_during_dispatch: bool
25+
) -> None:
2626
buffer_size: int = 10
2727
n_elements: int = 2 * buffer_size if overload_buffer else buffer_size - 1
2828
max_read_rate: float = 0.2
@@ -42,24 +42,39 @@ def test_queued_dispatcher(overload_buffer: bool, multiple: bool, append_during_
4242

4343
for variable in variables:
4444
check_dict[variable] = {"counter": 0}
45-
def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> None:
45+
46+
def callback(
47+
___: list[typing.Any], _: str, args=check_dict, var=variable
48+
) -> None:
4649
args[var]["counter"] += 1
50+
4751
dispatchers.append(
48-
QueuedDispatcher(callback, [variable], event, max_buffer_size=buffer_size, max_read_rate=max_read_rate)
52+
QueuedDispatcher(
53+
callback,
54+
[variable],
55+
event,
56+
max_buffer_size=buffer_size,
57+
max_read_rate=max_read_rate,
58+
name=f"Queued_Dispatcher_{variable}"
59+
)
4960
)
5061

5162
if not append_during_dispatch:
5263
for i in range(n_elements):
53-
for variable, dispatcher in zip(variables, dispatchers):
54-
dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False)
64+
for variable, dispatcher in zip(variables, dispatchers):
65+
dispatcher.add_item(
66+
{string.ascii_uppercase[i % 26]: i}, variable, False
67+
)
5568

5669
for dispatcher in dispatchers:
5770
dispatcher.start()
5871

5972
if append_during_dispatch:
6073
for i in range(n_elements):
61-
for variable, dispatcher in zip(variables, dispatchers):
62-
dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False)
74+
for variable, dispatcher in zip(variables, dispatchers):
75+
dispatcher.add_item(
76+
{string.ascii_uppercase[i % 26]: i}, variable, False
77+
)
6378

6479
while not dispatcher.empty:
6580
time.sleep(0.1)
@@ -70,7 +85,9 @@ def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> No
7085
time.sleep(0.1)
7186

7287
for variable in variables:
73-
assert check_dict[variable]["counter"] >= (2 if overload_buffer else 1), f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}"
88+
assert check_dict[variable]["counter"] >= (2 if overload_buffer else 1), (
89+
f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}"
90+
)
7491
assert time.time() - start_time < time_threshold
7592

7693

@@ -86,33 +103,48 @@ def test_nested_queued_dispatch(multi_queue: bool) -> None:
86103
result_queue = Queue()
87104

88105
event = Event()
106+
89107
def create_callback(index):
90-
def callback(___: list[typing.Any], _: str, check_dict=check_dict[index]) -> None:
108+
def callback(
109+
___: list[typing.Any], _: str, check_dict=check_dict[index]
110+
) -> None:
91111
check_dict["counter"] += 1
112+
92113
return callback
93-
def _main(res_queue, index, dispatch_callback=create_callback, term_event=event, variable=variable) -> bool:
94114

115+
def _main(
116+
res_queue,
117+
index,
118+
dispatch_callback=create_callback,
119+
term_event=event,
120+
variable=variable,
121+
) -> bool:
95122
term_event = Event()
96123
dispatcher = QueuedDispatcher(
97124
dispatch_callback(index),
98125
[variable] if isinstance(variable, str) else variable,
99126
term_event,
100127
max_buffer_size=buffer_size,
101-
max_read_rate=max_read_rate
128+
max_read_rate=max_read_rate,
129+
name=f"test_nested_queued_dispatch"
102130
)
103131

104132
dispatcher.start()
105133

106134
try:
107135
for i in range(n_elements):
108136
if isinstance(variable, str):
109-
dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False)
137+
dispatcher.add_item(
138+
{string.ascii_uppercase[i % 26]: i}, variable, False
139+
)
110140
else:
111141
for var in variable:
112-
dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, var, False)
113-
except(RuntimeError):
142+
dispatcher.add_item(
143+
{string.ascii_uppercase[i % 26]: i}, var, False
144+
)
145+
except RuntimeError:
114146
res_queue.put("AARGHGHGHGHAHSHGHSDHFSEDHSE")
115-
147+
116148
time.sleep(0.1)
117149

118150
while not dispatcher.empty:
@@ -127,18 +159,29 @@ def _main(res_queue, index, dispatch_callback=create_callback, term_event=event,
127159
threads = []
128160

129161
for i in range(3):
130-
_thread = Thread(target=_main, args=(result_queue, i,), daemon=True)
162+
_thread = Thread(
163+
target=_main,
164+
args=(
165+
result_queue,
166+
i,
167+
),
168+
daemon=True,
169+
name=f"nested_queue_dispatch_{i}_Thread",
170+
)
131171
_thread.start()
132172
threads.append(_thread)
133-
173+
134174
for i in range(3):
135175
threads[i].join()
136176

137177
if not result_queue.empty():
138178
assert False
139179

140180
for i in range(3):
141-
assert check_dict[i]["counter"] >= 2, f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[i]['counter']}"
181+
assert check_dict[i]["counter"] >= 2, (
182+
f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[i]['counter']}"
183+
)
184+
142185

143186
def test_queued_dispatch_error_adding_item_after_termination() -> None:
144187
trigger = Event()
@@ -148,7 +191,8 @@ def test_queued_dispatch_error_adding_item_after_termination() -> None:
148191
object_types=["q"],
149192
termination_trigger=trigger,
150193
max_buffer_size=5,
151-
max_read_rate=2
194+
max_read_rate=2,
195+
name="test_queued_dispatch_error_adding_item_after_termination"
152196
)
153197
dispatcher.start()
154198

@@ -157,14 +201,16 @@ def test_queued_dispatch_error_adding_item_after_termination() -> None:
157201
with pytest.raises(RuntimeError):
158202
dispatcher.add_item("blah", "q", False)
159203

204+
160205
def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None:
161206
trigger = Event()
162207
dispatcher = QueuedDispatcher(
163208
callback=lambda *_: None,
164209
object_types=["q"],
165210
termination_trigger=trigger,
166211
max_buffer_size=5,
167-
max_read_rate=2
212+
max_read_rate=2,
213+
name="test_queued_dispatch_error_attempting_to_use_non_existent_queue"
168214
)
169215
dispatcher.start()
170216

@@ -194,18 +240,22 @@ def test_direct_dispatcher(multiple: bool) -> None:
194240

195241
for variable in variables:
196242
check_dict[variable] = {"counter": 0}
197-
def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> None:
243+
244+
def callback(
245+
___: list[typing.Any], _: str, args=check_dict, var=variable
246+
) -> None:
198247
args[var]["counter"] += 1
199-
dispatchers.append(
200-
DirectDispatcher(callback, [variable], event)
201-
)
248+
249+
dispatchers.append(DirectDispatcher(callback, [variable], event))
202250

203251
for i in range(n_elements):
204-
for variable, dispatcher in zip(variables, dispatchers):
252+
for variable, dispatcher in zip(variables, dispatchers):
205253
dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable)
206254

207255
event.set()
208256

209257
for variable in variables:
210-
assert check_dict[variable]["counter"] >= 1, f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}"
258+
assert check_dict[variable]["counter"] >= 1, (
259+
f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}"
260+
)
211261
assert time.time() - start_time < time_threshold

tests/functional/test_run_class.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ def test_update_metadata_offline(
464464

465465

466466
@pytest.mark.run
467+
@pytest.mark.scenario
467468
@pytest.mark.parametrize("multi_threaded", (True, False), ids=("multi", "single"))
468469
def test_runs_multiple_parallel(
469470
multi_threaded: bool, request: pytest.FixtureRequest
@@ -491,7 +492,7 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
491492
run.log_metrics(metric)
492493
return index, metrics, run.id
493494

494-
with concurrent.futures.ThreadPoolExecutor(max_workers=N_RUNS) as executor:
495+
with concurrent.futures.ThreadPoolExecutor(max_workers=N_RUNS, thread_name_prefix="test_runs_multiple_parallel") as executor:
495496
futures = [executor.submit(thread_func, i) for i in range(N_RUNS)]
496497

497498
time.sleep(1)
@@ -561,6 +562,7 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
561562

562563

563564
@pytest.mark.run
565+
@pytest.mark.scenario
564566
def test_runs_multiple_series(request: pytest.FixtureRequest) -> None:
565567
N_RUNS: int = 2
566568

0 commit comments

Comments
 (0)