Skip to content

UCP: wireup slow connect-to-iface lanes on demand #10640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 26 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5dc4a48
TODO: hang is around UCP_EP_CONFIG_CMP_IGNORE_ADDR_INDEX and config_l…
evgeny-leksikov Apr 7, 2025
d381740
TODO: re-calculate locally addr_indices on addr reply
evgeny-leksikov Apr 8, 2025
f0e56d8
GTEST: ondemand wireup test
evgeny-leksikov Apr 8, 2025
4da47f3
UCP/WIREUP: remove addr_index from configuration, recalculate locally
evgeny-leksikov Apr 15, 2025
ea6ff13
UCP/WIREUP: Use ucp_ep_get_lane_raw for lane retrieval to handle NULL…
evgeny-leksikov Apr 16, 2025
d034dd5
UCP/WIREUP: Add assertion to suppress clang warning for NULL endpoint
evgeny-leksikov Apr 22, 2025
78b052e
UCP/CONFIG: Remove unused flags parameter from ucp_ep_config_lane_is_…
evgeny-leksikov Apr 22, 2025
e7562a2
UCP/WIREUP: fix compile error non-trivial designated initializers not…
evgeny-leksikov Apr 22, 2025
37e6b96
UCP/WIREUP: fix hang in dc/test_max_lanes.lanes_reconf/1
evgeny-leksikov Apr 23, 2025
1c6a765
UCP/WIREUP: fix coverity issue
evgeny-leksikov Apr 23, 2025
54efc1e
UCP/WIREUP: replace ucp_ep_get_lane with ucp_ep_get_lane_raw for impr…
evgeny-leksikov Apr 24, 2025
31b0720
UCP/WIREUP: Enhance lane value checks with optional all-lanes validat…
evgeny-leksikov Apr 24, 2025
8678fed
UCP/WIREUP: fix number of tested lanes
evgeny-leksikov Apr 24, 2025
5a632f8
GTEST/UCP: fix logs format in test_ucp_wireup_ondemand
evgeny-leksikov Apr 28, 2025
27f2cb0
UCP/CONFIG: Enhance endpoint configuration comparison by introducing …
evgeny-leksikov Apr 28, 2025
a2fd796
UCP/WIREUP: fix static checker
evgeny-leksikov Apr 28, 2025
a210a31
GTEST/UCP: Improve comments and flush receiver in test_ucp_wireup_ond…
evgeny-leksikov Apr 29, 2025
c53afa1
Merge remote-tracking branch 'upstream/master' into ucp_wireup_ondema…
evgeny-leksikov May 7, 2025
0eb9b3d
UCP/WIREUP: do not reset uncompletd REQs in case of ondemand wireup t…
evgeny-leksikov May 9, 2025
4fd3f02
Merge remote-tracking branch 'upstream/master' into ucp_wireup_ondema…
evgeny-leksikov May 9, 2025
d6b65f6
UCP/EP: Enhance endpoint configuration comparison with flags
evgeny-leksikov May 13, 2025
92e28b6
UCP/WIREUP: Introduce address request/reply wireup messages and refac…
evgeny-leksikov May 13, 2025
e61ba5c
UCP/WIREUP: replace ucp_ep_get_lane with ucp_ep_get_lane_raw where NU…
evgeny-leksikov May 13, 2025
d276066
Merge branch 'ucp_wireup_ondemand_p1_ep_config_cmp' into ucp_wireup_o…
evgeny-leksikov May 14, 2025
01bf905
Merge branch 'ucp_wireup_ondemand_p2_slow_lane_init' into ucp_wireup_…
evgeny-leksikov May 14, 2025
556d780
UCP/WIREUP: del leftovers after merge
evgeny-leksikov May 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ static ucs_config_field_t ucp_context_config_table[] = {
ucs_offsetof(ucp_context_config_t, connect_all_to_all),
UCS_CONFIG_TYPE_BOOL},

{"ON_DEMAND_WIREUP", "y", /* TODO: disable by default */
"Enable new protocol selection logic",
ucs_offsetof(ucp_context_config_t, on_demand_wireup), UCS_CONFIG_TYPE_BOOL},

{NULL}
};

Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ typedef struct ucp_context_config {
/** Extend endpoint lanes connections of each local device to all remote
* devices */
int connect_all_to_all;
/** On demand lanes wireup */
int on_demand_wireup;
} ucp_context_config_t;


Expand Down
49 changes: 35 additions & 14 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ ucp_ep_purge_lanes(ucp_ep_h ep, uct_pending_purge_callback_t purge_cb,
uct_ep_h uct_ep;

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if ((lane == ucp_ep_get_cm_lane(ep)) || (uct_ep == NULL)) {
continue;
}
Expand All @@ -1301,7 +1301,7 @@ static void ucp_ep_check_lanes(ucp_ep_h ep)
uct_ep_h uct_ep;

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if ((uct_ep != NULL) && ucp_is_uct_ep_failed(uct_ep)) {
num_failed_tl_ep++;
}
Expand All @@ -1324,7 +1324,7 @@ ucp_ep_set_lanes_failed(ucp_ep_h ep, uct_ep_h *uct_eps, uct_ep_h failed_ep)
ucp_ep_update_flags(ep, UCP_EP_FLAG_FAILED, UCP_EP_FLAG_LOCAL_CONNECTED);

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
uct_eps[lane] = uct_ep;

/* Set UCT EP to failed UCT EP to make sure if UCP EP won't be destroyed
Expand Down Expand Up @@ -1953,30 +1953,28 @@ int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
}

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2)
const ucp_ep_config_key_t *key2, unsigned flags)
{
ucp_lane_index_t lane;
int i;

ucs_assert(flags & UCP_EP_CONFIG_CMP_FLAG_LANES);
ucs_assert(flags <= UCP_EP_CONFIG_CMP_MASK_ALL);

/* Compare lanes layout */
if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->am_bw_lanes, key2->am_bw_lanes,
sizeof(key1->am_bw_lanes)) ||
memcmp(key1->rma_bw_lanes, key2->rma_bw_lanes,
sizeof(key1->rma_bw_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
(key1->rma_bw_md_map != key2->rma_bw_md_map) ||
(key1->rma_md_map != key2->rma_md_map) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_msg_lane != key2->wireup_msg_lane) ||
(key1->cm_lane != key2->cm_lane) ||
(key1->keepalive_lane != key2->keepalive_lane) ||
(key1->rkey_ptr_lane != key2->rkey_ptr_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->flags != key2->flags) ||
(key1->dst_version != key2->dst_version)) {
(key1->rkey_ptr_lane != key2->rkey_ptr_lane)) {
return 0;
}

Expand All @@ -1986,6 +1984,21 @@ int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
}
}

/* If we are comparing lanes only, we are done */
if (flags != UCP_EP_CONFIG_CMP_MASK_ALL) {
return 1;
}

/* Compare all the rest */
if ((key1->rma_bw_md_map != key2->rma_bw_md_map) ||
(key1->rma_md_map != key2->rma_md_map) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->err_mode != key2->err_mode) ||
(key1->flags != key2->flags) ||
(key1->dst_version != key2->dst_version)) {
return 0;
}

for (i = 0; i < ucs_popcount(key1->reachable_md_map); ++i) {
if (key1->dst_md_cmpts[i] != key2->dst_md_cmpts[i]) {
return 0;
Expand Down Expand Up @@ -3406,7 +3419,7 @@ ucp_wireup_ep_t* ucp_ep_get_cm_wireup_ep(ucp_ep_h ep)
return NULL;
}

uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
return (uct_ep != NULL) ? ucp_wireup_ep(uct_ep) : NULL;
}

Expand All @@ -3420,7 +3433,7 @@ uct_ep_h ucp_ep_get_cm_uct_ep(ucp_ep_h ep)
return NULL;
}

if (ucp_ep_get_lane(ep, lane) == NULL) {
if (ucp_ep_get_lane_raw(ep, lane) == NULL) {
return NULL;
}

Expand All @@ -3439,13 +3452,21 @@ int ucp_ep_is_local_connected(ucp_ep_h ep)
{
int is_local_connected = !!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);
ucp_wireup_ep_t *wireup_ep;
uct_ep_h uct_ep;
ucp_lane_index_t i;

if (ucp_ep_has_cm_lane(ep)) {
/* For CM case need to check all wireup lanes because transport lanes
* can be not connected yet. */
for (i = 0; is_local_connected && (i < ucp_ep_num_lanes(ep)); ++i) {
wireup_ep = ucp_wireup_ep(ucp_ep_get_lane(ep, i));
uct_ep = ucp_ep_get_lane_raw(ep, i);
if (uct_ep == NULL) {
ucs_assert(ep->worker->context->config.ext.on_demand_wireup);
ucs_assert(!ucp_ep_is_lane_p2p(ep, i));
continue;
}

wireup_ep = ucp_wireup_ep(uct_ep);
is_local_connected = (wireup_ep == NULL) ||
(wireup_ep->flags &
UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED);
Expand Down
22 changes: 21 additions & 1 deletion src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ enum {
};


/**
* Endpoint configuration comparison flags.
*
* Passed as the 'flags' argument of ucp_ep_config_is_equal() to select how
* strictly two endpoint configuration keys are compared.
*/
enum {
/* Compare lanes layout. ucp_ep_config_is_equal() asserts that this flag is
* always set by the caller. */
UCP_EP_CONFIG_CMP_FLAG_LANES = UCS_BIT(0),
/* Strong compare of all fields. ucp_ep_config_is_equal() asserts that the
* passed flags do not exceed this value, and compares the non-lane fields
* only when the flags are exactly equal to it. */
UCP_EP_CONFIG_CMP_MASK_ALL = UCS_MASK(8)
};


#define UCP_EP_STAT_TAG_OP(_ep, _op) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1);

Expand Down Expand Up @@ -770,8 +781,17 @@ int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2,
ucp_lane_index_t lane);

/**
* @brief Compare two endpoint configurations.
*
* @param [in] key1 First config key to compare.
* @param [in] key2 Second config key to compare.
* @param [in] flags Comparison flags (UCP_EP_CONFIG_CMP_XXX).
*
* @return Whether the configurations are equal.
*/
int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);
const ucp_ep_config_key_t *key2, unsigned flags);

void ucp_ep_config_name(ucp_worker_h worker, ucp_worker_cfg_index_t cfg_index,
ucs_string_buffer_t *strb);
Expand Down
51 changes: 50 additions & 1 deletion src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,20 @@ static UCS_F_ALWAYS_INLINE uct_ep_h ucp_ep_get_fast_lane(ucp_ep_h ep,
return ep->uct_eps[lane_index];
}

/**
* @brief Get the raw @ref uct_ep pointer for the given lane index.
*
* @param ep The endpoint.
* @param lane_index The lane index.
* @return The @ref uct_ep pointer for the given lane index if it's
* initialized, otherwise NULL.
*
* NOTE: This function is intended to be used only in the wireup path, where the
* state of the endpoint is known, and by @ref ucp_ep_get_lane() to
* initiate on-demand wireup of uninitialized lanes.
*/
static UCS_F_ALWAYS_INLINE uct_ep_h
ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
ucp_ep_get_lane_raw(ucp_ep_h ep, ucp_lane_index_t lane_index)
{
ucs_assertv(lane_index < UCP_MAX_LANES, "lane=%d", lane_index);

Expand All @@ -41,6 +53,31 @@ ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
}
}

/**
 * @brief Get the uct_ep pointer for the given lane index.
 *
 * @param ep         The endpoint.
 * @param lane_index The lane index, must be less than UCP_MAX_LANES.
 *
 * @return The raw @ref uct_ep pointer if the lane is already initialized,
 *         otherwise the value returned by @ref ucp_wireup_init_slow_lane()
 *         for the corresponding slow-path lane.
 *
 * NOTE: This is a common wrapper function to access lanes by index from data
 *       path. It starts on demand wireup protocol for uninitialized lanes.
 */
static UCS_F_ALWAYS_INLINE uct_ep_h
ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
{
    uct_ep_h uct_ep;

    ucs_assertv(lane_index < UCP_MAX_LANES, "lane=%d", lane_index);

    uct_ep = ucp_ep_get_lane_raw(ep, lane_index);
    if (ucs_likely(uct_ep != NULL)) {
        return uct_ep;
    }

    /* The slow lane index below is computed relative to
     * UCP_MAX_FAST_PATH_LANES. If an uninitialized fast-path lane ever gets
     * here, the subtraction yields an out-of-range index, so catch it
     * explicitly instead of passing garbage to the wireup code.
     * NOTE(review): assumes fast-path lanes are never wired up on demand -
     * confirm against ucp_wireup_init_slow_lane(). */
    ucs_assertv(lane_index >= UCP_MAX_FAST_PATH_LANES, "ep=%p lane=%d", ep,
                lane_index);

    return ucp_wireup_init_slow_lane(ep, lane_index - UCP_MAX_FAST_PATH_LANES);
}

static UCS_F_ALWAYS_INLINE void ucp_ep_set_lane(ucp_ep_h ep, size_t lane_index,
uct_ep_h uct_ep)
{
Expand Down Expand Up @@ -137,6 +174,18 @@ static inline ucp_lane_index_t ucp_ep_num_lanes(ucp_ep_h ep)
return ucp_ep_config(ep)->key.num_lanes;
}

/**
 * @brief Count the lanes of an endpoint that currently hold a uct_ep.
 *
 * Walks every lane index below ucp_ep_num_lanes(ep) and counts those for
 * which ucp_ep_get_lane_raw() returns a non-NULL pointer.
 *
 * @param ep The endpoint.
 *
 * @return Number of lanes whose raw uct_ep pointer is not NULL.
 */
static inline ucp_lane_index_t ucp_ep_num_valid_lanes(ucp_ep_h ep)
{
    ucp_lane_index_t valid_count = 0;
    ucp_lane_index_t lane;

    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (ucp_ep_get_lane_raw(ep, lane) != NULL) {
            ++valid_count;
        }
    }

    return valid_count;
}

static inline int ucp_ep_is_lane_p2p(ucp_ep_h ep, ucp_lane_index_t lane)
{
return !!(ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane));
Expand Down
10 changes: 8 additions & 2 deletions src/ucp/core/ucp_proxy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,20 @@ uct_ep_h ucp_proxy_ep_extract(uct_ep_h ep)
void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep)
{
ucp_ep_h ucp_ep = proxy_ep->ucp_ep;
uct_ep_h uct_ep;
ucp_lane_index_t lane;

ucs_assert(proxy_ep->uct_ep != NULL);
for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
if (ucp_ep_get_lane(ucp_ep, lane) == &proxy_ep->super) {
uct_ep = ucp_ep_get_lane_raw(ucp_ep, lane);
if (uct_ep == NULL) {
continue;
}

if (uct_ep == &proxy_ep->super) {
ucs_assert(proxy_ep->uct_ep != NULL); /* make sure there is only one match */
ucp_ep_set_lane(ucp_ep, lane, proxy_ep->uct_ep);
proxy_ep->uct_ep = NULL;
proxy_ep->uct_ep = NULL;
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2083,7 +2083,8 @@ ucs_status_t ucp_worker_get_ep_config(ucp_worker_h worker,

/* Search for the given key in the ep_config array */
ucs_array_for_each(ep_config, &worker->ep_config) {
if (ucp_ep_config_is_equal(&ep_config->key, key)) {
if (ucp_ep_config_is_equal(&ep_config->key, key,
UCP_EP_CONFIG_CMP_MASK_ALL)) {
ep_cfg_index = ep_config - worker->ep_config.buffer;
goto out;
}
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/proto/proto_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ ucp_proto_select_init_protocols(ucp_worker_h worker,
init_params.select_param = select_param;
init_params.ep_cfg_index = ep_cfg_index;
init_params.rkey_cfg_index = rkey_cfg_index;
init_params.ep_config_key = &ucs_array_elem(&worker->ep_config,
ep_cfg_index).key;
init_params.ep_config_key = &ucp_worker_ep_config(worker,
ep_cfg_index)->key;
init_params.ctx = proto_init;

if (rkey_cfg_index == UCP_WORKER_CFG_INDEX_NULL) {
Expand Down
18 changes: 11 additions & 7 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ static void ucp_ep_flush_progress(ucp_request_t *req)

/* Search for next lane to start flush */
lane = ucs_ffs64(all_lanes & ~req->send.flush.started_lanes);
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if (uct_ep == NULL) {
ucs_info("ep %p flush not connected lane %d", ep, lane);
ucp_ep_flush_request_update_uct_comp(req, -1, UCS_BIT(lane));
continue;
}
Expand Down Expand Up @@ -243,7 +244,8 @@ static void ucp_ep_flush_request_resched(ucp_ep_h ep, ucp_request_t *req)
* wireup is done because connect2iface does not create wireup_ep
* without cm mode */
ucs_assertv(!(req->send.flush.started_lanes &
ucp_ep_config(ep)->p2p_lanes),
ucp_ep_config(ep)->p2p_lanes) ||
ep->worker->context->config.ext.on_demand_wireup,
"req=%p flush started_lanes=0x%" PRIx64
" p2p_lanes=0x%" PRIx64,
req, req->send.flush.started_lanes,
Expand Down Expand Up @@ -279,10 +281,10 @@ ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
ucs_assertv(lane != UCP_NULL_LANE, "ep=%p flush_req=%p lane=%d",
ep, req, lane);

status = uct_ep_flush(ucp_ep_get_lane(ep, lane), req->send.flush.uct_flags,
status = uct_ep_flush(ucp_ep_get_lane_raw(ep, lane), req->send.flush.uct_flags,
&req->send.state.uct_comp);
ucp_trace_req(req, "flush ep %p lane[%d]=%p: %s", ep, lane,
ucp_ep_get_lane(ep, lane), ucs_status_string(status));
ucp_ep_get_lane_raw(ep, lane), ucs_status_string(status));
if (status == UCS_OK) {
ucp_ep_flush_request_update_uct_comp(req, -1, UCS_BIT(lane));
} else if (status == UCS_INPROGRESS) {
Expand All @@ -300,7 +302,8 @@ ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
/* If the operation has not completed, and not started on all lanes, add
* slow-path progress to resume */
if (!completed &&
(req->send.flush.started_lanes != UCS_MASK(ucp_ep_num_lanes(ep)))) {
(req->send.flush.started_lanes !=
UCS_MASK(ucp_ep_num_valid_lanes(ep)))) {
ucp_ep_flush_request_resched(ep, req);
}

Expand Down Expand Up @@ -381,6 +384,7 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
const char *debug_name,
unsigned uct_flags)
{
ucp_lane_index_t num_lanes = ucp_ep_num_lanes(ep);
ucs_status_t status;
ucp_request_t *req;

Expand All @@ -404,12 +408,12 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
req->send.flush.uct_flags = uct_flags;
req->send.flush.sw_started = 0;
req->send.flush.sw_done = 0;
req->send.flush.num_lanes = ucp_ep_num_lanes(ep);
req->send.flush.num_lanes = num_lanes;
req->send.flush.started_lanes = 0;
req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.state.uct_comp.func = ucp_ep_flush_completion;
req->send.state.uct_comp.count = ucp_ep_num_lanes(ep);
req->send.state.uct_comp.count = num_lanes;
req->send.state.uct_comp.status = UCS_OK;

ucp_request_set_super(req, worker_req);
Expand Down
3 changes: 1 addition & 2 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -2607,15 +2607,14 @@ ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params,
*/
for (lane = 0; lane < key->num_lanes; ++lane) {
ucs_assert(select_ctx->lane_descs[lane].lane_types != 0);
addr_indices[lane] = select_ctx->lane_descs[lane].addr_index;
key->lanes[lane].rsc_index = select_ctx->lane_descs[lane].rsc_index;
key->lanes[lane].dst_md_index = select_ctx->lane_descs[lane].dst_md_index;
key->lanes[lane].dst_sys_dev = select_ctx->lane_descs[lane].dst_sys_dev;
key->lanes[lane].lane_types = select_ctx->lane_descs[lane].lane_types;
key->lanes[lane].seg_size = select_ctx->lane_descs[lane].seg_size;
key->lanes[lane].path_index = ucp_wireup_default_path_index(
select_ctx->lane_descs[lane].path_index);

addr_indices[lane] = select_ctx->lane_descs[lane].addr_index;
ucs_trace("ep %p: construct lane %d to addr_index %d", ep, lane,
select_ctx->lane_descs[lane].addr_index);

Expand Down
Loading
Loading