Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency/cuda stream Improvements #566

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/flamegpu/gpu/CUDAAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,22 @@ class CUDAAgent : public AgentInterface {
* @return An ID that can be assigned to an agent that will be stored within this CUDAAgent's CUDAFatAgent
*/
id_t nextID(unsigned int count = 1) override;
/**
* Returns a device pointer to the value returned by nextID(0)
* If the device value is changed, then the internal ID counter must be updated via CUDAAgent::scatterNew()
*/
id_t* getDeviceNextIDAsync(cudaStream_t stream);
/**
* Returns a device pointer to the value returned by nextID(0)
* If the device value is changed, then the internal ID counter must be updated via CUDAAgent::scatterNew()
*/
id_t* getDeviceNextID();
/**
* Assigns IDs to any agents whose ID has the value ID_NOT_SET
* @param hostapi HostAPI object, this is used to provide cub temp storage
* @param stream Stream to perform the work with asynchronously.
*/
void assignIDsAsync(HostAPI& hostapi, cudaStream_t stream);
/**
* Assigns IDs to any agents whose ID has the value ID_NOT_SET
* @param hostapi HostAPI object, this is used to provide cub temp storage
Expand Down
13 changes: 13 additions & 0 deletions include/flamegpu/gpu/CUDAFatAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ class CUDAFatAgent {
* @return An ID that can be assigned to an agent that will be stored within this CUDAFatAgent
*/
id_t nextID(unsigned int count = 1);
/**
* Returns a device pointer to the value returned by nextID(0), and asynchronously updates a value used on the device using the provided stream.
* If the device value is changed, then the internal ID counter must be updated via CUDAAgent::scatterNew()
* @param stream the cuda stream in which to perform the asynchronous host to device copy.
*/
id_t *getDeviceNextIDAsync(cudaStream_t stream);
/**
* Returns a device pointer to the value returned by nextID(0)
* If the device value is changed, then the internal ID counter must be updated via CUDAAgent::scatterNew()
Expand All @@ -158,11 +164,18 @@ class CUDAFatAgent {
* @param newCount The number of agents birthed on device
*/
void notifyDeviceBirths(unsigned int newCount);
/**
* Assigns IDs to any agents whose ID has the value ID_NOT_SET
* @param hostapi HostAPI object, this is used to provide cub temp storage
* @param stream Stream to perform the work with asynchronously.
*/
void assignIDsAsync(HostAPI& hostapi, cudaStream_t stream);
/**
* Assigns IDs to any agents whose ID has the value ID_NOT_SET
* @param hostapi HostAPI object, this is used to provide cub temp storage
*/
void assignIDs(HostAPI& hostapi);

/**
* Resets the flag agent_ids_have_init
*/
Expand Down
6 changes: 6 additions & 0 deletions src/flamegpu/gpu/CUDAAgent.cu
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,15 @@ std::list<std::shared_ptr<VariableBuffer>> CUDAAgent::getUnboundVariableBuffers(
id_t CUDAAgent::nextID(unsigned int count) {
    // Reserve `count` agent IDs; the fat agent owns the shared ID counter.
    return this->fat_agent->nextID(count);
}
id_t* CUDAAgent::getDeviceNextIDAsync(cudaStream_t stream) {
    // Forward to the fat agent, which owns the device-side next-ID value.
    return this->fat_agent->getDeviceNextIDAsync(stream);
}
id_t* CUDAAgent::getDeviceNextID() {
    // Synchronous variant: forwarded to the fat agent's blocking implementation.
    return this->fat_agent->getDeviceNextID();
}
void CUDAAgent::assignIDsAsync(HostAPI& hostapi, cudaStream_t stream) {
    // Delegate asynchronous ID assignment to the fat agent in the given stream.
    this->fat_agent->assignIDsAsync(hostapi, stream);
}
void CUDAAgent::assignIDs(HostAPI& hostapi) {
    // Delegate synchronous ID assignment to the fat agent.
    this->fat_agent->assignIDs(hostapi);
}
Expand Down
29 changes: 21 additions & 8 deletions src/flamegpu/gpu/CUDAFatAgent.cu
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,23 @@ id_t CUDAFatAgent::nextID(unsigned int count) {
_nextID += count;
return rtn;
}
id_t *CUDAFatAgent::getDeviceNextID() {
id_t *CUDAFatAgent::getDeviceNextIDAsync(cudaStream_t stream) {
if (!d_nextID) {
gpuErrchk(cudaMalloc(&d_nextID, sizeof(id_t)));
}
if (hd_nextID != _nextID) {
gpuErrchk(cudaMemcpy(d_nextID, &_nextID, sizeof(id_t), cudaMemcpyHostToDevice));
gpuErrchk(cudaMemcpyAsync(d_nextID, &_nextID, sizeof(id_t), cudaMemcpyHostToDevice, stream));
hd_nextID = _nextID;
}
return d_nextID;
}
id_t *CUDAFatAgent::getDeviceNextID() {
    // Synchronous wrapper: refresh d_nextID via the async path on the default
    // stream, then block until the host-to-device copy has completed.
    const cudaStream_t default_stream = 0;  // @todo - do not just pass the default stream.
    this->getDeviceNextIDAsync(default_stream);
    gpuErrchk(cudaStreamSynchronize(default_stream));  // @todo - Do this nicer?
    return d_nextID;
}
void CUDAFatAgent::notifyDeviceBirths(unsigned int newCount) {
_nextID += newCount;
hd_nextID += newCount;
Expand All @@ -336,8 +343,8 @@ void CUDAFatAgent::notifyDeviceBirths(unsigned int newCount) {
assert(t == _nextID); // At the end of device birth they should be equal, as no host birth can occur between pre and post processing agent fn
#endif
}
void CUDAFatAgent::assignIDs(HostAPI& hostapi) {
NVTX_RANGE("CUDAFatAgent::assignIDs");
void CUDAFatAgent::assignIDsAsync(HostAPI& hostapi, cudaStream_t stream) {
NVTX_RANGE("CUDAFatAgent::assignIDsAsync");
if (agent_ids_have_init) return;
id_t h_max = ID_NOT_SET;
// Find the max ID within the current agents
Expand All @@ -353,13 +360,13 @@ void CUDAFatAgent::assignIDs(HostAPI& hostapi) {
if (hostapi.tempStorageRequiresResize(cc, s->getSize())) {
// Resize cub storage
size_t tempByte = 0;
gpuErrchk(cub::DeviceReduce::Max(nullptr, tempByte, static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize()));
gpuErrchk(cub::DeviceReduce::Max(nullptr, tempByte, static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize(), stream));
gpuErrchkLaunch();
hostapi.resizeTempStorage(cc, s->getSize(), tempByte);
}
hostapi.resizeOutputSpace<id_t>();
gpuErrchk(cub::DeviceReduce::Max(hostapi.d_cub_temp, hostapi.d_cub_temp_size, static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize()));
gpuErrchk(cudaMemcpy(&h_max, hostapi.d_output_space, sizeof(id_t), cudaMemcpyDeviceToHost));
gpuErrchk(cub::DeviceReduce::Max(hostapi.d_cub_temp, hostapi.d_cub_temp_size, static_cast<id_t*>(vb->data), reinterpret_cast<id_t*>(hostapi.d_output_space), s->getSize(), stream));
gpuErrchk(cudaMemcpyAsync(&h_max, hostapi.d_output_space, sizeof(id_t), cudaMemcpyDeviceToHost, stream));
_nextID = std::max(_nextID, h_max + 1);
}
}
Expand All @@ -374,14 +381,20 @@ void CUDAFatAgent::assignIDs(HostAPI& hostapi) {
if (vb && vb->data && s->getSize()) {
const unsigned int blockSize = 1024;
const unsigned int blocks = ((s->getSize() - 1) / blockSize) + 1;
allocateIDs<< <blocks, blockSize >> > (static_cast<id_t*>(vb->data), s->getSize(), ID_NOT_SET, _nextID);
allocateIDs<<<blocks, blockSize, 0, stream>>> (static_cast<id_t*>(vb->data), s->getSize(), ID_NOT_SET, _nextID);
gpuErrchkLaunch();
}
_nextID += s->getSizeWithDisabled();
}

agent_ids_have_init = true;
}

void CUDAFatAgent::assignIDs(HostAPI& hostapi) {
    // Synchronous wrapper: run the asynchronous implementation on the default
    // stream, then block until all of its device work has finished.
    const cudaStream_t default_stream = 0;  // @todo - don't just pass 0 as the stream.
    this->assignIDsAsync(hostapi, default_stream);
    gpuErrchk(cudaStreamSynchronize(default_stream));  // @todo - batch instead?
}

void CUDAFatAgent::resetIDCounter() {
// Resetting ID whilst agents exist is a bad idea, so fail silently
for (auto& s : states_unique)
Expand Down
22 changes: 17 additions & 5 deletions src/flamegpu/gpu/CUDASimulation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,6 @@ bool CUDASimulation::step() {
std::unique_ptr<util::detail::Timer> stepTimer = getDriverAppropriateTimer();
stepTimer->start();

// Init any unset agent IDs
this->assignAgentIDs();

// If verbose, print the step number.
if (getSimulationConfig().verbose) {
fprintf(stdout, "Processing Simulation Step %u\n", step_count);
Expand All @@ -299,6 +296,9 @@ bool CUDASimulation::step() {
unsigned int nStreams = getMaximumLayerWidth();
this->createStreams(nStreams);

// Init any unset agent IDs
this->assignAgentIDs();

// Reset message list flags
for (auto m = message_map.begin(); m != message_map.end(); ++m) {
m->second->setTruncateMessageListFlag();
Expand Down Expand Up @@ -672,7 +672,7 @@ void CUDASimulation::stepLayer(const std::shared_ptr<LayerData>& layer, const un
if (auto oa = func_des->agent_output.lock()) {
agentoutput_hash = (detail::curve::Curve::variableRuntimeHash("_agent_birth") ^ funcname_hash) + instance_id;
CUDAAgent& output_agent = getCUDAAgent(oa->name);
d_agentOut_nextID = output_agent.getDeviceNextID();
d_agentOut_nextID = output_agent.getDeviceNextIDAsync(this->getStream(streamIdx));
}

const CUDAAgent& cuda_agent = getCUDAAgent(agent_name);
Expand Down Expand Up @@ -1724,8 +1724,20 @@ void CUDASimulation::assignAgentIDs() {
// Ensure singletons have been initialised
initialiseSingletons();

// Ensure there are enough streams
unsigned int nStreams = static_cast<unsigned int>(agent_map.size());
this->createStreams(nStreams);

unsigned int idx = 0;
for (auto &a : agent_map) {
a.second->assignIDs(*host_api); // This is cheap if the CUDAAgent thinks it's IDs are already assigned
a.second->assignIDsAsync(*host_api, this->getStream(idx)); // This is cheap if the CUDAAgent thinks its IDs are already assigned
idx++;
}

// Sync all the streams used here
// @todo - Record an event in each participating stream, and sync that event. Ideally create and destroy the event once.
for (idx = 0; idx < nStreams; idx++) {
gpuErrchk(cudaStreamSynchronize(this->getStream(idx)));
}
agent_ids_have_init = true;
}
Expand Down
5 changes: 1 addition & 4 deletions tests/test_cases/gpu/test_cuda_simulation_concurrency.cu
Original file line number Diff line number Diff line change
Expand Up @@ -844,11 +844,8 @@ RELEASE_ONLY_TEST(TestCUDASimulationConcurrency, ConcurrentMessageOutputInputSpa

/**
* Test for agent birth (to unique lists). Each agent type executes a function, and births an agent into its own population.
* @note Disabled since AgentID PR (#512), this PR adds a memcpy (before and) after agent birth.
* @see CUDAFatAgent::getDeviceNextID(): This is called before any agent functon with device birth enabled, however only memcpys on first step or after host agent birth
* @see CUDAFatAgent::notifyDeviceBirths(unsigned int): This is called after any agent function with device birth enabled
*/
RELEASE_ONLY_TEST(TestCUDASimulationConcurrency, DISABLED_LayerConcurrencyBirth) {
RELEASE_ONLY_TEST(TestCUDASimulationConcurrency, LayerConcurrencyBirth) {
// Define a model with multiple agent types
ModelDescription m("LayerConcurrencyBirth");

Expand Down