Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 77 additions & 10 deletions realm_subgraph/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,31 @@ static Event define_subgraph(Subgraph &subgraph,
}

std::vector<size_t> preconditions;
std::vector<std::vector<size_t> > copy_postconditions(num_fields);
size_t next_precondition = 0;

size_t next_postcondition = 0;

std::vector<std::vector<std::vector<size_t> > > copy_postconditions(last_point - first_point + 1);
std::vector<std::vector<size_t> > input_copy_postconditions(last_point - first_point + 1);
std::vector<std::vector<size_t> > output_copy_postconditions(last_point - first_point + 1);

// Set up the three per-point tables declared above. Each outer vector has one
// entry per point in [first_point, last_point]; each entry gets num_fields slots.

// copy_postconditions: every (point, field) slot starts as an empty list of
// size_t indices.
for (auto &point_postconditions : copy_postconditions) {
point_postconditions.resize(num_fields);
}
// input_copy_postconditions: every (point, field) slot is given the next value
// of next_precondition, so indices are consecutive in point-major, field-minor
// order. The counter is advanced as a side effect.
for (auto &point_postconditions : input_copy_postconditions) {
point_postconditions.resize(num_fields);
for (auto &postcondition : point_postconditions) {
postcondition = next_precondition++;
}
}
// output_copy_postconditions: same numbering scheme, but drawn from the
// separate next_postcondition counter.
for (auto &point_postconditions : output_copy_postconditions) {
point_postconditions.resize(num_fields);
for (auto &postcondition : point_postconditions) {
postcondition = next_postcondition++;
}
}

printf("generating subgraph for timestep %ld to %ld points %ld to %ld\n", start_timestep, stop_timestep, first_point, last_point);
for (long timestep = start_timestep; timestep < stop_timestep; ++timestep) {
long dset = graph.dependence_set_at_timestep(timestep);
long next_dset = graph.dependence_set_at_timestep(timestep + 1);
Expand Down Expand Up @@ -390,20 +412,35 @@ static Event define_subgraph(Subgraph &subgraph,
definition.dependencies.push_back(precondition_dep);
}

for (auto precondition : copy_postconditions.at(fid - FID_FIRST)) {
if (timestep < start_timestep + num_fields) {
printf(" issuing copy for timestep %ld point %ld fid %ld from input\n", timestep, point, fid);
auto precondition = input_copy_postconditions.at(point - first_point).at(fid - FID_FIRST);
SubgraphDefinition::Dependency precondition_dep;
precondition_dep.src_op_kind = SubgraphDefinition::OpKind::OPKIND_COPY;
precondition_dep.src_op_kind = SubgraphDefinition::OpKind::OPKIND_EXT_PRECOND;
precondition_dep.src_op_index = precondition;
precondition_dep.src_op_port = 0;
precondition_dep.tgt_op_kind = SubgraphDefinition::OpKind::OPKIND_TASK;
precondition_dep.tgt_op_index = task_postcondition;
precondition_dep.tgt_op_port = 0;

definition.dependencies.push_back(precondition_dep);
} else {
printf(" issuing copy for timestep %ld point %ld fid %ld from body\n", timestep, point, fid);
for (auto precondition : copy_postconditions.at(point - first_point).at(fid - FID_FIRST)) {
SubgraphDefinition::Dependency precondition_dep;
precondition_dep.src_op_kind = SubgraphDefinition::OpKind::OPKIND_COPY;
precondition_dep.src_op_index = precondition;
precondition_dep.src_op_port = 0;
precondition_dep.tgt_op_kind = SubgraphDefinition::OpKind::OPKIND_TASK;
precondition_dep.tgt_op_index = task_postcondition;
precondition_dep.tgt_op_port = 0;

definition.dependencies.push_back(precondition_dep);
}
}
}

copy_postconditions.at(fid - FID_FIRST).clear();
copy_postconditions.at(point - first_point).at(fid - FID_FIRST).clear();

// RAW dependencies
for (auto interval : graph.reverse_dependencies(next_dset, point)) {
Expand All @@ -426,7 +463,8 @@ static Event define_subgraph(Subgraph &subgraph,
.at(slot),
fid, graph.output_bytes_per_task));

copy_postconditions.at(fid - FID_FIRST).push_back(copy_postcondition);
copy_postconditions.at(point - first_point).at(fid - FID_FIRST).push_back(copy_postcondition);
printf(" adding a copy postcondition for timestep %ld point %ld fid %ld copy %lu\n", timestep, point, fid, copy_postcondition);

if (task_postcondition != SIZE_MAX) {
SubgraphDefinition::Dependency task_dep;
Expand Down Expand Up @@ -585,6 +623,23 @@ static Event define_subgraph(Subgraph &subgraph,
}
}

// For every (point, field) pair, take each copy index currently held in
// copy_postconditions and add a dependency edge from that copy to the
// pair's external postcondition slot.
for (long point = first_point; point <= last_point; ++point) {
for (long fid = FID_FIRST; fid < FID_FIRST + num_fields; ++fid) {
for (auto precondition : copy_postconditions.at(point - first_point).at(fid - FID_FIRST)) {
printf(" setting up output copy postconditions for point %ld fid %ld copy %lu postcondition %lu\n", point, fid, precondition, output_copy_postconditions.at(point - first_point).at(fid - FID_FIRST));
// Source: the copy operation (port 0). Target: the external postcondition
// whose index was assigned to this (point, field) in output_copy_postconditions.
SubgraphDefinition::Dependency precondition_dep;
precondition_dep.src_op_kind = SubgraphDefinition::OpKind::OPKIND_COPY;
precondition_dep.src_op_index = precondition;
precondition_dep.src_op_port = 0;
precondition_dep.tgt_op_kind = SubgraphDefinition::OpKind::OPKIND_EXT_POSTCOND;
precondition_dep.tgt_op_index = output_copy_postconditions.at(point - first_point).at(fid - FID_FIRST);
precondition_dep.tgt_op_port = 0;

definition.dependencies.push_back(precondition_dep);
}
}
}

if (replayable) {
definition.concurrency_mode = SubgraphDefinition::ConcurrencyMode::INSTANTIATION_ORDER;
} else {
Expand All @@ -607,7 +662,9 @@ static Event instantiate_subgraph(Subgraph &subgraph,
std::vector<std::vector<std::vector<std::map<long, Barrier> > > > &war_out,
const std::vector<std::vector<std::vector<std::vector<long> > > > &raw_points_not_in_dset,
const std::vector<std::vector<std::vector<std::vector<long> > > > &war_points_not_in_dset,
const std::vector<std::vector<std::vector<char *> > > &result_base)
const std::vector<std::vector<std::vector<char *> > > &result_base,
const std::vector<Event> &input_copy_postconditions,
std::vector<Event> &output_copy_postconditions)
{
DynamicBufferSerializer global_ser(4096); // FIXME: dynamic allocation

Expand All @@ -617,6 +674,8 @@ static Event instantiate_subgraph(Subgraph &subgraph,

std::vector<Event> preconditions;

preconditions.insert(preconditions.begin(), input_copy_postconditions.begin(), input_copy_postconditions.end());

for (long timestep = start_timestep; timestep < stop_timestep; ++timestep) {
long dset = graph.dependence_set_at_timestep(timestep);
long next_dset = graph.dependence_set_at_timestep(timestep + 1);
Expand Down Expand Up @@ -728,11 +787,10 @@ static Event instantiate_subgraph(Subgraph &subgraph,
}
}

std::vector<Event> postconditions;
return subgraph.instantiate(global_ser.get_buffer(), global_ser.bytes_used(),
ProfilingRequestSet(),
preconditions,
postconditions,
output_copy_postconditions,
subgraph_ready);
}

Expand Down Expand Up @@ -1209,6 +1267,8 @@ void shard_task(const void *args, size_t arglen, const void *userdata,

// Main loop
unsigned long long start_time = 0, stop_time = 0;
std::vector<Event> input_copy_postconditions;
std::vector<Event> output_copy_postconditions;
for (long rep = 0; rep < 1; ++rep) {
start_time = Clock::current_time_in_nanoseconds();
for (size_t graph_index = 0; graph_index < graphs.size(); ++graph_index) {
Expand All @@ -1226,6 +1286,9 @@ void shard_task(const void *args, size_t arglen, const void *userdata,
Event ready = Event::NO_EVENT;
Event postcondition = Event::NO_EVENT;

input_copy_postconditions.clear();
input_copy_postconditions.resize((last_point - first_point + 1)*num_fields, Event::NO_EVENT);

long period = lcm(num_fields, graph.timestep_period());

for (long start_timestep = 0; start_timestep < graph.timesteps; start_timestep += period) {
Expand Down Expand Up @@ -1279,8 +1342,9 @@ void shard_task(const void *args, size_t arglen, const void *userdata,
}

// Replay the subgraph.
output_copy_postconditions.clear();
postcondition = instantiate_subgraph(current_subgraph,
Event::merge_events(current_ready, postcondition),
current_ready,
graph, graph_index,
start_timestep, stop_timestep,
first_point, last_point,
Expand All @@ -1292,8 +1356,11 @@ void shard_task(const void *args, size_t arglen, const void *userdata,
war_out,
raw_points_not_in_dset,
war_points_not_in_dset,
result_base);
result_base,
input_copy_postconditions,
output_copy_postconditions);
events.push_back(postcondition);
input_copy_postconditions.swap(output_copy_postconditions);

if (!replay) {
current_subgraph.destroy(postcondition);
Expand Down