Skip to content

Commit 98ed689

Browse files
jlnav, shuds13, jmlarson1
authored
Refactor/user function handling modules + Manager can run additional worker on thread (#1216)
**Commit Highlights:**
* first round of refactoring runners.py, Runner base class for normal in-place launches, but based on the contents of passed-in specs, instantiates the relevant subclass
* ThreadRunner uses comms.QCommThread, slightly modified, to launch its user function. corresponding unit test
* removing now-redundant content from manager, trying to see if we can start a temporary, local Worker for handling work
* use _Worker class to correctly index into W and wcomms. add initial option to libE_specs
* add "threaded" tentative option to sim/gen_specs
* fix ThreadRunner shutdown when that worker didn't launch a thread
* adjust alloc_support to not use w - 1 indexing
* add tentative gen_on_manager option, separate additional_worker_launch into function
* move _WorkerIndexer into libensemble.utils, also use within PersistentSupport
* manager also needs to send workflow_dir location to worker 0
* simplify gen_workers parameter description for avail_worker_ids
* filter for gen_workers within avail_worker_ids, if set and there are gen_workers. solution resembles zrw, like shuds predicted all along!
* refactor give_sim_work_first for running on gen_workers if no points_to_evaluate. add test for mixed existing sample plus calling a gen
* it turns out that values set by validators are still considered "unset". So for updating purposes for libE_specs, we want to exclude fields that are still set to their defaults
* platform_specs sometimes seems to be at risk of disappearing when we convert LibeSpecs to dict, so let's save it and reinsert
* add libE_specs["gen_workers"] option, adjust ensure_one_active_gen so multiple gen work orders aren't given out at once

**Major features:**
* runners.py refactored to more easily develop additional methods to submit/run user functions
* User functions can be launched by workers on separate threads as an alternative to calling in-place.
* Generator can be launched on the Manager - various refactors to make this possible
* gen_workers option to specify workers that should run generators only
* Various bugfixes and refactors of allocation functions

---------

Co-authored-by: shudson <[email protected]>
Co-authored-by: Jeffrey Larson <[email protected]>
1 parent cd27017 commit 98ed689

27 files changed

+478
-311
lines changed

docs/data_structures/libE_specs.rst

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
2828
Manager/Worker communications mode: ``'mpi'``, ``'local'``, or ``'tcp'``.
2929

3030
**nworkers** [int]:
31-
Number of worker processes in ``"local"`` or ``"tcp"``.
31+
Number of worker processes in ``"local"``, ``"threads"``, or ``"tcp"``.
32+
33+
**gen_on_manager** [bool] = ``False``:
34+
Instructs Manager process to run generator functions.
35+
This generator function can access/modify user objects by reference.
3236

3337
**mpi_comm** [MPI communicator] = ``MPI.COMM_WORLD``:
3438
libEnsemble MPI communicator.
@@ -51,6 +55,10 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
5155
**disable_log_files** [bool] = ``False``:
5256
Disable ``ensemble.log`` and ``libE_stats.txt`` log files.
5357

58+
**gen_workers** [list of ints]:
59+
List of workers that should only run generators. All other workers will only
60+
run simulator functions.
61+
5462
.. tab-item:: Directories
5563

5664
.. tab-set::

libensemble/alloc_funcs/fast_alloc.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,26 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
3232
Work = {}
3333
gen_in = gen_specs.get("in", [])
3434

35-
for wid in support.avail_worker_ids():
35+
# Give sim work if possible
36+
for wid in support.avail_worker_ids(gen_workers=False):
3637
persis_info = support.skip_canceled_points(H, persis_info)
37-
38-
# Give sim work if possible
3938
if persis_info["next_to_give"] < len(H):
4039
try:
4140
Work[wid] = support.sim_work(wid, H, sim_specs["in"], [persis_info["next_to_give"]], [])
4241
except InsufficientFreeResources:
4342
break
4443
persis_info["next_to_give"] += 1
4544

46-
elif gen_count < user.get("num_active_gens", gen_count + 1):
47-
# Give gen work
48-
return_rows = range(len(H)) if gen_in else []
49-
try:
50-
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
51-
except InsufficientFreeResources:
52-
break
53-
gen_count += 1
54-
persis_info["total_gen_calls"] += 1
45+
# Give gen work if possible
46+
if persis_info["next_to_give"] >= len(H):
47+
for wid in support.avail_worker_ids(gen_workers=True):
48+
if wid not in Work and gen_count < user.get("num_active_gens", gen_count + 1):
49+
return_rows = range(len(H)) if gen_in else []
50+
try:
51+
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
52+
except InsufficientFreeResources:
53+
break
54+
gen_count += 1
55+
persis_info["total_gen_calls"] += 1
5556

5657
return Work, persis_info

libensemble/alloc_funcs/fast_alloc_and_pausing.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
4343
for pt_id in persis_info["pt_ids"]:
4444
persis_info["inds_of_pt_ids"][pt_id] = H["pt_id"] == pt_id
4545

46-
idle_workers = support.avail_worker_ids()
46+
idle_sim_workers = support.avail_worker_ids(gen_workers=False)
47+
idle_gen_workers = support.avail_worker_ids(gen_workers=True)
4748

48-
while len(idle_workers):
49+
while len(idle_sim_workers):
4950
pt_ids_to_pause = set()
5051

5152
# Find indices of H that are not yet given out to be evaluated
@@ -106,15 +107,19 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
106107

107108
if len(persis_info["need_to_give"]) != 0:
108109
next_row = persis_info["need_to_give"].pop()
109-
i = idle_workers[0]
110+
i = idle_sim_workers[0]
110111
try:
111112
Work[i] = support.sim_work(i, H, sim_specs["in"], [next_row], [])
112113
except InsufficientFreeResources:
113114
persis_info["need_to_give"].add(next_row)
114115
break
115-
idle_workers = idle_workers[1:]
116+
idle_sim_workers = idle_sim_workers[1:]
116117

117-
elif gen_count < alloc_specs["user"].get("num_active_gens", gen_count + 1):
118+
else:
119+
break
120+
121+
while len(idle_gen_workers):
122+
if gen_count < alloc_specs["user"].get("num_active_gens", gen_count + 1):
118123
lw = persis_info["last_worker"]
119124

120125
last_size = persis_info.get("last_size")
@@ -126,18 +131,18 @@ def give_sim_work_first(W, H, sim_specs, gen_specs, alloc_specs, persis_info, li
126131
break
127132

128133
# Give gen work
129-
i = idle_workers[0]
134+
i = idle_gen_workers[0]
130135
try:
131136
Work[i] = support.gen_work(i, gen_specs["in"], range(len(H)), persis_info[lw])
132137
except InsufficientFreeResources:
133138
break
134-
idle_workers = idle_workers[1:]
139+
idle_gen_workers = idle_gen_workers[1:]
135140
gen_count += 1
136141
persis_info["total_gen_calls"] += 1
137142
persis_info["last_worker"] = i
138143
persis_info["last_size"] = len(H)
139144

140145
elif gen_count >= alloc_specs["user"].get("num_active_gens", gen_count + 1):
141-
idle_workers = []
146+
idle_gen_workers = []
142147

143148
return Work, persis_info

libensemble/alloc_funcs/give_pregenerated_work.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def give_pregenerated_sim_work(W, H, sim_specs, gen_specs, alloc_specs, persis_i
2323
if persis_info["next_to_give"] >= len(H):
2424
return Work, persis_info, 1
2525

26-
for i in support.avail_worker_ids():
26+
for i in support.avail_worker_ids(gen_workers=False):
2727
persis_info = support.skip_canceled_points(H, persis_info)
2828

2929
# Give sim work

libensemble/alloc_funcs/give_sim_work_first.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,19 @@ def give_sim_work_first(
6464
Work = {}
6565

6666
points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
67-
for wid in support.avail_worker_ids():
68-
if np.any(points_to_evaluate):
67+
68+
if np.any(points_to_evaluate):
69+
for wid in support.avail_worker_ids(gen_workers=False):
6970
sim_ids_to_send = support.points_by_priority(H, points_avail=points_to_evaluate, batch=batch_give)
7071
try:
7172
Work[wid] = support.sim_work(wid, H, sim_specs["in"], sim_ids_to_send, persis_info.get(wid))
7273
except InsufficientFreeResources:
7374
break
7475
points_to_evaluate[sim_ids_to_send] = False
75-
else:
76+
if not np.any(points_to_evaluate):
77+
break
78+
else:
79+
for wid in support.avail_worker_ids(gen_workers=True):
7680
# Allow at most num_active_gens active generator instances
7781
if gen_count >= user.get("num_active_gens", gen_count + 1):
7882
break

libensemble/alloc_funcs/inverse_bayes_allocf.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, alloc_spe
4242
Work[wid] = support.gen_work(wid, ["like"], inds_to_send_back, persis_info.get(wid), persistent=True)
4343

4444
points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
45-
for wid in support.avail_worker_ids(persistent=False):
46-
if np.any(points_to_evaluate):
45+
if np.any(points_to_evaluate):
46+
for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
47+
4748
# perform sim evaluations (if any point hasn't been given).
4849
sim_subbatches = H["subbatch"][points_to_evaluate]
4950
sim_inds = sim_subbatches == np.min(sim_subbatches)
@@ -54,13 +55,11 @@ def only_persistent_gens_for_inverse_bayes(W, H, sim_specs, gen_specs, alloc_spe
5455
except InsufficientFreeResources:
5556
break
5657
points_to_evaluate[sim_ids_to_send] = False
57-
58-
elif gen_count == 0:
59-
# Finally, generate points since there is nothing else to do.
60-
try:
61-
Work[wid] = support.gen_work(wid, gen_specs["in"], [], persis_info.get(wid), persistent=True)
62-
except InsufficientFreeResources:
58+
if not np.any(points_to_evaluate):
6359
break
64-
gen_count += 1
60+
61+
elif gen_count == 0:
62+
wid = support.avail_worker_ids(persistent=False, gen_workers=True)[0]
63+
Work[wid] = support.gen_work(wid, gen_specs["in"], [], persis_info.get(wid), persistent=True)
6564

6665
return Work, persis_info

libensemble/alloc_funcs/only_one_gen_alloc.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,30 @@ def ensure_one_active_gen(W, H, sim_specs, gen_specs, alloc_specs, persis_info,
2121
gen_flag = True
2222
gen_in = gen_specs.get("in", [])
2323

24-
for wid in support.avail_worker_ids():
25-
persis_info = support.skip_canceled_points(H, persis_info)
26-
27-
if persis_info["next_to_give"] < len(H):
24+
if persis_info["next_to_give"] < len(H):
25+
for wid in support.avail_worker_ids(gen_workers=False):
26+
persis_info = support.skip_canceled_points(H, persis_info)
2827
try:
2928
Work[wid] = support.sim_work(wid, H, sim_specs["in"], [persis_info["next_to_give"]], [])
3029
except InsufficientFreeResources:
3130
break
3231
persis_info["next_to_give"] += 1
33-
34-
elif not support.test_any_gen() and gen_flag:
35-
if not support.all_sim_ended(H):
32+
if persis_info["next_to_give"] >= len(H):
3633
break
3734

38-
# Give gen work
39-
return_rows = range(len(H)) if gen_in else []
40-
try:
41-
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
42-
except InsufficientFreeResources:
43-
break
44-
gen_flag = False
45-
persis_info["total_gen_calls"] += 1
35+
elif not support.test_any_gen() and gen_flag:
36+
# Give gen work
37+
return_rows = range(len(H)) if gen_in else []
38+
wid = support.avail_worker_ids(gen_workers=True)[0]
39+
40+
if not support.all_sim_ended(H):
41+
return Work, persis_info
42+
43+
try:
44+
Work[wid] = support.gen_work(wid, gen_in, return_rows, persis_info.get(wid))
45+
except InsufficientFreeResources:
46+
return Work, persis_info
47+
gen_flag = False
48+
persis_info["total_gen_calls"] += 1
4649

4750
return Work, persis_info

libensemble/alloc_funcs/persistent_aposmm_alloc.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
5353
)
5454
returned_but_not_given[point_ids] = False
5555

56-
for wid in support.avail_worker_ids(persistent=False):
56+
for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
5757
persis_info = support.skip_canceled_points(H, persis_info)
5858

5959
if persis_info["next_to_give"] < len(H):
@@ -63,8 +63,11 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
6363
except InsufficientFreeResources:
6464
break
6565
persis_info["next_to_give"] += 1
66+
if persis_info["next_to_give"] >= len(H):
67+
break
6668

67-
elif persis_info.get("gen_started") is None:
69+
if persis_info.get("gen_started") is None:
70+
for wid in support.avail_worker_ids(persistent=False, gen_workers=True):
6871
# Finally, call a persistent generator as there is nothing else to do.
6972
persis_info.get(wid)["nworkers"] = len(W)
7073
try:
@@ -74,5 +77,6 @@ def persistent_aposmm_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info
7477
except InsufficientFreeResources:
7578
break
7679
persis_info["gen_started"] = True # Must set after - in case break on resources
80+
break
7781

7882
return Work, persis_info

libensemble/alloc_funcs/start_fd_persistent.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,21 @@ def finite_diff_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info, libE
4949
)
5050

5151
points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
52-
for wid in support.avail_worker_ids(persistent=False):
53-
if np.any(points_to_evaluate):
52+
if np.any(points_to_evaluate):
53+
for wid in support.avail_worker_ids(persistent=False, gen_workers=False):
5454
# perform sim evaluations (if they exist in History).
5555
sim_ids_to_send = np.nonzero(points_to_evaluate)[0][0] # oldest point
5656
try:
5757
Work[wid] = support.sim_work(wid, H, sim_specs["in"], sim_ids_to_send, persis_info.get(wid))
5858
except InsufficientFreeResources:
5959
break
6060
points_to_evaluate[sim_ids_to_send] = False
61-
62-
elif gen_count == 0:
63-
# Finally, call a persistent generator as there is nothing else to do.
64-
try:
65-
Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info.get(wid), persistent=True)
66-
except InsufficientFreeResources:
61+
if not np.any(points_to_evaluate):
6762
break
68-
gen_count += 1
63+
64+
if gen_count == 0:
65+
wid = support.avail_worker_ids(persistent=False, gen_workers=True)[0]
66+
Work[wid] = support.gen_work(wid, gen_specs.get("in", []), [], persis_info.get(wid), persistent=True)
67+
gen_count += 1
6968

7069
return Work, persis_info, 0

libensemble/alloc_funcs/start_only_persistent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info, l
8989

9090
# Now the give_sim_work_first part
9191
points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"]
92-
avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=False)
92+
avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=False, gen_workers=False)
9393
if user.get("alt_type"):
9494
avail_workers = list(
9595
set(support.avail_worker_ids(persistent=False, zero_resource_workers=False))
@@ -115,7 +115,7 @@ def only_persistent_gens(W, H, sim_specs, gen_specs, alloc_specs, persis_info, l
115115

116116
# Start persistent gens if no worker to give out. Uses zero_resource_workers if defined.
117117
if not np.any(points_to_evaluate):
118-
avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=True)
118+
avail_workers = support.avail_worker_ids(persistent=False, zero_resource_workers=True, gen_workers=True)
119119

120120
for wid in avail_workers:
121121
if gen_count < user.get("num_active_gens", 1):

0 commit comments

Comments
 (0)