@@ -52,13 +52,17 @@ struct Endpoint {
 
   int inComingCount = 0;
   int kStartDepth = 128;
-  int kRxDepth = 256;
+  int kRxDepth = 128;
   int kReplyDepth = kRxDepth;
   int kMaxInlineSize;
   WRContext *rx_ctx;
   WRContext *start_ctx;
   WRContext *reply_ctx;
 
+  struct ibv_mr *rx_ctx_mr = nullptr;
+  struct ibv_mr *start_ctx_mr = nullptr;
+  struct ibv_mr *reply_ctx_mr = nullptr;
+
   ThreadsafeQueue<WRContext *> free_start_ctx;
   ThreadsafeQueue<WRContext *> free_reply_ctx;
 
@@ -94,27 +98,24 @@ struct Endpoint {
   }
 
   ~Endpoint() {
-    for (int i = 0; i < kRxDepth; ++i) {
-      if (!(rx_ctx[i].buffer)) {
-        continue;
-      }
-      free(rx_ctx[i].buffer->addr);
-      PS_CHECK_EQ(ibv_dereg_mr(rx_ctx[i].buffer), 0);
+    if (rx_ctx_mr) {
+      void *buf = rx_ctx_mr->addr;
+      PS_CHECK_EQ(ibv_dereg_mr(rx_ctx_mr), 0);
+      free(buf);
     }
 
-    for (int i = 0; i < kStartDepth; ++i) {
-      if (start_ctx[i].buffer) {
-        free(start_ctx[i].buffer->addr);
-        PS_CHECK_EQ(ibv_dereg_mr(start_ctx[i].buffer), 0);
-      }
+    if (start_ctx_mr) {
+      void *buf = start_ctx_mr->addr;
+      PS_CHECK_EQ(ibv_dereg_mr(start_ctx_mr), 0);
+      free(buf);
     }
 
-    for (int i = 0; i < kReplyDepth; ++i) {
-      if (reply_ctx[i].buffer) {
-        free(reply_ctx[i].buffer->addr);
-        PS_CHECK_EQ(ibv_dereg_mr(reply_ctx[i].buffer), 0);
-      }
+    if (reply_ctx_mr) {
+      void *buf = reply_ctx_mr->addr;
+      PS_CHECK_EQ(ibv_dereg_mr(reply_ctx_mr), 0);
+      free(buf);
     }
+
     FOR_QPS {
       rdma_destroy_qp(cm_ids[qpIndex]);
       PS_CHECK_EQ(rdma_destroy_id(cm_ids[qpIndex]), 0) << strerror(errno);
@@ -150,24 +151,24 @@ struct Endpoint {
 
   void SetNodeID(int id) { node_id = id; }
 
-  void InitSendContextHelper(struct ibv_pd *pd, WRContext *ctx,
-                             ThreadsafeQueue<WRContext *> *queue, size_t num,
-                             WRContextType type) {
-    for (size_t i = 0; i < num; ++i) {
-      void *buf;
-      aligned_malloc(reinterpret_cast<void **>(&buf), kMempoolChunkSize);
-      PS_CHECK(buf);
-      struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize, 0);
-      PS_CHECK(mr)
-          << "ibv_reg_mr failed: " << strerror(errno)
-          << "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (current "
-          << kStartDepth << ") or BYTEPS_RDMA_RX_DEPTH (current " << kRxDepth
-          << ").";
+  void InitWRContextHelper(struct ibv_pd *pd, WRContext *ctx,
+                           size_t num, WRContextType type,
+                           struct ibv_mr **mr, unsigned int access,
+                           ThreadsafeQueue<WRContext *> *queue = nullptr) {
+    char *buf;
+    aligned_malloc(reinterpret_cast<void **>(&buf), kMempoolChunkSize * num);
+    PS_CHECK(buf);
+    *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize * num, access);
+    PS_CHECK(*mr) << "ibv_reg_mr failed: " << strerror(errno);
 
+    for (size_t i = 0; i < num; ++i) {
       ctx[i].type = type;
-      ctx[i].buffer = mr;
+      ctx[i].buffer = buf + i * kMempoolChunkSize;
+      ctx[i].ref_mr = *mr;
       ctx[i].private_data = this;
-      queue->Push(&ctx[i]);
+      if (queue) {
+        queue->Push(&ctx[i]);
+      }
     }
   }
 
@@ -192,32 +193,20 @@ struct Endpoint {
              << ", qp=" << id->qp->qp_num << ", maxInline=" << kMaxInlineSize;
     if (inited == 0) {
       rdma_provider = provider;
-      InitSendContextHelper(pd, start_ctx, &free_start_ctx, kStartDepth,
-                            kRendezvousStartContext);
-      InitSendContextHelper(pd, reply_ctx, &free_reply_ctx, kReplyDepth,
-                            kRendezvousReplyContext);
+      InitWRContextHelper(pd, start_ctx, kStartDepth, kRendezvousStartContext,
+                          &start_ctx_mr, 0, &free_start_ctx);
+      InitWRContextHelper(pd, reply_ctx, kReplyDepth, kRendezvousReplyContext,
+                          &reply_ctx_mr, 0, &free_reply_ctx);
+      InitWRContextHelper(pd, rx_ctx, kRxDepth, kReceiveContext, &rx_ctx_mr,
+                          IBV_ACCESS_LOCAL_WRITE);
     }
 
+    // Only one QP actually uses the ctx buffers; the other QPs receive only
+    // immediate data, so it is fine for every QP to repeatedly post receives
+    // on the same rx_ctx entries: same buffers, just more RQEs outstanding.
     for (int i = 0; i < kRxDepth; ++i) {
-      if (inited == 0) {
-        void *buf;
-        aligned_malloc(reinterpret_cast<void **>(&buf), kMempoolChunkSize);
-        PS_CHECK(buf);
-        struct ibv_mr *mr =
-            ibv_reg_mr(pd, buf, kMempoolChunkSize, IBV_ACCESS_LOCAL_WRITE);
-        PS_CHECK(mr)
-            << "ibv_reg_mr failed: " << strerror(errno)
-            << "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)"
-            << " or BYTEPS_RDMA_RX_DEPTH (default 2048)";
-
-        rx_ctx[i].type = kReceiveContext;
-        rx_ctx[i].buffer = mr;
-        rx_ctx[i].private_data = this;
-      }
-    }
-    for (int i = 0; i < kRxDepth / QP_NUM; ++i) {
       if (inited < QP_NUM) {
-        PostRecv(&rx_ctx[i + inited * QP_NUM], id);
+        PostRecv(&rx_ctx[i], id);
       }
     }
     inited++;
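
The comment added in this hunk states the key invariant of the shared receive pool: every QP posts receive WRs pointing into the same LOCAL_WRITE-registered region, so each QP contributes its own RQEs while the buffers themselves are shared. A minimal sketch of what one such post looks like against the verbs API, using a simplified stand-in `WRContext` and a hypothetical `PostOneRecv` helper (both illustrative, not part of the patch):

```cpp
#include <infiniband/verbs.h>

#include <cstdint>
#include <cstring>

// Simplified stand-in for the patch's WRContext (illustration only).
struct WRContext {
  char *buffer;           // one chunk inside the pooled allocation
  struct ibv_mr *ref_mr;  // the pool's shared memory region
};

// Post a single receive WR for one context on one QP. Any number of QPs
// may post the same context; each post consumes an RQE on that QP only.
int PostOneRecv(struct ibv_qp *qp, WRContext *ctx, size_t chunk_size) {
  struct ibv_sge sge;
  sge.addr = reinterpret_cast<uint64_t>(ctx->buffer);
  sge.length = chunk_size;
  sge.lkey = ctx->ref_mr->lkey;  // lkey comes from the shared pool MR

  struct ibv_recv_wr wr, *bad_wr = nullptr;
  std::memset(&wr, 0, sizeof(wr));
  wr.wr_id = reinterpret_cast<uint64_t>(ctx);  // completion hands ctx back
  wr.sg_list = &sge;
  wr.num_sge = 1;
  return ibv_post_recv(qp, &wr, &bad_wr);
}
```

Because `wr_id` carries the context pointer, the completion handler can recover which chunk a message landed in regardless of which QP received it.
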
@@ -247,9 +236,9 @@ struct Endpoint {
     memset(&wr, 0, sizeof(wr));
 
     struct ibv_sge sge;
-    sge.addr = reinterpret_cast<uint64_t>(ctx->buffer->addr);
+    sge.addr = reinterpret_cast<uint64_t>(ctx->buffer);
     sge.length = kMempoolChunkSize;
-    sge.lkey = ctx->buffer->lkey;
+    sge.lkey = ctx->ref_mr->lkey;
 
     wr.wr_id = reinterpret_cast<uint64_t>(ctx);
     wr.next = nullptr;
@@ -395,7 +384,7 @@ class RDMATransport : public Transport {
     endpoint_->free_start_ctx.WaitAndPop(&context);
 
     RendezvousStart *req =
-        reinterpret_cast<RendezvousStart *>(context->buffer->addr);
+        reinterpret_cast<RendezvousStart *>(context->buffer);
     req->meta_len = msg_buf->inline_len;
     req->origin_addr = reinterpret_cast<uint64_t>(msg_buf);
     req->data_num = msg_buf->data.size();
@@ -406,7 +395,7 @@ class RDMATransport : public Transport {
 
     struct ibv_sge sge;
     sge.addr = reinterpret_cast<uint64_t>(req);
-    sge.lkey = context->buffer->lkey;
+    sge.lkey = context->ref_mr->lkey;
     sge.length = sizeof(RendezvousStart);
 
     struct ibv_send_wr wr, *bad_wr = nullptr;
@@ -475,7 +464,7 @@ class RDMATransport : public Transport {
     WRContext *reply_ctx_ptr = nullptr;
     endpoint_->free_reply_ctx.WaitAndPop(&reply_ctx_ptr);
     auto *resp =
-        reinterpret_cast<RendezvousReply *>(reply_ctx_ptr->buffer->addr);
+        reinterpret_cast<RendezvousReply *>(reply_ctx_ptr->buffer);
 
     // Populate reply with addresses and rkeys for both buffers
     resp->meta_addr = reinterpret_cast<uint64_t>(buf_ctx->meta_buffer);
@@ -500,7 +489,7 @@ class RDMATransport : public Transport {
     struct ibv_sge sge;
     sge.addr = reinterpret_cast<uint64_t>(resp);
     sge.length = sizeof(RendezvousReply);
-    sge.lkey = reply_ctx_ptr->buffer->lkey;
+    sge.lkey = reply_ctx_ptr->ref_mr->lkey;
     struct ibv_send_wr wr, *bad_wr = nullptr;
     memset(&wr, 0, sizeof(wr));
     wr.wr_id = reinterpret_cast<uint64_t>(reply_ctx_ptr);
@@ -528,7 +517,7 @@ class RDMATransport : public Transport {
     WRContext *reply_ctx_ptr = nullptr;
     endpoint_->free_reply_ctx.WaitAndPop(&reply_ctx_ptr);
     RendezvousReply *resp =
-        reinterpret_cast<RendezvousReply *>(reply_ctx_ptr->buffer->addr);
+        reinterpret_cast<RendezvousReply *>(reply_ctx_ptr->buffer);
 
     // In GDR mode, client still uses single buffer logic,
     // so we populate the single addr/rkey
@@ -547,7 +536,7 @@ class RDMATransport : public Transport {
     struct ibv_sge sge;
    sge.addr = reinterpret_cast<uint64_t>(resp);
     sge.length = sizeof(RendezvousReply);
-    sge.lkey = reply_ctx_ptr->buffer->lkey;
+    sge.lkey = reply_ctx_ptr->ref_mr->lkey;
     struct ibv_send_wr wr, *bad_wr = nullptr;
     memset(&wr, 0, sizeof(wr));
     wr.wr_id = reinterpret_cast<uint64_t>(reply_ctx_ptr);
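
Taken together, the patch replaces per-context registration with one allocation and one `ibv_reg_mr` per pool, and teardown with one `ibv_dereg_mr` plus one `free`. A minimal standalone sketch of that pattern, again with a simplified `WRContext` and hypothetical `InitPool`/`DestroyPool` names, and `posix_memalign` standing in for the project's `aligned_malloc`:

```cpp
#include <infiniband/verbs.h>

#include <cstdlib>
#include <vector>

// Simplified stand-in for the patch's WRContext (illustration only).
struct WRContext {
  char *buffer;           // fixed-size slice of the pooled allocation
  struct ibv_mr *ref_mr;  // shared MR covering the whole pool
};

constexpr size_t kMempoolChunkSize = 8192;  // assumed chunk size

// One malloc plus one ibv_reg_mr per pool, as in InitWRContextHelper,
// instead of one registration per context. Returns the pool MR (or null).
struct ibv_mr *InitPool(struct ibv_pd *pd, std::vector<WRContext> *ctxs,
                        size_t num, int access) {
  void *buf = nullptr;
  if (posix_memalign(&buf, 4096, kMempoolChunkSize * num) != 0) return nullptr;
  struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize * num, access);
  if (!mr) {
    free(buf);
    return nullptr;
  }
  ctxs->resize(num);
  for (size_t i = 0; i < num; ++i) {
    (*ctxs)[i].buffer = static_cast<char *>(buf) + i * kMempoolChunkSize;
    (*ctxs)[i].ref_mr = mr;  // every slice shares the pool's lkey
  }
  return mr;
}

// Teardown mirrors the new destructor: deregister once, then free once.
void DestroyPool(struct ibv_mr *mr) {
  if (mr == nullptr) return;
  void *buf = mr->addr;  // ibv_mr keeps the registered base address
  if (ibv_dereg_mr(mr) == 0) free(buf);
}
```

On the posting side nothing else changes: SGEs take `addr` from `ctx.buffer` and `lkey` from `ctx.ref_mr->lkey`, which is exactly the substitution the hunks above make in `PostRecv` and the rendezvous paths. The payoff is far fewer MR registrations, an expensive verbs operation that also consumes NIC translation entries, for the same number of outstanding work requests.
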