Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/IB/EFA/SRD: Initial add of endpoint #10447

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion src/uct/ib/efa/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ libuct_ib_efa_la_LDFLAGS = $(EFA_LIB) $(IBVERBS_LDFLAGS) \
-version-info $(SOVERSION)

libuct_ib_efa_la_SOURCES = base/ib_efa_md.c \
srd/srd_iface.c
srd/srd_iface.c \
srd/srd_ep.c

noinst_HEADERS = base/ib_efa.h \
srd/srd_iface.h \
srd/srd_ep.h \
srd/srd_def.h

PKG_CONFIG_NAME=ib-efa
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/efa/srd/srd_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include <ucs/datastruct/frag_list.h>


typedef ucs_frag_list_sn_t uct_srd_psn_t;


typedef struct uct_srd_neth {
uint32_t packet_type;
} UCS_S_PACKED uct_srd_neth_t;
Expand All @@ -26,4 +29,10 @@ typedef struct uct_srd_ep_addr {
uct_ib_uint24_t ep_id;
} uct_srd_ep_addr_t;


typedef struct uct_srd_ep_peer_address {
uint32_t dest_qpn;
struct ibv_ah *ah;
yosefe marked this conversation as resolved.
Show resolved Hide resolved
} uct_srd_ep_peer_address_t;

#endif
200 changes: 200 additions & 0 deletions src/uct/ib/efa/srd/srd_ep.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2025. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "srd_ep.h"
#include "srd_iface.h"


static void uct_srd_ep_set_dest_ep_id(uct_srd_ep_t *ep, uint32_t dest_id)
{
ucs_assert(dest_id != UCT_SRD_EP_NULL_ID);
ep->dest_ep_id = dest_id;
ep->flags |= UCT_SRD_EP_FLAG_CONNECTED;
}

static void uct_srd_ep_reset(uct_srd_ep_t *ep)
{
ep->tx.psn = UCT_SRD_INITIAL_PSN;
ep->rx_creq_count = 0;
ucs_frag_list_init(ep->tx.psn - 1, &ep->rx.ooo_pkts,
-1 UCS_STATS_ARG(ep->super.stats));
}

static void *uct_srd_ep_get_peer_address(uct_srd_ep_t *srd_ep)
{
uct_srd_ep_t *ep = ucs_derived_of(srd_ep, uct_srd_ep_t);
brminich marked this conversation as resolved.
Show resolved Hide resolved
return &ep->peer_address;
}

static UCS_CLASS_INIT_FUNC(uct_srd_ep_t, const uct_ep_params_t *params)
{
uct_srd_iface_t *iface = ucs_derived_of(params->iface, uct_srd_iface_t);

memset(self, 0, sizeof(*self));
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

self->dest_ep_id = UCT_SRD_EP_NULL_ID;
self->path_index = UCT_EP_PARAMS_GET_PATH_INDEX(params);

uct_srd_ep_reset(self);
uct_srd_iface_add_ep(iface, self);
ucs_debug("created ep ep=%p iface=%p ep_id=%d", self, iface, self->ep_id);

return UCS_OK;
}

static UCS_CLASS_CLEANUP_FUNC(uct_srd_ep_t)
{
uct_srd_iface_t *iface = ucs_derived_of(self->super.super.iface,
uct_srd_iface_t);
uct_srd_iface_remove_ep(iface, self);
yosefe marked this conversation as resolved.
Show resolved Hide resolved
ucs_frag_list_cleanup(&self->rx.ooo_pkts);
}

UCS_CLASS_DEFINE(uct_srd_ep_t, uct_base_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_srd_ep_t, uct_ep_t, const uct_ep_params_t*);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_srd_ep_t, uct_ep_t);

ucs_status_t
uct_srd_ep_connect_to_ep_v2(uct_ep_h tl_ep, const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *uct_ep_addr,
const uct_ep_connect_to_ep_params_t *params)
{
uct_srd_ep_t *ep = ucs_derived_of(tl_ep, uct_srd_ep_t);
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t*)dev_addr;
const uct_srd_ep_addr_t *ep_addr = (const uct_srd_ep_addr_t*)uct_ep_addr;
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
void *peer_address;
char buf[128];

ucs_assert_always(ep->dest_ep_id == UCT_SRD_EP_NULL_ID);
ucs_trace_func("");

uct_srd_ep_set_dest_ep_id(ep, uct_ib_unpack_uint24(ep_addr->ep_id));

ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
brminich marked this conversation as resolved.
Show resolved Hide resolved
uct_srd_ep_reset(ep);
yosefe marked this conversation as resolved.
Show resolved Hide resolved

ucs_debug(UCT_IB_IFACE_FMT " slid %d qpn 0x%x epid %u connected to %s "
"qpn 0x%x epid %u",
UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(ep_addr->iface_addr.qp_num), ep->dest_ep_id);

peer_address = uct_srd_ep_get_peer_address(ep);
return uct_srd_iface_unpack_peer_address(iface, ib_addr,
&ep_addr->iface_addr,
ep->path_index, peer_address);
}

static ucs_status_t
uct_srd_ep_connect_to_iface(uct_srd_ep_t *ep, const uct_ib_address_t *ib_addr,
const uct_srd_iface_addr_t *if_addr)
{
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
char buf[128];

ucs_debug(UCT_IB_IFACE_FMT " lid %d qpn 0x%x epid %u ep %p connected to "
"IFACE %s qpn 0x%x",
UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id, ep,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(if_addr->qp_num));

return UCS_OK;
}

ucs_status_t uct_srd_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
uct_srd_ep_t *ep = ucs_derived_of(tl_ep, uct_srd_ep_t);
uct_srd_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_srd_iface_t);
uct_srd_ep_addr_t *ep_addr = (uct_srd_ep_addr_t*)addr;

uct_ib_pack_uint24(ep_addr->iface_addr.qp_num, iface->qp->qp_num);
uct_ib_pack_uint24(ep_addr->ep_id, ep->ep_id);
return UCS_OK;
}

static ucs_status_t
uct_srd_ep_create_connected(const uct_ep_params_t *ep_params,
uct_ep_h *new_ep_p)
{
uct_srd_iface_t *iface = ucs_derived_of(ep_params->iface,
uct_srd_iface_t);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t*)
ep_params->dev_addr;
const uct_srd_iface_addr_t *if_addr = (const uct_srd_iface_addr_t*)
ep_params->iface_addr;
int path_index = UCT_EP_PARAMS_GET_PATH_INDEX(
yosefe marked this conversation as resolved.
Show resolved Hide resolved
ep_params);
/* FIXME: Add CEP connection management */
static uct_srd_ep_conn_sn_t conn_sn;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need CEP, maybe connect using UD as AUX?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i did not understand, is there any related example i can check?

uct_ep_params_t params;
ucs_status_t status;
uct_srd_ep_t *ep;
uct_ep_h new_ep_h;

*new_ep_p = NULL;
conn_sn++;
yosefe marked this conversation as resolved.
Show resolved Hide resolved

/* First create endpoint */
params.field_mask = UCT_EP_PARAM_FIELD_IFACE |
UCT_EP_PARAM_FIELD_PATH_INDEX;
params.iface = &iface->super.super.super;
params.path_index = path_index;

status = uct_ep_create(&params, &new_ep_h);
if (status != UCS_OK) {
yosefe marked this conversation as resolved.
Show resolved Hide resolved
return status;
}

ep = ucs_derived_of(new_ep_h, uct_srd_ep_t);
ep->conn_sn = conn_sn;

/* Connect it to the interface */
status = uct_srd_ep_connect_to_iface(ep, ib_addr, if_addr);
if (status != UCS_OK) {
goto err_ep_destroy;
}

/* Generate peer address */
status = uct_srd_iface_unpack_peer_address(iface, ib_addr, if_addr,
ep->path_index,
uct_srd_ep_get_peer_address(ep));
if (status != UCS_OK) {
goto err_ep_destroy;
}

*new_ep_p = &ep->super.super;
return status;

err_ep_destroy:
uct_ep_destroy(&ep->super.super);
return status;
}

ucs_status_t uct_srd_ep_create(const uct_ep_params_t *params, uct_ep_h *ep_p)
{
if (ucs_test_all_flags(params->field_mask,
UCT_EP_PARAM_FIELD_DEV_ADDR |
UCT_EP_PARAM_FIELD_IFACE_ADDR)) {
return uct_srd_ep_create_connected(params, ep_p);
}

return UCS_CLASS_NEW_FUNC_NAME(uct_srd_ep_t)(params, ep_p);
yosefe marked this conversation as resolved.
Show resolved Hide resolved
}
59 changes: 59 additions & 0 deletions src/uct/ib/efa/srd/srd_ep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2025. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCT_SRD_EP_H
#define UCT_SRD_EP_H

#include "srd_def.h"


#define UCT_SRD_INITIAL_PSN 1
#define UCT_SRD_EP_NULL_ID ((1 << 24) - 1)
yosefe marked this conversation as resolved.
Show resolved Hide resolved

enum {
UCT_SRD_EP_FLAG_DISCONNECTED = UCS_BIT(0), /* EP was disconnected */
yosefe marked this conversation as resolved.
Show resolved Hide resolved
UCT_SRD_EP_FLAG_PRIVATE = UCS_BIT(1), /* EP jwas created as internal */
brminich marked this conversation as resolved.
Show resolved Hide resolved
UCT_SRD_EP_FLAG_HAS_PENDING = UCS_BIT(2), /* EP has some pending requests */
UCT_SRD_EP_FLAG_CONNECTED = UCS_BIT(3), /* EP is connected to the peer */
};

typedef uint32_t uct_srd_ep_conn_sn_t;

typedef struct uct_srd_ep {
uct_base_ep_t super;
uct_srd_ep_conn_sn_t conn_sn;
yosefe marked this conversation as resolved.
Show resolved Hide resolved

uint16_t flags;
uint32_t ep_id;
uint32_t dest_ep_id;
uint8_t path_index;
uct_srd_ep_peer_address_t peer_address;

struct {
uct_srd_psn_t psn; /* Next PSN to send */
} tx;
struct {
ucs_frag_list_t ooo_pkts; /* Out of order packets that
can not be processed yet */
} rx;

/* connection sequence number. assigned in connect_to_iface() */
uint8_t rx_creq_count;
} uct_srd_ep_t;


ucs_status_t uct_srd_ep_create(const uct_ep_params_t *params, uct_ep_h *ep_p);
ucs_status_t uct_srd_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr);
ucs_status_t
uct_srd_ep_connect_to_ep_v2(uct_ep_h tl_ep, const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *uct_ep_addr,
const uct_ep_connect_to_ep_params_t *params);
void uct_srd_ep_destroy(uct_ep_h ep);

UCS_CLASS_DECLARE_NEW_FUNC(uct_srd_ep_t, uct_ep_t, const uct_ep_params_t*);
UCS_CLASS_DECLARE_DELETE_FUNC(uct_srd_ep_t, uct_ep_t);

#endif
52 changes: 47 additions & 5 deletions src/uct/ib/efa/srd/srd_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,48 @@

static uct_iface_ops_t uct_srd_iface_tl_ops;

void uct_srd_iface_add_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep)
{
ep->ep_id = ucs_ptr_array_insert(&iface->eps, ep);
}

void uct_srd_iface_remove_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep)
{
ucs_assertv(ep->ep_id != UCT_UD_EP_NULL_ID, "iface=%p ep=%p ep_id=%d",
iface, ep, ep->ep_id);

ucs_trace("iface(%p) remove ep=%p ep_id=%d", iface, ep, ep->ep_id);
brminich marked this conversation as resolved.
Show resolved Hide resolved
ucs_ptr_array_remove(&iface->eps, ep->ep_id);
}

ucs_status_t
uct_srd_iface_unpack_peer_address(uct_srd_iface_t *iface,
const uct_ib_address_t *ib_addr,
const uct_srd_iface_addr_t *if_addr,
int path_index, void *address_p)
{
uct_ib_iface_t *ib_iface = &iface->super;
uct_srd_ep_peer_address_t *peer_address = (uct_srd_ep_peer_address_t*)
address_p;
struct ibv_ah_attr ah_attr;
enum ibv_mtu path_mtu;
ucs_status_t status;

memset(peer_address, 0, sizeof(*peer_address));

uct_ib_iface_fill_ah_attr_from_addr(ib_iface, ib_addr, path_index, &ah_attr,
&path_mtu);
status = uct_ib_iface_create_ah(ib_iface, &ah_attr, "SRD connect",
&peer_address->ah);
if (status != UCS_OK) {
return status;
}

peer_address->dest_qpn = uct_ib_unpack_uint24(if_addr->qp_num);

return UCS_OK;
}

ucs_status_t
uct_srd_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *iface_addr)
{
Expand All @@ -34,6 +76,7 @@ static uct_ib_iface_ops_t uct_srd_iface_ops = {
.ep_query = (uct_ep_query_func_t)
ucs_empty_function_return_unsupported,
.ep_invalidate = ucs_empty_function_return_unsupported,
.ep_connect_to_ep_v2 = uct_srd_ep_connect_to_ep_v2,
.iface_is_reachable_v2 = uct_ib_iface_is_reachable_v2,
},
.create_cq = uct_ib_verbs_create_cq,
Expand Down Expand Up @@ -331,11 +374,10 @@ uct_srd_query_tl_devices(uct_md_h md, uct_tl_device_resource_t **tl_devices_p,
static uct_iface_ops_t uct_srd_iface_tl_ops = {
.ep_flush = ucs_empty_function_return_unsupported,
.ep_fence = ucs_empty_function_return_unsupported,
.ep_create = ucs_empty_function_return_unsupported,
.ep_get_address = ucs_empty_function_return_unsupported,
.ep_connect_to_ep = ucs_empty_function_return_unsupported,
.ep_destroy = (uct_ep_destroy_func_t)
ucs_empty_function_return_unsupported,
.ep_create = uct_srd_ep_create,
.ep_get_address = uct_srd_ep_get_address,
.ep_connect_to_ep = uct_base_ep_connect_to_ep,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_srd_ep_t),
.ep_am_bcopy = (uct_ep_am_bcopy_func_t)
ucs_empty_function_return_unsupported,
.ep_am_zcopy = (uct_ep_am_zcopy_func_t)
Expand Down
9 changes: 9 additions & 0 deletions src/uct/ib/efa/srd/srd_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define UCT_SRD_IFACE_H

#include "srd_def.h"
#include "srd_ep.h"

#include <uct/ib/ud/base/ud_iface_common.h>

Expand Down Expand Up @@ -56,6 +57,14 @@ typedef struct uct_srd_iface {
} uct_srd_iface_t;


void uct_srd_iface_add_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep);
void uct_srd_iface_remove_ep(uct_srd_iface_t *iface, uct_srd_ep_t *ep);

ucs_status_t uct_srd_iface_unpack_peer_address(uct_srd_iface_t *iface,
const uct_ib_address_t *ib_addr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code format

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indented

const uct_srd_iface_addr_t *if_addr,
int path_index, void *address_p);

END_C_DECLS

#endif
Loading
Loading