1 change: 1 addition & 0 deletions src/uct/ib/mlx5/dv/ib_mlx5_dv.c
@@ -606,6 +606,7 @@ uct_ib_mlx5_devx_create_cq_common(uct_ib_iface_t *iface, uct_ib_dir_t dir,

UCT_IB_MLX5DV_SET(cqc, cqctx, log_cq_size, log_cq_size);
UCT_IB_MLX5DV_SET(cqc, cqctx, cqe_sz, (attr->cqe_size == 128) ? 1 : 0);
UCT_IB_MLX5DV_SET(cqc, cqctx, cc, (attr->cq_size == 1) ? 1 : 0);

if (attr->flags & UCT_IB_MLX5_CQ_CQE_ZIP) {
UCT_IB_MLX5DV_SET(cqc, cqctx, cqe_comp_en, 1);
14 changes: 4 additions & 10 deletions src/uct/ib/mlx5/gdaki/gdaki.c
@@ -83,8 +83,7 @@ static UCS_CLASS_INIT_FUNC(uct_rc_gdaki_ep_t, const uct_ep_params_t *params)
return status;
}

init_attr.cq_len[UCT_IB_DIR_TX] = iface->super.super.config.tx_qp_len *
UCT_IB_MLX5_MAX_BB;
init_attr.cq_len[UCT_IB_DIR_TX] = 1;
uct_ib_mlx5_cq_calc_sizes(&iface->super.super.super, UCT_IB_DIR_TX,
&init_attr, 0, &cq_attr);
uct_rc_iface_fill_attr(&iface->super.super, &qp_attr.super,
@@ -176,16 +175,11 @@ static UCS_CLASS_INIT_FUNC(uct_rc_gdaki_ep_t, const uct_ep_params_t *params)
dev_ep.sq_num = self->qp.super.qp_num;
dev_ep.sq_wqe_daddr = UCS_PTR_BYTE_OFFSET(self->ep_gpu,
qp_attr.umem_offset);
dev_ep.sq_rsvd_index = 0;
dev_ep.sq_ready_index = 0;
dev_ep.sq_wqe_pi = 0;
dev_ep.sq_wqe_num = qp_attr.max_tx;
dev_ep.sq_wqe_num = qp_attr.max_tx;
dev_ep.sq_dbrec = &self->ep_gpu->qp_dbrec[MLX5_SND_DBR];
/* FC mask is used to determine if WQE should be posted with completion.
* qp_attr.max_tx must be a power of 2. */
dev_ep.sq_fc_mask = (qp_attr.max_tx >> 1) - 1;
dev_ep.avail_count = qp_attr.max_tx;
dev_ep.sq_dbrec = &self->ep_gpu->qp_dbrec[MLX5_SND_DBR];

dev_ep.sq_fc_mask = (qp_attr.max_tx >> 1) - 1;
dev_ep.cqe_daddr = UCS_PTR_BYTE_OFFSET(self->ep_gpu, cq_attr.umem_offset);
dev_ep.cqe_num = cq_attr.cq_size;
dev_ep.sq_db = self->sq_db;
183 changes: 94 additions & 89 deletions src/uct/ib/mlx5/gdaki/gdaki.cuh
@@ -93,29 +93,99 @@ template<ucs_device_level_t level> UCS_F_DEVICE void uct_rc_mlx5_gda_sync(void)
}
}

UCS_F_DEVICE uint64_t
uct_rc_mlx5_gda_reserv_wqe_thread(uct_rc_gdaki_dev_ep_t *ep, unsigned count)
UCS_F_DEVICE uint16_t uct_rc_mlx5_gda_bswap16(uint16_t x)
{
/* Try to reserve optimistically */
int32_t prev = atomicAdd(&ep->avail_count, -(int32_t)count);
if (prev < (int32_t)count) {
/* Rollback */
atomicAdd(&ep->avail_count, count);
uint32_t ret;
asm volatile("{\n\t"
".reg .b32 mask;\n\t"
".reg .b32 ign;\n\t"
"mov.b32 mask, 0x1;\n\t"
"prmt.b32 %0, %1, ign, mask;\n\t"
"}"
: "=r"(ret)
: "r"((uint32_t)x));
return ret;
}

UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_parse_cqe(uct_rc_gdaki_dev_ep_t *ep,
uint16_t *wqe_cnt,
uint8_t *opcode)
{
auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(ep->cqe_daddr);
uint32_t *data_ptr = (uint32_t*)&cqe64->wqe_counter;
uint32_t data = READ_ONCE(*data_ptr);
uint64_t rsvd_idx = READ_ONCE(ep->sq_rsvd_index);

*wqe_cnt = uct_rc_mlx5_gda_bswap16(data);
if (opcode != nullptr) {
*opcode = data >> 28;
}

return rsvd_idx - ((rsvd_idx - *wqe_cnt) & 0xffff);
}

UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_max_alloc_wqe_base(
uct_rc_gdaki_dev_ep_t *ep, unsigned count)
{
uint16_t wqe_cnt;
uint64_t pi;

pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, nullptr);
return pi + ep->sq_wqe_num + 1 - count;
}
Comment on lines +133 to +135
⚠️ Potential issue | 🔴 Critical

Fix off-by-one in max alloc calculation.
uct_rc_mlx5_gda_max_alloc_wqe_base() currently returns pi + sq_wqe_num + 1 - count. When the SQ is full (sq_rsvd_index == pi + sq_wqe_num) and a thread asks for one more WQE, this formula lets the fast-path guard pass, atomicAdd() succeeds, and we end up with sq_rsvd_index == pi + sq_wqe_num + 1. That means we wrap and reuse a slot that still holds an in-flight WQE, corrupting the ring and breaking progress. Drop the extra + 1 so the reservation limit never exceeds the queue capacity.

-    return pi + ep->sq_wqe_num + 1 - count;
+    return pi + ep->sq_wqe_num - count;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, nullptr);
return pi + ep->sq_wqe_num + 1 - count;
}
pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, nullptr);
return pi + ep->sq_wqe_num - count;
}
🤖 Prompt for AI Agents
In src/uct/ib/mlx5/gdaki/gdaki.cuh around lines 133 to 135, the calculation in
uct_rc_mlx5_gda_max_alloc_wqe_base() uses "pi + sq_wqe_num + 1 - count" which
allows sq_rsvd_index to exceed the queue capacity and reuse an in-flight WQE;
remove the stray "+ 1" so the function returns "pi + sq_wqe_num - count" (i.e.,
ensure the reservation limit never exceeds sq_wqe_num) to prevent off-by-one
wraparound and ring corruption.
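For illustration, working through the scenario above with hypothetical numbers (none of them come from the PR): sq_wqe_num = 4, pi = 0, count = 1, and sq_rsvd_index = 4, i.e. all four slots in flight.

/* Sketch of the arithmetic only, not PR code:
 * current formula:   max = pi + sq_wqe_num + 1 - count = 0 + 4 + 1 - 1 = 4
 *                    guard "sq_rsvd_index > max" is "4 > 4" -> false, so the
 *                    atomicAdd proceeds and sq_rsvd_index becomes 5, one past
 *                    the queue capacity, which is the wrap/reuse the comment
 *                    warns about.
 * suggested formula: max = pi + sq_wqe_num - count = 0 + 4 - 1 = 3
 *                    guard "4 > 3" -> true, the thread returns
 *                    UCT_RC_GDA_RESV_WQE_NO_RESOURCE instead. */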


UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_reserv_wqe_thread(
uct_rc_gdaki_dev_ep_t *ep, unsigned count)
{
/* Do not attempt to reserve if the available space is less than the
* requested count, to avoid starvation of threads trying to rollback the
* reservation with atomicCAS. */
uint64_t max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
if (ep->sq_rsvd_index > max_wqe_base) {
return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
}
Comment on lines +144 to 146

logic: race condition: ep->sq_rsvd_index read without atomic protection could see stale value, leading to incorrect reservation checks

Contributor Author

maybe add READ_ONCE to make sure the value is not cached?
a lock would be too expensive in this case
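A minimal sketch of what that could look like, reusing the READ_ONCE macro that uct_rc_mlx5_gda_parse_cqe() already applies to sq_rsvd_index (sketch only, not part of this PR):

/* force a fresh load of the reserved index for the fast-path guard */
uint64_t rsvd_idx = READ_ONCE(ep->sq_rsvd_index);
if (rsvd_idx > max_wqe_base) {
    return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
}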

Contributor

is it fixed?
I used a 2-counter approach to fix these 2 things and make the code simpler. Maybe we can still use that?


/* We own count elements, now can safely increment the reserved index */
return atomicAdd(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
count);
uint64_t wqe_base = atomicAdd(reinterpret_cast<unsigned long long*>(
&ep->sq_rsvd_index),
static_cast<unsigned long long>(count));

/*
* Attempt to reserve 'count' WQEs by atomically incrementing the reserved
* index. If the reservation exceeds the available space in the work queue,
* enter a rollback loop.
*
* Rollback Logic:
* - Calculate the next potential index (wqe_next) after attempting the
* reservation.
* - Use atomic CAS to check if the current reserved index matches wqe_next.
* If it does, revert the reservation by resetting the reserved index to
* wqe_base.
* - A successful CAS indicates no other thread has modified the reserved
* index, allowing the rollback to complete, and the function returns
* UCT_RC_GDA_RESV_WQE_NO_RESOURCE to signal insufficient resources.
* - If CAS fails, it means another thread has modified the reserved index.
* The loop continues to reevaluate resource availability to determine if
* the reservation can now be satisfied, possibly due to other operations
* freeing up resources.
*/
while (wqe_base > max_wqe_base) {
uint64_t wqe_next = wqe_base + count;
if (atomicCAS(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
wqe_next, wqe_base) == wqe_next) {
return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
}

max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
}
Comment on lines +171 to +179

logic: potential livelock: if max_wqe_base keeps getting updated by concurrent operations, threads could spin indefinitely in this rollback loop without bounded retry limit

Contributor Author

max_wqe_base is eventually updated, which makes the condition wqe_base > max_wqe_base false and exits the loop


return wqe_base;
}

template<ucs_device_level_t level>
UCS_F_DEVICE void
uct_rc_mlx5_gda_reserv_wqe(uct_rc_gdaki_dev_ep_t *ep, unsigned count,
unsigned lane_id, uint64_t &wqe_base)
{
wqe_base = 0;
Contributor

I intentionally added zero initialization to avoid a crash with syndrome 68

Contributor Author

why does it cause this crash, and how does this initialization prevent it?
the code looks like it is just overwritten by the shuffle

Contributor

Yes, this one was quite tricky, and I also struggled to understand it.
So I asked ChatGPT and Gemini, and both pointed out that an uninitialized wqe_base leads to UB in some cases:

The CUDA execution model might still produce the correct result most of the time because the __shfl_sync instruction will force the other lanes to wait for lane 0 to arrive. When lane 0 finally executes the shuffle, its value will be correctly broadcast.

However, relying on this implicit synchronization is dangerous and can lead to undefined behavior. The code is not robust because it makes assumptions about instruction scheduling and thread divergence that may not hold true on all GPU architectures or with future compiler versions. The most significant risk is that the compiler might perform optimizations based on the uninitialized value of wqe_base in the non-zero lanes before the shuffle call, leading to incorrect code generation.

This issue was not always reproducible on rock, but quite frequently failed in CI with syndrome 68.
So it is better to keep this change.
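A minimal sketch of the overall pattern under discussion, assuming a warp-wide __shfl_sync broadcast after the lane-0 reservation (the helper name and the broadcast call are illustrative, not the exact gdaki.cuh code):

UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_reserve_and_broadcast_sketch(
        uct_rc_gdaki_dev_ep_t *ep, unsigned count, unsigned lane_id)
{
    uint64_t wqe_base = 0; /* zero-init: non-zero lanes never hold an indeterminate value */

    if (lane_id == 0) {
        wqe_base = uct_rc_mlx5_gda_reserv_wqe_thread(ep, count);
    }

    /* broadcast lane 0's reservation to every lane in the warp */
    return __shfl_sync(0xffffffff, wqe_base, 0);
}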


if (lane_id == 0) {
wqe_base = uct_rc_mlx5_gda_reserv_wqe_thread(ep, count);
}
@@ -211,7 +281,7 @@ UCS_F_DEVICE void uct_rc_mlx5_gda_db(uct_rc_gdaki_dev_ep_t *ep,
UCS_F_DEVICE bool
uct_rc_mlx5_gda_fc(const uct_rc_gdaki_dev_ep_t *ep, uint16_t wqe_idx)
{
return (wqe_idx & ep->sq_fc_mask) == 1;
return !(wqe_idx & ep->sq_fc_mask);

logic: flow-control condition inverted from (wqe_idx & mask) == 1 to !(wqe_idx & mask); these produce different true sets unless mask is crafted so that & mask yields only 0 or 1; confirm mask semantics. What is the intended range of ep->sq_fc_mask? If it's a multi-bit mask, the new logic will trigger FC on any zero result, not just when the masked value equals 1.

Contributor Author

flow control requests a completion at least once per half of the work queue size,
each time wqe_idx & sq_fc_mask (= wqe_size/2 - 1) equals the same number. Comparing to 0 is supposed to save an explicit comparison instruction.
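For illustration, with a hypothetical qp_attr.max_tx of 8 (any power of two; the number is not from the PR):

/* sq_fc_mask = (8 >> 1) - 1 = 3, so the new check !(wqe_idx & 3) is true for
 * wqe_idx 0, 4, 8, 12, ... : a completion is requested once every
 * max_tx / 2 = 4 WQEs, and the masked result is tested against zero rather
 * than against an explicit "== 1". */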

}

template<ucs_device_level_t level>
Expand Down Expand Up @@ -494,82 +564,9 @@ uct_rc_mlx5_gda_qedump(const char *pfx, void *buff, ssize_t len)
}
}

UCS_F_DEVICE void uct_rc_mlx5_gda_progress_thread(uct_rc_gdaki_dev_ep_t *ep)
{
void *cqe = ep->cqe_daddr;
size_t cqe_num = ep->cqe_num;
uint64_t cqe_idx = ep->cqe_ci;
uint32_t idx = cqe_idx & (cqe_num - 1);
void *curr_cqe = (uint8_t*)cqe + (idx * DOCA_GPUNETIO_VERBS_CQE_SIZE);
auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(curr_cqe);

/* Read last 3 fields with a single atomic operation */
uint32_t *data_ptr = (uint32_t *)&cqe64->wqe_counter;
uint32_t data = READ_ONCE(*data_ptr);
uint8_t op_owner = data >> 24;
if ((op_owner & MLX5_CQE_OWNER_MASK) ^ !!(cqe_idx & cqe_num)) {
return;
}

cuda::atomic_ref<uint64_t, cuda::thread_scope_device> ref(ep->cqe_ci);
if (!ref.compare_exchange_strong(cqe_idx, cqe_idx + 1,
cuda::std::memory_order_relaxed)) {
return;
}

uint8_t opcode = op_owner >> DOCA_GPUNETIO_VERBS_MLX5_CQE_OPCODE_SHIFT;
uint32_t data_cpu = doca_gpu_dev_verbs_bswap32(data);
uint16_t wqe_cnt = (data_cpu >> 16) & 0xffff;
uint16_t wqe_idx = wqe_cnt & (ep->sq_wqe_num - 1);

cuda::atomic_ref<uint64_t, cuda::thread_scope_device> pi_ref(ep->sq_wqe_pi);
uint64_t sq_wqe_pi = pi_ref.load(cuda::std::memory_order_relaxed);
uint64_t new_wqe_pi;

do {
/* Skip CQE if it's older than current producer index, could be already
* processed by another thread. This handles CQE wrap-around. */
if ((int16_t)(wqe_cnt - (uint16_t)sq_wqe_pi) < 0) {
return;
}

uint16_t completed_delta = wqe_cnt - (uint16_t)sq_wqe_pi;
new_wqe_pi = sq_wqe_pi + completed_delta + 1;
} while (!pi_ref.compare_exchange_weak(sq_wqe_pi, new_wqe_pi,
cuda::std::memory_order_release,
cuda::std::memory_order_relaxed));

if (opcode == MLX5_CQE_REQ) {
atomicAdd(&ep->avail_count, (int32_t)(new_wqe_pi - sq_wqe_pi));
return;
}

auto err_cqe = reinterpret_cast<mlx5_err_cqe_ex*>(cqe64);
auto wqe_ptr = uct_rc_mlx5_gda_get_wqe_ptr(ep, wqe_idx);
ucs_device_error("CQE[%d] with syndrome:%x vendor:%x hw:%x "
"wqe_idx:0x%x qp:0x%x",
idx, err_cqe->syndrome, err_cqe->vendor_err_synd,
err_cqe->hw_err_synd, wqe_idx,
doca_gpu_dev_verbs_bswap32(err_cqe->s_wqe_opcode_qpn) &
0xffffff);
uct_rc_mlx5_gda_qedump("WQE", wqe_ptr, 64);
uct_rc_mlx5_gda_qedump("CQE", cqe64, 64);
pi_ref.fetch_max(sq_wqe_pi | UCT_RC_GDA_WQE_ERR);
}

template<ucs_device_level_t level>
UCS_F_DEVICE void uct_rc_mlx5_gda_ep_progress(uct_device_ep_h tl_ep)
{
uct_rc_gdaki_dev_ep_t *ep = (uct_rc_gdaki_dev_ep_t*)tl_ep;
unsigned num_lanes;
unsigned lane_id;

uct_rc_mlx5_gda_exec_init<level>(lane_id, num_lanes);
if (lane_id == 0) {
uct_rc_mlx5_gda_progress_thread(ep);
}

uct_rc_mlx5_gda_sync<level>();
}

template<ucs_device_level_t level>
@@ -578,13 +575,21 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_check_completion(
{
uct_rc_gdaki_dev_ep_t *ep = reinterpret_cast<uct_rc_gdaki_dev_ep_t*>(tl_ep);
uct_rc_gda_completion_t *comp = &tl_comp->rc_gda;
uint64_t sq_wqe_pi = ep->sq_wqe_pi;
uint16_t wqe_cnt;
uint8_t opcode;
uint64_t pi;

pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, &opcode);

if ((sq_wqe_pi & UCT_RC_GDA_WQE_MASK) <= comp->wqe_idx) {
if (pi < comp->wqe_idx) {
return UCS_INPROGRESS;
}

if (sq_wqe_pi & UCT_RC_GDA_WQE_ERR) {
if (opcode == MLX5_CQE_REQ_ERR) {
uint16_t wqe_idx = wqe_cnt & (ep->sq_wqe_num - 1);
auto wqe_ptr = uct_rc_mlx5_gda_get_wqe_ptr(ep, wqe_idx);
uct_rc_mlx5_gda_qedump("WQE", wqe_ptr, 64);
uct_rc_mlx5_gda_qedump("CQE", ep->cqe_daddr, 64);
return UCS_ERR_IO_ERROR;
}

3 changes: 0 additions & 3 deletions src/uct/ib/mlx5/gdaki/gdaki_dev.h
@@ -19,8 +19,6 @@ typedef struct {

uint64_t sq_rsvd_index;
uint64_t sq_ready_index;
uint64_t sq_wqe_pi;
uint64_t cqe_ci;
int sq_lock;

uint8_t *sq_wqe_daddr;
@@ -31,7 +29,6 @@
uint16_t sq_wqe_num;
uint32_t sq_num;
uint16_t sq_fc_mask;
int32_t avail_count;
} uct_rc_gdaki_dev_ep_t;


7 changes: 4 additions & 3 deletions test/gtest/ucp/cuda/test_kernels.cu
@@ -138,11 +138,13 @@ ucp_test_kernel_get_state(const test_ucp_device_kernel_params_t &params,
uct_elem, comp);
if ((status == UCS_OK) &&
(device_ep->uct_tl_id == UCT_DEVICE_TL_RC_MLX5_GDA)) {
uint16_t wqe_cnt;
uct_rc_gdaki_dev_ep_t *ep =
reinterpret_cast<uct_rc_gdaki_dev_ep_t*>(device_ep);
result.producer_index = ep->sq_wqe_pi - result.producer_index;
result.producer_index = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt,
nullptr) +
1;
result.ready_index = ep->sq_ready_index - result.ready_index;
result.avail_count = ep->avail_count - result.avail_count;
}
}

@@ -242,7 +244,6 @@ launch_test_ucp_device_kernel(const test_ucp_device_kernel_params_t &params)
result->status = UCS_ERR_NOT_IMPLEMENTED;
result->producer_index = 0;
result->ready_index = 0;
result->avail_count = 0;

switch (params.level) {
case UCS_DEVICE_LEVEL_THREAD:
1 change: 0 additions & 1 deletion test/gtest/ucp/cuda/test_kernels.h
@@ -68,7 +68,6 @@ struct test_ucp_device_kernel_result_t {
ucs_status_t status;
uint64_t producer_index;
Contributor

Can we keep the producer index and retrieve it from sq_rsvd_index maybe?

Contributor Author

we could call uct_rc_mlx5_gda_read_cqe/calc_pi here

uint64_t ready_index;
int32_t avail_count;
};

test_ucp_device_kernel_result_t
1 change: 0 additions & 1 deletion test/gtest/ucp/test_ucp_device.cc
@@ -406,7 +406,6 @@ class test_ucp_device_kernel : public test_ucp_device {
EXPECT_UCS_OK(result.status);
EXPECT_EQ(expected, result.producer_index);
EXPECT_EQ(expected, result.ready_index);
EXPECT_EQ(0, result.avail_count);
}
};
