Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/RDMACM: Cache ibv_query_port #258

Open
wants to merge 1 commit into
base: integration3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/uct/ib/base/ib_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ ucs_status_t uct_ib_device_query(uct_ib_device_t *dev,
break;
case IBV_NODE_CA:
default:
dev->first_port = 1;
dev->first_port = UCT_IB_FIRST_PORT;
dev->num_ports = IBV_DEV_ATTR(dev, phys_port_cnt);
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/uct/ib/base/ib_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#define UCT_IB_MAX_MESSAGE_SIZE (2UL << 30) /* Maximal IB message size */
#define UCT_IB_PKEY_PARTITION_MASK 0x7fff /* IB partition number mask */
#define UCT_IB_PKEY_MEMBERSHIP_MASK 0x8000 /* Full/send-only member */
#define UCT_IB_FIRST_PORT 1
#define UCT_IB_DEV_MAX_PORTS 2
#define UCT_IB_FABRIC_TIME_MAX 32
#define UCT_IB_INVALID_RKEY 0xffffffffu
Expand Down
142 changes: 104 additions & 38 deletions src/uct/ib/rdmacm/rdmacm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,52 +61,114 @@ ucs_status_t uct_rdmacm_cm_reject(struct rdma_cm_id *id)
return UCS_OK;
}

ucs_status_t uct_rdmacm_cm_get_cq(uct_rdmacm_cm_t *cm, struct ibv_context *verbs,
struct ibv_cq **cq_p)
static ucs_status_t
uct_rdmacm_cm_device_context_init(uct_rdmacm_cm_device_context_t *ctx,
uct_rdmacm_cm_t *cm,
struct ibv_context *verbs)
{
const char *dev_name = ibv_get_device_name(verbs->device);
struct ibv_port_attr port_attr;
struct ibv_device_attr dev_attr;
int ret;
int i;

ret = ibv_query_device(verbs, &dev_attr);
if (ret != 0) {
ucs_error("ibv_query_device(%s) failed: %m", dev_name);
return UCS_ERR_IO_ERROR;
}

ctx->eth_ports = 0;
for (i = 0; i < dev_attr.phys_port_cnt; ++i) {
ret = ibv_query_port(verbs, i + UCT_IB_FIRST_PORT, &port_attr);
if (ret != 0) {
ucs_error("ibv_query_port (%s) failed: %m", dev_name);
return UCS_ERR_IO_ERROR;
}

if (IBV_PORT_IS_LINK_LAYER_ETHERNET(&port_attr)) {
ctx->eth_ports |= UCS_BIT(i);
}
}

/* Create a dummy completion queue */
ctx->cq = ibv_create_cq(verbs, 1, NULL, NULL, 0);
if (ctx->cq == NULL) {
ucs_error("ibv_create_cq(%s) failed: %m", dev_name);
return UCS_ERR_IO_ERROR;
}

return UCS_OK;
}

ucs_status_t
uct_rdmacm_cm_get_device_context(uct_rdmacm_cm_t *cm, struct ibv_context *verbs,
uct_rdmacm_cm_device_context_t **ctx_p)
{
struct ibv_cq *cq;
uct_rdmacm_cm_device_context_t *ctx;
ucs_status_t status;
khiter_t iter;
int ret;

iter = kh_put(uct_rdmacm_cm_cqs, &cm->cqs,
iter = kh_put(uct_rdmacm_cm_device_contexts, &cm->ctxs,
ibv_get_device_guid(verbs->device), &ret);
if (ret == -1) {
ucs_error("cm %p: cannot allocate hash entry for CQ", cm);
return UCS_ERR_NO_MEMORY;
ucs_error("cm %p: cannot allocate hash entry for device context", cm);
status = UCS_ERR_NO_MEMORY;
goto out;
}

if (ret == 0) {
/* already exists so use it */
cq = kh_value(&cm->cqs, iter);
ctx = kh_value(&cm->ctxs, iter);
} else {
/* Create a dummy completion queue */
cq = ibv_create_cq(verbs, 1, NULL, NULL, 0);
if (cq == NULL) {
kh_del(uct_rdmacm_cm_cqs, &cm->cqs, iter);
ucs_error("ibv_create_cq() failed: %m");
return UCS_ERR_IO_ERROR;
/* Create a qp context */
ctx = ucs_malloc(sizeof(*ctx), "rdmacm_device_context");
if (ctx == NULL) {
ucs_error("cm %p: failed to allocate device context", cm);
status = UCS_ERR_NO_MEMORY;
goto err_kh_del;
}

status = uct_rdmacm_cm_device_context_init(ctx, cm, verbs);
if (status != UCS_OK) {
goto err_free_ctx;
}

kh_value(&cm->cqs, iter) = cq;
kh_value(&cm->ctxs, iter) = ctx;
}

*cq_p = cq;
*ctx_p = ctx;
return UCS_OK;
err_free_ctx:
ucs_free(ctx);
err_kh_del:
kh_del(uct_rdmacm_cm_device_contexts, &cm->ctxs, iter);
out:
return status;
}

void uct_rdmacm_cm_cqs_cleanup(uct_rdmacm_cm_t *cm)
static void
uct_rdmacm_cm_device_context_cleanup(uct_rdmacm_cm_device_context_t *ctx)
{
struct ibv_cq *cq;
int ret;

kh_foreach_value(&cm->cqs, cq, {
ret = ibv_destroy_cq(cq);
if (ret != 0) {
ucs_warn("ibv_destroy_cq() returned %d: %m", ret);
}
ret = ibv_destroy_cq(ctx->cq);
if (ret != 0) {
ucs_warn("ibv_destroy_cq() returned %d: %m", ret);
}
}

static void uct_rdmacm_cm_cleanup_devices(uct_rdmacm_cm_t *cm)
{
uct_rdmacm_cm_device_context_t *ctx;

kh_foreach_value(&cm->ctxs, ctx, {
uct_rdmacm_cm_device_context_cleanup(ctx);
ucs_free(ctx);
});

kh_destroy_inplace(uct_rdmacm_cm_cqs, &cm->cqs);
kh_destroy_inplace(uct_rdmacm_cm_device_contexts, &cm->ctxs);
}

size_t uct_rdmacm_cm_get_max_conn_priv()
Expand Down Expand Up @@ -186,20 +248,21 @@ static void uct_rdmacm_cm_handle_event_route_resolved(struct rdma_cm_event *even
}
}

static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(struct rdma_cm_id *cm_id,
static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(uct_rdmacm_cm_t *cm,
struct rdma_cm_id *cm_id,
uct_device_addr_t **dev_addr_p,
size_t *dev_addr_len_p)
{
uct_rdmacm_cm_device_context_t *ctx;
char rdmacm_gid_str[64], qp_attr_gid_str[64];
struct ibv_port_attr port_attr;
uct_ib_address_t *dev_addr;
struct ibv_qp_attr qp_attr;
size_t addr_length;
int qp_attr_mask;
char dev_name[UCT_DEVICE_NAME_MAX];
uct_ib_roce_version_info_t roce_info;
unsigned address_pack_flags;
union ibv_gid gid;
ucs_status_t status;
int ret;

/* get the qp attributes in order to modify the qp state.
Expand All @@ -214,10 +277,9 @@ static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(struct rdma_cm_id *cm_id,
return UCS_ERR_IO_ERROR;
}

if (ibv_query_port(cm_id->verbs, cm_id->port_num, &port_attr)) {
uct_rdmacm_cm_id_to_dev_name(cm_id, dev_name);
ucs_error("ibv_query_port (%s) failed: %m", dev_name);
return UCS_ERR_IO_ERROR;
status = uct_rdmacm_cm_get_device_context(cm, cm_id->pd->context, &ctx);
if (status != UCS_OK) {
return status;
}

/* Print diagnostic if gid does not match */
Expand All @@ -231,7 +293,7 @@ static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(struct rdma_cm_id *cm_id,
qp_attr_gid_str, sizeof(qp_attr_gid_str)));
}

if (IBV_PORT_IS_LINK_LAYER_ETHERNET(&port_attr)) {
if (ctx->eth_ports & UCS_BIT(cm_id->port_num - UCT_IB_FIRST_PORT)) {
/* Ethernet address */
ucs_assert(qp_attr.ah_attr.is_global);
gid = qp_attr.ah_attr.grh.dgid;
Expand Down Expand Up @@ -275,7 +337,9 @@ static ucs_status_t uct_rdmacm_cm_id_to_dev_addr(struct rdma_cm_id *cm_id,
return UCS_OK;
}

static void uct_rdmacm_cm_handle_event_connect_request(struct rdma_cm_event *event)
static void
uct_rdmacm_cm_handle_event_connect_request(uct_rdmacm_cm_t *cm,
struct rdma_cm_event *event)
{
uct_rdmacm_priv_data_hdr_t *hdr = (uct_rdmacm_priv_data_hdr_t *)
event->param.conn.private_data;
Expand All @@ -293,7 +357,8 @@ static void uct_rdmacm_cm_handle_event_connect_request(struct rdma_cm_event *eve

uct_rdmacm_cm_id_to_dev_name(event->id, dev_name);

status = uct_rdmacm_cm_id_to_dev_addr(event->id, &dev_addr, &addr_length);
status = uct_rdmacm_cm_id_to_dev_addr(cm, event->id, &dev_addr,
&addr_length);
if (status != UCS_OK) {
goto err;
}
Expand Down Expand Up @@ -364,7 +429,8 @@ static void uct_rdmacm_cm_handle_event_connect_response(struct rdma_cm_event *ev
remote_data.conn_priv_data = hdr + 1;
remote_data.conn_priv_data_length = hdr->length;

status = uct_rdmacm_cm_id_to_dev_addr(event->id, &dev_addr, &addr_length);
status = uct_rdmacm_cm_id_to_dev_addr(uct_rdmacm_cm_ep_get_cm(cep),
event->id, &dev_addr, &addr_length);
if (status != UCS_OK) {
ucs_error("%s: client (ep=%p id=%p) failed to process a connect response",
uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
Expand Down Expand Up @@ -487,7 +553,7 @@ uct_rdmacm_cm_process_event(uct_rdmacm_cm_t *cm, struct rdma_cm_event *event)
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
/* Server side event */
uct_rdmacm_cm_handle_event_connect_request(event);
uct_rdmacm_cm_handle_event_connect_request(cm, event);
/* The server will ack the event after accepting/rejecting the request
* (in ep_create). */
ack_event = 0;
Expand Down Expand Up @@ -651,9 +717,9 @@ UCS_CLASS_INIT_FUNC(uct_rdmacm_cm_t, uct_component_h component,
&uct_rdmacm_cm_iface_ops, worker, component,
config);

kh_init_inplace(uct_rdmacm_cm_cqs, &self->cqs);
kh_init_inplace(uct_rdmacm_cm_device_contexts, &self->ctxs);

self->ev_ch = rdma_create_event_channel();
self->ev_ch = rdma_create_event_channel();
if (self->ev_ch == NULL) {
ucs_error("rdma_create_event_channel failed: %m");
status = UCS_ERR_IO_ERROR;
Expand Down Expand Up @@ -711,7 +777,7 @@ UCS_CLASS_CLEANUP_FUNC(uct_rdmacm_cm_t)

ucs_trace("destroying event_channel %p on cm %p", self->ev_ch, self);
rdma_destroy_event_channel(self->ev_ch);
uct_rdmacm_cm_cqs_cleanup(self);
uct_rdmacm_cm_cleanup_devices(self);
}

UCS_CLASS_DEFINE(uct_rdmacm_cm_t, uct_cm_t);
Expand Down
23 changes: 16 additions & 7 deletions src/uct/ib/rdmacm/rdmacm_cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
#include <ucs/datastruct/khash.h>


KHASH_MAP_INIT_INT64(uct_rdmacm_cm_cqs, struct ibv_cq*);
KHASH_MAP_INIT_INT64(uct_rdmacm_cm_device_contexts,
struct uct_rdmacm_cm_device_context*);


/**
* An rdmacm connection manager
*/
typedef struct uct_rdmacm_cm {
uct_cm_t super;
struct rdma_event_channel *ev_ch;
khash_t(uct_rdmacm_cm_cqs) cqs;
uct_cm_t super;
struct rdma_event_channel *ev_ch;
khash_t(uct_rdmacm_cm_device_contexts) ctxs;

struct {
struct sockaddr *src_addr;
struct sockaddr *src_addr;
} config;
} uct_rdmacm_cm_t;

Expand All @@ -36,6 +37,12 @@ typedef struct uct_rdmacm_cm_config {
} uct_rdmacm_cm_config_t;


typedef struct uct_rdmacm_cm_device_context {
struct ibv_cq *cq;
uint8_t eth_ports;
} uct_rdmacm_cm_device_context_t;


UCS_CLASS_DECLARE_NEW_FUNC(uct_rdmacm_cm_t, uct_cm_t, uct_component_h,
uct_worker_h, const uct_cm_config_t *);
UCS_CLASS_DECLARE_DELETE_FUNC(uct_rdmacm_cm_t, uct_cm_t);
Expand All @@ -55,8 +62,10 @@ ucs_status_t uct_rdmacm_cm_ack_event(struct rdma_cm_event *event);

ucs_status_t uct_rdmacm_cm_reject(struct rdma_cm_id *id);

ucs_status_t uct_rdmacm_cm_get_cq(uct_rdmacm_cm_t *cm, struct ibv_context *verbs,
struct ibv_cq **cq);
ucs_status_t
uct_rdmacm_cm_get_device_context(uct_rdmacm_cm_t *cm,
struct ibv_context *verbs,
uct_rdmacm_cm_device_context_t **ctx_p);

void uct_rdmacm_cm_cqs_cleanup(uct_rdmacm_cm_t *cm);

Expand Down
8 changes: 4 additions & 4 deletions src/uct/ib/rdmacm/rdmacm_cm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,17 @@ ucs_status_t
uct_rdamcm_cm_ep_set_qp_num(struct rdma_conn_param *conn_param,
uct_rdmacm_cm_ep_t *cep)
{
struct ibv_cq *cq;
uct_rdmacm_cm_device_context_t *ctx;
ucs_status_t status;

status = uct_rdmacm_cm_get_cq(uct_rdmacm_cm_ep_get_cm(cep), cep->id->verbs,
&cq);
status = uct_rdmacm_cm_get_device_context(uct_rdmacm_cm_ep_get_cm(cep),
cep->id->verbs, &ctx);
if (status != UCS_OK) {
return status;
}

/* create a dummy qp in order to get a unique qp_num to provide to librdmacm */
status = uct_rdmacm_cm_create_dummy_qp(cep->id, cq, &cep->qp);
status = uct_rdmacm_cm_create_dummy_qp(cep->id, ctx->cq, &cep->qp);
if (status != UCS_OK) {
return status;
}
Expand Down