coll/han: disable alltoall for device buffers and MPI_IN_PLACE #13200

Open · wants to merge 2 commits into base: main
98 changes: 72 additions & 26 deletions ompi/mca/coll/han/coll_han_alltoall.c
@@ -69,6 +69,16 @@ int mca_coll_han_alltoall_using_smsc(
{

mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module;
opal_convertor_t convertor;
int send_needs_bounce, have_device_buffer;
size_t packed_size = 0;
enum {
BOUNCE_NOT_INITIALIZED = 0,
BOUNCE_IS_FROM_RBUF = 1,
BOUNCE_IS_FROM_FREELIST = 2,
BOUNCE_IS_FROM_MALLOC = 3,
};


OPAL_OUTPUT_VERBOSE((90, mca_coll_han_component.han_output,
"Entering mca_coll_han_alltoall_using_smsc\n"));
@@ -82,6 +92,44 @@ int mca_coll_han_alltoall_using_smsc(
comm, han_module->previous_alltoall_module);
}

if (sbuf == MPI_IN_PLACE) {
/* This is not an in-place algorithm */
return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, han_module->previous_alltoall_module);
}
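/*
For reference only (an editor's sketch, not part of this change): with
MPI_IN_PLACE the send arguments are ignored and the data to exchange is
taken from rbuf, e.g. at the user level:

    MPI_Alltoall(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                 rbuf, rcount, rdtype, comm);

Because this algorithm packs out of sbuf, it cannot honor those semantics
and defers to the previously selected module instead.
*/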

OBJ_CONSTRUCT( &convertor, opal_convertor_t );
send_needs_bounce = 0;
have_device_buffer = 0;
/* get converter for copying to one of the leader ranks, and get packed size: */
opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor, &sdtype->super, scount, sbuf, 0, &convertor);
have_device_buffer |= opal_convertor_on_device(&convertor);
send_needs_bounce |= opal_convertor_need_buffers(&convertor);
opal_convertor_cleanup(&convertor);

opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor, &rdtype->super, rcount, rbuf, 0, &convertor);
have_device_buffer |= opal_convertor_on_device(&convertor);
send_needs_bounce |= opal_convertor_need_buffers(&convertor);
opal_convertor_get_packed_size( &convertor, &packed_size );
opal_convertor_cleanup(&convertor);

if (have_device_buffer) {
/*
Although this algorithm is functional for device buffers, it requires an
extra copy through the bounce buffer, which makes it inefficient.
Prefer another algorithm instead.

Note that Open MPI assumes that if one rank uses a device buffer in a
collective, then all ranks will use device buffers, so there is no need
to communicate before taking this branch.
*/
OBJ_DESTRUCT(&convertor);
return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype,
comm, han_module->previous_alltoall_module);
}
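/*
Editor's illustrative sketch, not part of this change: because every rank is
assumed to see the same kind of buffer, this purely local decision still
keeps the collective matched across ranks. Without that assumption the flag
would first have to be agreed upon, for example:

    int local = have_device_buffer, global = 0;
    comm->c_coll->coll_allreduce(&local, &global, 1, MPI_INT, MPI_MAX,
                                 comm, comm->c_coll->coll_allreduce_module);
    if (global) { ... all ranks fall back together ... }

which is exactly the extra round trip the assumption avoids.
*/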



/* Create the subcommunicators */
if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) {
opal_output_verbose(1, mca_coll_han_component.han_output,
@@ -107,12 +155,11 @@ int mca_coll_han_alltoall_using_smsc(
comm, han_module->previous_alltoall_module);
}

int rc, send_needs_bounce, ii_push_data;
int rc, ii_push_data;
size_t sndsize;
MPI_Aint sextent, rextent, lb;
char *send_bounce;
opal_convertor_t convertor;
size_t packed_size = 0, packed_size_tmp;
char *send_bounce = NULL;
size_t packed_size_tmp;
int use_isend;
void *gather_buf_in[4];
int up_rank;
@@ -140,22 +187,6 @@ int mca_coll_han_alltoall_using_smsc(
}
if (fanout > up_size) { fanout = up_size; }

OBJ_CONSTRUCT( &convertor, opal_convertor_t );


send_needs_bounce = 0;
/* get converter for copying to one of the leader ranks, and get packed size: */
opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor, &sdtype->super, scount, sbuf, 0, &convertor);
send_needs_bounce |= 0 != opal_convertor_on_device(&convertor);
send_needs_bounce |= opal_convertor_need_buffers(&convertor);
opal_convertor_cleanup(&convertor);

opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor, &rdtype->super, rcount, rbuf, 0, &convertor);
send_needs_bounce |= 0 != opal_convertor_on_device(&convertor);
send_needs_bounce |= opal_convertor_need_buffers(&convertor);
opal_convertor_get_packed_size( &convertor, &packed_size );
opal_convertor_cleanup(&convertor);

/*
Because push-mode needs extra synchronizations, we'd like to avoid it,
however it might be necessary:
@@ -166,7 +197,7 @@

If the application buffer is device memory, we'll also need to exchange
in push mode so that the process which has device registrations can
perform the reads.
perform the reads. (this mode has been disabled)

In both of these cases, we'll need to use the bounce buffer too.
*/
@@ -186,19 +217,30 @@
inter_recv_reqs = malloc(sizeof(*inter_recv_reqs) * up_size );
char **low_bufs = malloc(low_size * sizeof(*low_bufs));
void **sbuf_map_ctx = malloc(low_size * sizeof(&sbuf_map_ctx));
opal_free_list_item_t *send_fl_item = NULL;

const int nptrs_gather = 3;
void **gather_buf_out = calloc(low_size*nptrs_gather, sizeof(void*));
bool send_bounce_is_allocated = false;
int send_bounce_status = BOUNCE_NOT_INITIALIZED;

do {
start_allgather:
if ( 0 == send_needs_bounce ) {
send_bounce = (char*)rbuf + up_rank*send_bytes_per_fan;
send_bounce_status = BOUNCE_IS_FROM_RBUF;
} else {
if (!send_bounce_is_allocated) {
send_bounce = malloc(send_bytes_per_fan * fanout);
send_bounce_is_allocated = true;
if (send_bounce_status == BOUNCE_NOT_INITIALIZED || send_bounce_status == BOUNCE_IS_FROM_RBUF) {
if (send_bytes_per_fan * fanout < mca_coll_han_component.han_packbuf_bytes) {
send_fl_item = opal_free_list_get(&mca_coll_han_component.pack_buffers);
if (send_fl_item) {
send_bounce_status = BOUNCE_IS_FROM_FREELIST;
send_bounce = send_fl_item->ptr;
}
}
if (!send_fl_item) {
send_bounce = malloc(send_bytes_per_fan * fanout);
send_bounce_status = BOUNCE_IS_FROM_MALLOC;
}
}
}
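/*
Editor's sketch (not part of this change) of how the three bounce-buffer
sources pair with their release paths; every name below comes from this file:

    switch (send_bounce_status) {
    case BOUNCE_IS_FROM_RBUF:      // aliases rbuf, nothing to release
        break;
    case BOUNCE_IS_FROM_FREELIST:  // came from opal_free_list_get()
        opal_free_list_return(&mca_coll_han_component.pack_buffers, send_fl_item);
        break;
    case BOUNCE_IS_FROM_MALLOC:    // came from malloc()
        free(send_bounce);
        break;
    }

The free list is only tried when send_bytes_per_fan * fanout fits within
mca_coll_han_component.han_packbuf_bytes; larger payloads fall back to a
one-off malloc that is released in the cleanup path at the end of the routine.
*/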

@@ -384,7 +426,11 @@ int mca_coll_han_alltoall_using_smsc(
}
}
OBJ_DESTRUCT(&convertor);
if (send_bounce_is_allocated) free(send_bounce);
if (send_bounce_status == BOUNCE_IS_FROM_FREELIST) {
opal_free_list_return(&mca_coll_han_component.pack_buffers, send_fl_item);
} else if (send_bounce_status == BOUNCE_IS_FROM_MALLOC) {
free(send_bounce);
}
free(inter_send_reqs);
free(inter_recv_reqs);
free(sbuf_map_ctx);