Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Make thread safe #1045

Open
wants to merge 35 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bee5684
trying to use new tasks
jdolence Dec 11, 2023
e881ad9
Merge branch 'lroberts36/bugfix-sparse-cache' into jdolence/new_tasking
jdolence Dec 14, 2023
90f3e59
remove debugging
jdolence Dec 14, 2023
92564e1
formatting
jdolence Dec 14, 2023
6fde57d
remove raw mpi.hpp include
jdolence Dec 14, 2023
2320c0e
style
jdolence Dec 14, 2023
95818ba
more style
jdolence Dec 14, 2023
d602a35
and more style
jdolence Dec 14, 2023
10a67f1
ok thats enough
jdolence Dec 14, 2023
23803d0
actually remove the old task stuff
jdolence Dec 14, 2023
a4db040
formatting
jdolence Dec 14, 2023
8b7d42a
maybe last style commit...
jdolence Dec 14, 2023
52f0d5a
oops, includes inside parthenon namespace
jdolence Dec 14, 2023
e6eb2e3
update TaskID unit test
jdolence Dec 14, 2023
ce7a6bb
missing header
jdolence Dec 14, 2023
1ddc2e0
port the poisson examples
jdolence Dec 15, 2023
0bd54cf
try to fix serial builds
jdolence Dec 15, 2023
6082812
clean up branching in `|` operator of TaskID
jdolence Dec 15, 2023
07ae71a
rename Queue ThreadQueue
jdolence Dec 15, 2023
c1dbcb3
formatting
jdolence Dec 15, 2023
fbbe02a
try to fix builds with threads
jdolence Dec 15, 2023
d39a31a
update tasking docs
jdolence Dec 18, 2023
b074ee6
formatting and update changelog
jdolence Dec 18, 2023
829e047
address review comments
jdolence Jan 9, 2024
fc16f0f
merge develop
jdolence Jan 9, 2024
b400c11
style
jdolence Jan 9, 2024
9957538
add a comment about the dependent variable in Task
jdolence Jan 9, 2024
4842676
add locks to sparse pack caching
jdolence Jan 12, 2024
7faf25b
merge develop
jdolence Jan 12, 2024
7ce9ed5
move thread pool to utils
jdolence Jan 12, 2024
97874e0
add thread pool to driver/mesh
jdolence Jan 12, 2024
e9630b7
random intermediate commit
jdolence Mar 22, 2024
3e7ea4b
merge develop
jdolence Apr 4, 2024
e51f4db
seems to be thread safe -- advection example works
jdolence Apr 5, 2024
be1f029
crazy state
jdolence Apr 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
crazy state
jdolence committed Apr 23, 2024
commit be1f029ec5b8c3b1ac5b8471207651b32f39c9af
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -204,6 +204,7 @@ set(CMAKE_CXX_STANDARD 17)

#add_compile_options(-fsanitize=thread)
#add_link_options(-fsanitize=thread)
add_link_options(-Wl,-no_pie -L/opt/homebrew/lib -lprofiler -ltcmalloc)

option(PARTHENON_IMPORT_KOKKOS "If ON, attempt to link to an external Kokkos library. If OFF, build Kokkos from source and package with Parthenon" OFF)
if (NOT TARGET Kokkos::kokkos)
1 change: 1 addition & 0 deletions example/advection/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ if( "advection-example" IN_LIST DRIVER_LIST OR NOT PARTHENON_DISABLE_EXAMPLES)
)
#add_compile_options(-fsanitize=thread)
#add_link_options(-fsanitize=thread)
add_link_options(-Wl,-no_pie -L/opt/homebrew/lib -lprofiler -ltcmalloc)
target_link_libraries(advection-example PRIVATE Parthenon::parthenon)
lint_target(advection-example)
endif()
14 changes: 7 additions & 7 deletions example/advection/advection_driver.cpp
Original file line number Diff line number Diff line change
@@ -86,8 +86,8 @@ TaskCollection AdvectionDriver::MakeTaskCollection(BlockList_t &blocks, const in

const auto any = parthenon::BoundaryType::any;

tl.AddTask(none, parthenon::StartReceiveBoundBufs<any>, mc1);
tl.AddTask(none, parthenon::StartReceiveFluxCorrections, mc0);
//tl.AddTask(none, parthenon::StartReceiveBoundBufs<any>, mc1);
//tl.AddTask(none, parthenon::StartReceiveFluxCorrections, mc0);
}

// Number of task lists that can be executed independently and thus *may*
@@ -124,13 +124,13 @@ TaskCollection AdvectionDriver::MakeTaskCollection(BlockList_t &blocks, const in
auto &mc1 = pmesh->mesh_data.GetOrAdd(stage_name[stage], i);
auto &mdudt = pmesh->mesh_data.GetOrAdd("dUdt", i);

auto send_flx = tl.AddTask(none, parthenon::LoadAndSendFluxCorrections, mc0);
auto recv_flx = tl.AddTask(none, parthenon::ReceiveFluxCorrections, mc0);
auto set_flx = tl.AddTask(recv_flx, parthenon::SetFluxCorrections, mc0);
//auto send_flx = tl.AddTask(none, parthenon::LoadAndSendFluxCorrections, mc0);
//auto recv_flx = tl.AddTask(none, parthenon::ReceiveFluxCorrections, mc0);
//auto set_flx = tl.AddTask(recv_flx, parthenon::SetFluxCorrections, mc0);

// compute the divergence of fluxes of conserved variables
auto flux_div =
tl.AddTask(set_flx, FluxDivergence<MeshData<Real>>, mc0.get(), mdudt.get());
tl.AddTask(none, FluxDivergence<MeshData<Real>>, mc0.get(), mdudt.get());

auto avg_data = tl.AddTask(flux_div, AverageIndependentData<MeshData<Real>>,
mc0.get(), mbase.get(), beta);
@@ -139,7 +139,7 @@ TaskCollection AdvectionDriver::MakeTaskCollection(BlockList_t &blocks, const in
mdudt.get(), beta * dt, mc1.get());

// do boundary exchange
parthenon::AddBoundaryExchangeTasks(update, tl, mc1, pmesh->multilevel);
//parthenon::AddBoundaryExchangeTasks(update, tl, mc1, pmesh->multilevel);
}

TaskRegion &async_region2 = tc.AddRegion(num_task_lists_executed_independently);
7 changes: 6 additions & 1 deletion example/advection/advection_package.cpp
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
#include "kokkos_abstraction.hpp"
#include "reconstruct/dc_inline.hpp"
#include "utils/error_checking.hpp"
#include "utils/thread_pool.hpp"

using namespace parthenon::package::prelude;

@@ -75,6 +76,7 @@ std::shared_ptr<StateDescriptor> Initialize(ParameterInput *pin) {

auto fill_derived = pin->GetOrAddBoolean("Advection", "fill_derived", true);
pkg->AddParam<>("fill_derived", fill_derived);
printf("fill_derived = %d\n", fill_derived);

// For wavevector along coordinate axes, set desired values of ang_2/ang_3.
// For example, for 1D problem use ang_2 = ang_3 = 0.0
@@ -314,7 +316,6 @@ void SquareIt(MeshBlockData<Real> *rc) {
auto v = desc.GetPack(rc);
auto imap = desc.GetMap();


const int in = imap["one_minus_advected"];
const int out = imap["one_minus_advected_sq"];
const auto num_vars = rc->Get("advected").data.GetDim(4);
@@ -495,6 +496,8 @@ TaskStatus CalculateFluxes(std::shared_ptr<MeshBlockData<Real>> &rc) {
const int nvar = v.GetMaxNumberOfVars();
size_t scratch_size_in_bytes = parthenon::ScratchPad2D<Real>::shmem_size(nvar, nx1);
// get x-fluxes
//std::cout << "hello from thread " << std::this_thread::get_id() << std::endl;
//for (int cnt=0; cnt<300; cnt++) {
parthenon::par_for_outer(
DEFAULT_OUTER_LOOP_PATTERN, PARTHENON_AUTO_LABEL, DevExecSpace(),
2 * scratch_size_in_bytes, scratch_level, kb.s, kb.e, jb.s,
@@ -615,6 +618,8 @@ TaskStatus CalculateFluxes(std::shared_ptr<MeshBlockData<Real>> &rc) {
}
});
}
//}
//std::cout << "done on thread " << std::this_thread::get_id() << std::endl;

return TaskStatus::complete;
}
2 changes: 1 addition & 1 deletion external/Kokkos
Submodule Kokkos updated 966 files
Original file line number Diff line number Diff line change
@@ -247,7 +247,8 @@ def plot_dump(
for i in range(n_blocks):
# Plot the actual data, should work if parthenon/output*/ghost_zones = true/false
# but obviously no ghost data will be shown if ghost_zones = false
p.pcolormesh(xf[i, :], yf[i, :], q[i, :, :], vmin=qmin, vmax=qmax)
# print(xf[i,:].shape, yf[i,:].shape, q[i,:,:].shape)
p.pcolormesh(xf[i, :], yf[i, :], np.squeeze(q[i, :, :]), vmin=qmin, vmax=qmax)

# Print the block gid in the center of the block
if len(block_ids) > 0:
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -323,6 +323,7 @@ endif()

#add_compile_options(-fsanitize=thread)
#add_link_options(-fsanitize=thread)
add_link_options(-Wl,-no_pie -lprofiler -ltcmalloc)

target_link_libraries(parthenon PUBLIC Kokkos::kokkos Threads::Threads)

2 changes: 2 additions & 0 deletions src/basic_types.hpp
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@

namespace parthenon {

inline thread_local Kokkos::Serial t_exec_space;

// primitive type alias that allows code to run with either floats or doubles
#if SINGLE_PRECISION_ENABLED
using Real = float;
17 changes: 13 additions & 4 deletions src/bvals/comms/bnd_info.cpp
Original file line number Diff line number Diff line change
@@ -220,6 +220,11 @@ int GetBufferSize(MeshBlock *pmb, const NeighborBlock &nb,
v->GetDim(4) * topo_comp;
}

void BndInfo::LockedBufferCopy(CommBuffer<buf_pool_t<Real>::owner_t> *input_buf) {
//std::lock_guard<std::mutex> lock(mutex);
buf = input_buf->buffer();
}

BndInfo BndInfo::GetSendBndInfo(MeshBlock *pmb, const NeighborBlock &nb,
std::shared_ptr<Variable<Real>> v,
CommBuffer<buf_pool_t<Real>::owner_t> *buf) {
@@ -229,7 +234,8 @@ BndInfo BndInfo::GetSendBndInfo(MeshBlock *pmb, const NeighborBlock &nb,
out.alloc_status = v->GetAllocationStatus();
if (!out.allocated) return out;

out.buf = buf->buffer();
out.LockedBufferCopy(buf);
//out.buf = buf->buffer();

int Nv = v->GetDim(4);
int Nu = v->GetDim(5);
@@ -257,7 +263,8 @@ BndInfo BndInfo::GetSetBndInfo(MeshBlock *pmb, const NeighborBlock &nb,
std::shared_ptr<Variable<Real>> v,
CommBuffer<buf_pool_t<Real>::owner_t> *buf) {
BndInfo out;
out.buf = buf->buffer();
//out.buf = buf->buffer();
out.LockedBufferCopy(buf);
auto buf_state = buf->GetState();
if (buf_state == BufferState::received) {
out.buf_allocated = true;
@@ -454,7 +461,8 @@ BndInfo BndInfo::GetSendCCFluxCor(MeshBlock *pmb, const NeighborBlock &nb,
// Not going to actually do anything with this buffer
return out;
}
out.buf = buf->buffer();
//out.buf = buf->buffer();
out.LockedBufferCopy(buf);

IndexRange ib = pmb->cellbounds.GetBoundsI(IndexDomain::interior);
IndexRange jb = pmb->cellbounds.GetBoundsJ(IndexDomain::interior);
@@ -514,7 +522,8 @@ BndInfo BndInfo::GetSetCCFluxCor(MeshBlock *pmb, const NeighborBlock &nb,
}
out.allocated = true;
out.alloc_status = v->GetAllocationStatus();
out.buf = buf->buffer();
//out.buf = buf->buffer();
out.LockedBufferCopy(buf);

IndexRange ib = pmb->cellbounds.GetBoundsI(IndexDomain::interior);
IndexRange jb = pmb->cellbounds.GetBoundsJ(IndexDomain::interior);
3 changes: 3 additions & 0 deletions src/bvals/comms/bnd_info.hpp
Original file line number Diff line number Diff line change
@@ -56,6 +56,9 @@ struct BndInfo {
BndInfo() = default;
BndInfo(const BndInfo &) = default;

inline static std::mutex mutex;
void LockedBufferCopy(CommBuffer<buf_pool_t<Real>::owner_t> *buf);

// These are are used to generate the BndInfo struct for various
// kinds of boundary types and operations.
static BndInfo GetSendBndInfo(MeshBlock *pmb, const NeighborBlock &nb,
38 changes: 26 additions & 12 deletions src/bvals/comms/boundary_communication.cpp
Original file line number Diff line number Diff line change
@@ -45,13 +45,15 @@ using namespace loops;
using namespace loops::shorthands;

static std::array<std::mutex, 2 * NUM_BNDRY_TYPES> mutex;
//static std::mutex mutex;

template <BoundaryType bound_type>
TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(bound_type) + 1;
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, true);
@@ -64,11 +66,13 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
CheckSendBufferCacheForRebuild<bound_type, true>(md);

if (nbound == 0) {
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();
return TaskStatus::complete;
}
if (other_communication_unfinished) {
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();
return TaskStatus::incomplete;
}

@@ -85,7 +89,8 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
}
}

mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

// Restrict
auto pmb = md->GetBlockData(0)->GetBlockPointer();
@@ -159,6 +164,7 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
else
buf.SendNull();
}
//mutex.unlock();

return TaskStatus::complete;
}
@@ -177,14 +183,16 @@ TaskStatus StartReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(bound_type);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
if (cache.buf_vec.size() == 0)
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey,
false);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[](auto pbuf) { pbuf->TryStartReceive(); });
@@ -208,14 +216,16 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(bound_type);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
if (cache.buf_vec.size() == 0)
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey,
false);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

bool all_received = true;
std::for_each(
@@ -260,7 +270,8 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(bound_type);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
@@ -280,7 +291,8 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
}
}

mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

// const Real threshold = Globals::sparse_config.allocation_threshold;
auto &bnd_info = cache.bnd_info;
@@ -359,7 +371,8 @@ TaskStatus ProlongateBounds(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(bound_type);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
@@ -378,7 +391,8 @@ TaskStatus ProlongateBounds(std::shared_ptr<MeshData<Real>> &md) {
ProResInfo::GetSet);
}
}
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

if (nbound > 0 && pmesh->multilevel) {
auto pmb = md->GetBlockData(0)->GetBlockPointer();
6 changes: 3 additions & 3 deletions src/bvals/comms/bvals_utils.hpp
Original file line number Diff line number Diff line change
@@ -95,9 +95,9 @@ void InitializeBufferCache(std::shared_ptr<MeshData<Real>> &md, COMM_MAP *comm_m

// Or, what the hell, you could put them in random order if you want, which
// frighteningly seems to run faster in some cases
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(key_order.begin(), key_order.end(), g);
//std::random_device rd;
//std::mt19937 g(rd());
//std::shuffle(key_order.begin(), key_order.end(), g);

int buff_idx = 0;
pcache->buf_vec.clear();
33 changes: 23 additions & 10 deletions src/bvals/comms/flux_correction.cpp
Original file line number Diff line number Diff line change
@@ -39,12 +39,14 @@ namespace parthenon {
using namespace impl;

static std::array<std::mutex, 2 * NUM_BNDRY_TYPES> mutex;
//static std::mutex mutex;

TaskStatus LoadAndSendFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(BoundaryType::flxcor_send) + 1;
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(BoundaryType::flxcor_send, true);
@@ -58,19 +60,22 @@ TaskStatus LoadAndSendFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
CheckSendBufferCacheForRebuild<BoundaryType::flxcor_send, true>(md);

if (nbound == 0) {
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();
return TaskStatus::complete;
}

if (other_communication_unfinished) {
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();
return TaskStatus::incomplete;
}

if (rebuild)
RebuildBufferCache<BoundaryType::flxcor_send, true>(
md, nbound, BndInfo::GetSendCCFluxCor, ProResInfo::GetSend);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

auto &bnd_info = cache.bnd_info;
PARTHENON_REQUIRE(bnd_info.size() == nbound, "Need same size for boundary info");
@@ -122,21 +127,24 @@ TaskStatus LoadAndSendFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
// Calling Send will send null if the underlying buffer is unallocated
for (auto &buf : cache.buf_vec)
buf->Send();
//mutex.unlock();
return TaskStatus::complete;
}

TaskStatus StartReceiveFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(BoundaryType::flxcor_recv);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(BoundaryType::flxcor_recv, false);
if (cache.buf_vec.size() == 0)
InitializeBufferCache<BoundaryType::flxcor_recv>(
md, &(pmesh->boundary_comm_flxcor_map), &cache, ReceiveKey, false);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[](auto pbuf) { pbuf->TryStartReceive(); });
@@ -148,14 +156,16 @@ TaskStatus ReceiveFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(BoundaryType::flxcor_recv);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(BoundaryType::flxcor_recv, false);
if (cache.buf_vec.size() == 0)
InitializeBufferCache<BoundaryType::flxcor_recv>(
md, &(pmesh->boundary_comm_flxcor_map), &cache, ReceiveKey, false);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

bool all_received = true;
std::for_each(
@@ -170,7 +180,8 @@ TaskStatus SetFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
PARTHENON_INSTRUMENT

int mutex_id = 2 * static_cast<int>(BoundaryType::flxcor_recv);
mutex[mutex_id].lock();
//mutex[mutex_id].lock();
//mutex.lock();

Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(BoundaryType::flxcor_recv, false);
@@ -180,7 +191,8 @@ TaskStatus SetFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
if (rebuild)
RebuildBufferCache<BoundaryType::flxcor_recv, false>(
md, nbound, BndInfo::GetSetCCFluxCor, ProResInfo::GetSend);
mutex[mutex_id].unlock();
//mutex[mutex_id].unlock();
//mutex.unlock();

auto &bnd_info = cache.bnd_info;
Kokkos::parallel_for(
@@ -203,6 +215,7 @@ TaskStatus SetFluxCorrections(std::shared_ptr<MeshData<Real>> &md) {
std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[](auto pbuf) { pbuf->Stale(); });

//mutex.unlock();
return TaskStatus::complete;
}

5 changes: 1 addition & 4 deletions src/driver/driver.hpp
Original file line number Diff line number Diff line change
@@ -39,10 +39,7 @@ class Driver {
public:
Driver(ParameterInput *pin, ApplicationInput *app_in, Mesh *pm)
: pinput(pin), app_input(app_in), pmesh(pm), mbcnt_prev(), time_LBandAMR() {
const int nthreads = pin->GetOrAddInteger("parthenon/driver", "nthreads", 1);
printf("Initializing with %d threads...\n", nthreads);
pool = std::make_shared<ThreadPool>(nthreads);
pm->SetThreadPool(pool);
pool = pm->pool;
}
virtual DriverStatus Execute() = 0;
void InitializeOutputs() { pouts = std::make_unique<Outputs>(pmesh, pinput); }
82 changes: 55 additions & 27 deletions src/kokkos_abstraction.hpp

Large diffs are not rendered by default.

80 changes: 51 additions & 29 deletions src/mesh/amr_loadbalance.cpp
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@
#include "parthenon_arrays.hpp"
#include "utils/buffer_utils.hpp"
#include "utils/error_checking.hpp"
#include "utils/thread_pool.hpp"

namespace parthenon {

@@ -650,6 +651,12 @@ bool Mesh::GatherCostListAndCheckBalance() {
return true;
}

void thread_safe_insert(std::unordered_set<LogicalLocation> &myset, LogicalLocation &myval) {
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
myset.insert(myval);
}

//----------------------------------------------------------------------------------------
// \!fn void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, int ntot)
// \brief redistribute MeshBlocks according to the new load balance
@@ -682,8 +689,10 @@ void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, ApplicationInput
PARTHENON_INSTRUMENT
newloc = forest.GetMeshBlockListAndResolveGids();
nbtotal = newloc.size();
for (int ib = 0; ib < nbtotal; ++ib)
newtoold[ib] = forest.GetOldGid(newloc[ib]);

ThreadPoolLoopBlocking(*pool, 0, nbtotal-1, [&](const int i) {
newtoold[i] = forest.GetOldGid(newloc[i]);
});

// create a list mapping the previous gid to the current one
oldtonew[0] = 0;
@@ -701,25 +710,28 @@ void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, ApplicationInput
for (; mb_idx < nbtold; mb_idx++)
oldtonew[mb_idx] = ntot - 1;

current_level = 0;
for (int n = 0; n < ntot; n++) {
//current_level = 0;
//for (int n = 0; n < ntot; n++) {
current_level = ThreadPoolReduce(*pool, 0, ntot-1, [&](const int n) {
// "on" = "old n" = "old gid" = "old global MeshBlock ID"
int on = newtoold[n];
if (newloc[n].level() > current_level) // set the current max level
current_level = newloc[n].level();
//if (newloc[n].level() > current_level) // set the current max level
// current_level = newloc[n].level();
if (newloc[n].level() >= loclist[on].level()) { // same or refined
newcost[n] = costlist[on];
// Keep a list of all blocks refined for below
if (newloc[n].level() > loclist[on].level()) {
newly_refined.insert(newloc[n]);
thread_safe_insert(newly_refined, newloc[n]);
//newly_refined.insert(newloc[n]);
}
} else {
double acost = 0.0;
for (int l = 0; l < nleaf; l++)
acost += costlist[on + l];
newcost[n] = acost / nleaf;
}
}
return newloc[n].level();
}, [](int a, int b) { return std::max(a,b); }, int(0));
} // Construct new list region

// Calculate new load balance
@@ -729,30 +741,39 @@ void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, ApplicationInput
int nbe = nbs + nblist[Globals::my_rank] - 1;

// Restrict fine to coarse buffers
ProResCache_t restriction_cache;
int nrestrict = 0;
for (int on = onbs; on <= onbe; on++) {
int nn = oldtonew[on];
auto pmb = FindMeshBlock(on);
if (newloc[nn].level() < loclist[on].level()) nrestrict += pmb->vars_cc_.size();
}
restriction_cache.Initialize(nrestrict, resolved_packages.get());
int irestrict = 0;
for (int on = onbs; on <= onbe; on++) {
int nn = oldtonew[on];
if (newloc[nn].level() < loclist[on].level()) {
auto restrict_fine_to_coarse_buffers = [&](const int t_onbs, const int t_onbe) {
ProResCache_t restriction_cache;
int nrestrict = 0;
for (int on = t_onbs; on <= t_onbe; on++) {
int nn = oldtonew[on];
auto pmb = FindMeshBlock(on);
for (auto &var : pmb->vars_cc_) {
restriction_cache.RegisterRegionHost(
irestrict++, ProResInfo::GetInteriorRestrict(pmb.get(), NeighborBlock(), var),
var.get(), resolved_packages.get());
if (newloc[nn].level() < loclist[on].level()) nrestrict += pmb->vars_cc_.size();
}

restriction_cache.Initialize(nrestrict, resolved_packages.get());
int irestrict = 0;
for (int on = t_onbs; on <= t_onbe; on++) {
int nn = oldtonew[on];
if (newloc[nn].level() < loclist[on].level()) {
auto pmb = FindMeshBlock(on);
for (auto &var : pmb->vars_cc_) {
restriction_cache.RegisterRegionHost(
irestrict++, ProResInfo::GetInteriorRestrict(pmb.get(), NeighborBlock(), var),
var.get(), resolved_packages.get());
}
}
}
restriction_cache.CopyToDevice();
refinement::Restrict(resolved_packages.get(), restriction_cache,
block_list[0]->cellbounds, block_list[0]->c_cellbounds);
};
std::vector<int> onstart, onstop;
ThreadLoopBounds(*pool, onbs, onbe, onstart, onstop);
for (int it = 0; it < pool->size(); it++) {
pool->enqueue(restrict_fine_to_coarse_buffers, onstart[it], onstop[it]);
}
restriction_cache.CopyToDevice();
refinement::Restrict(resolved_packages.get(), restriction_cache,
block_list[0]->cellbounds, block_list[0]->c_cellbounds);

//pool->run();
pool->wait();
Kokkos::fence();

#ifdef MPI_PARALLEL
@@ -795,6 +816,7 @@ void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, ApplicationInput
RegionSize block_size = GetBlockSize();

for (int n = nbs; n <= nbe; n++) {
//ThreadPoolLoopBlocking(*pool, nbs, nbe, [&, this](const int n) {
int on = newtoold[n];
if ((ranklist[on] == Globals::my_rank) &&
(loclist[on].level() == newloc[n].level())) {
@@ -816,7 +838,7 @@ void Mesh::RedistributeAndRefineMeshBlocks(ParameterInput *pin, ApplicationInput
MeshBlock::Make(n, n - nbs, newloc[n], block_size, block_bcs, this, pin,
app_in, packages, resolved_packages, gflag);
}
}
}//);
} // AMR Construct new MeshBlockList region

// Replace the MeshBlock list
7 changes: 5 additions & 2 deletions src/mesh/mesh.cpp
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@
#include "globals.hpp"
#include "interface/state_descriptor.hpp"
#include "interface/update.hpp"
#include "kokkos_abstraction.hpp"
#include "mesh/mesh.hpp"
#include "mesh/mesh_refinement.hpp"
#include "mesh/meshblock.hpp"
@@ -103,7 +104,8 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
// private members:
num_mesh_threads_(pin->GetOrAddInteger("parthenon/mesh", "num_threads", 1)),
use_uniform_meshgen_fn_{true, true, true, true}, lb_flag_(true), lb_automatic_(),
lb_manual_(), MeshBndryFnctn{nullptr, nullptr, nullptr, nullptr, nullptr, nullptr} {
lb_manual_(), MeshBndryFnctn{nullptr, nullptr, nullptr, nullptr, nullptr, nullptr},
pool(std::make_shared<ThreadPool>(pin->GetOrAddInteger("parthenon/execution", "nthreads", 1))) {
std::stringstream msg;
RegionSize block_size;
BoundaryFlag block_bcs[6];
@@ -479,7 +481,8 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, RestartReader &rr,
// private members:
num_mesh_threads_(pin->GetOrAddInteger("parthenon/mesh", "num_threads", 1)),
use_uniform_meshgen_fn_{true, true, true, true}, lb_flag_(true), lb_automatic_(),
lb_manual_(), MeshBndryFnctn{nullptr, nullptr, nullptr, nullptr, nullptr, nullptr} {
lb_manual_(), MeshBndryFnctn{nullptr, nullptr, nullptr, nullptr, nullptr, nullptr},
pool(std::make_shared<ThreadPool>(pin->GetOrAddInteger("parthenon/execution", "nthreads", 1))) {
std::stringstream msg;
RegionSize block_size;
BoundaryFlag block_bcs[6];
3 changes: 1 addition & 2 deletions src/mesh/mesh.hpp
Original file line number Diff line number Diff line change
@@ -260,6 +260,7 @@ class Mesh {
return resolved_packages->GetVariableNames(std::forward<Args>(args)...);
}

std::shared_ptr<ThreadPool> pool;
void SetThreadPool(std::shared_ptr<ThreadPool> p) { pool = p; }

private:
@@ -303,8 +304,6 @@ class Mesh {

int gmg_min_logical_level_ = 0;

std::shared_ptr<ThreadPool> pool;

#ifdef MPI_PARALLEL
// Global map of MPI comms for separate variables
std::unordered_map<std::string, MPI_Comm> mpi_comm_map_;
2 changes: 1 addition & 1 deletion src/mesh/meshblock.hpp
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ class MeshBlock : public std::enable_shared_from_this<MeshBlock> {
int igflag, double icost = 1.0);

// Kokkos execution space for this MeshBlock
DevExecSpace exec_space;
DevExecSpace_t exec_space;

// data
Mesh *pmy_mesh = nullptr; // ptr to Mesh containing this MeshBlock
3 changes: 2 additions & 1 deletion src/parthenon_manager.hpp
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
#include "driver/driver.hpp"
#include "interface/state_descriptor.hpp"
#include "interface/swarm.hpp"
#include "kokkos_abstraction.hpp"
#include "mesh/domain.hpp"
#include "mesh/mesh.hpp"
#include "outputs/restart.hpp"
@@ -36,7 +37,7 @@ enum class ParthenonStatus { ok, complete, error };

class ParthenonManager {
public:
ParthenonManager() { app_input.reset(new ApplicationInput()); }
ParthenonManager() { t_exec_space = DevExecSpace_t(); app_input.reset(new ApplicationInput()); }
ParthenonStatus ParthenonInitEnv(int argc, char *argv[]);
void ParthenonInitPackagesAndMesh();
ParthenonStatus ParthenonFinalize();
3 changes: 3 additions & 0 deletions src/tasks/tasks.hpp
Original file line number Diff line number Diff line change
@@ -420,6 +420,9 @@ class TaskRegion {
pool.enqueue([t, &ProcessTask]() { return ProcessTask(t); });
}

// and run it
//pool.run();

// then wait until everything is done
pool.wait();

39 changes: 36 additions & 3 deletions src/utils/communication_buffer.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include <iostream>
#include <limits>
#include <memory>
#include <shared_mutex>
#include <type_traits>
#include <unordered_map>
#include <utility>
@@ -37,6 +38,32 @@ namespace parthenon {
// received: X
enum class BufferState { stale, sending, sending_null, received, received_null };

class BufferStateThreadSafe {
public:
BufferStateThreadSafe() = default;
BufferStateThreadSafe(BufferState bs) : state_(bs) {}
BufferStateThreadSafe(const BufferStateThreadSafe &bs) = delete;
BufferStateThreadSafe &operator =(BufferState bs) {
mutex.lock();
state_ = bs;
mutex.unlock();
return *this;
}
bool operator==(BufferState bs) {
return (Get() == bs);
}
BufferState Get() {
mutex.lock_shared();
auto val = state_;
mutex.unlock_shared();
return val;
}

private:
BufferState state_;
std::shared_mutex mutex;
};

enum class BuffCommType { sender, receiver, both, sparse_receiver };

template <class T>
@@ -46,7 +73,7 @@ class CommBuffer {
template <typename U>
friend class CommBuffer;

std::shared_ptr<BufferState> state_;
std::shared_ptr<BufferStateThreadSafe> state_;
std::shared_ptr<BuffCommType> comm_type_;
std::shared_ptr<bool> started_irecv_;
std::shared_ptr<int> nrecv_tries_;
@@ -65,6 +92,7 @@ class CommBuffer {
std::function<T()> get_resource_;

T buf_;
inline static std::mutex mutex;

public:
CommBuffer()
@@ -81,6 +109,9 @@ class CommBuffer {

~CommBuffer();

//CommBuffer(const CommBuffer<T> &in) = delete;
//CommBuffer &operator=(const CommBuffer<T> &in) = delete;

template <class U>
CommBuffer(const CommBuffer<U> &in);

@@ -95,19 +126,21 @@ class CommBuffer {

void Allocate() {
if (!active_) {
std::lock_guard<std::mutex> lock(mutex);
buf_ = get_resource_();
active_ = true;
}
}

void Free() {
std::lock_guard<std::mutex> lock(mutex);
buf_ = T();
active_ = false;
}

bool IsActive() const { return active_; }

BufferState GetState() { return *state_; }
BufferState GetState() { return state_->Get(); }

void Send() noexcept;
void SendNull() noexcept;
@@ -132,7 +165,7 @@ class CommBuffer {
template <class T>
CommBuffer<T>::CommBuffer(int tag, int send_rank, int recv_rank, mpi_comm_t comm,
std::function<T()> get_resource, bool do_sparse_allocation)
: state_(std::make_shared<BufferState>(BufferState::stale)),
: state_(std::make_shared<BufferStateThreadSafe>(BufferState::stale)),
comm_type_(std::make_shared<BuffCommType>(BuffCommType::both)),
started_irecv_(std::make_shared<bool>(false)),
nrecv_tries_(std::make_shared<int>(0)),
4 changes: 2 additions & 2 deletions src/utils/sort.hpp
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ void sort(ParArray1D<Key> data, KeyComparator comparator, size_t min_idx,
thrust::sort(first_d, last_d, comparator);
#endif
#else
if (std::is_same<DevExecSpace, HostExecSpace>::value) {
if (std::is_same<DevExecSpace_t, HostExecSpace>::value) {
std::sort(data.data() + min_idx, data.data() + max_idx + 1, comparator);
} else {
PARTHENON_FAIL("sort is not supported outside of CPU or NVIDIA GPU. If you need sort "
@@ -103,7 +103,7 @@ void sort(ParArray1D<Key> data, size_t min_idx, size_t max_idx) {
thrust::sort(first_d, last_d);
#endif
#else
if (std::is_same<DevExecSpace, HostExecSpace>::value) {
if (std::is_same<DevExecSpace_t, HostExecSpace>::value) {
std::sort(data.data() + min_idx, data.data() + max_idx + 1);
} else {
PARTHENON_FAIL("sort is not supported outside of CPU or NVIDIA GPU. If you need sort "
532 changes: 491 additions & 41 deletions src/utils/thread_pool.hpp

Large diffs are not rendered by default.