* Destroy the CQ first, to keep any more callbacks from coming
* up from it.
*/
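+ /* disable the EVD first; the DTO callback path checks evd_enabled */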
+ evd_ptr->evd_enabled = DAT_FALSE;
if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
ia_ptr = evd_ptr->header.owner_ia;
{
struct dapl_llist_entry entry;
int destroy;
- struct dapl_hca *d_hca;
struct rdma_cm_id *cm_id;
struct ibv_comp_channel *ib_cq;
ib_cq_handle_t ib_cq_empty;
/* device attributes */
int rd_atom_in;
int rd_atom_out;
+ struct ibv_context *ib_ctx;
struct ibv_device *ib_dev;
/* dapls_modify_qp_state */
uint16_t lid;
DAT_RETURN dapli_ib_thread_init(void);
void dapli_ib_thread_destroy(void);
void dapli_cma_event_cb(void);
-void dapli_async_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
return 0;
}
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ channel->comp_channel.Milliseconds = 0;
+ return 0;
+}
+
static int dapls_thread_signal(void)
{
CompManagerCancel(windata.comp_mgr);
return 0;
}

return dapls_config_fd(verbs->async_fd);
}
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ return dapls_config_fd(channel->fd);
+}
+
static int dapls_thread_signal(void)
{
return write(g_ib_pipe[1], "w", sizeof "w");
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" open_hca: RDMA channel created (%p)\n", g_cm_events);
- dat_status = dapli_ib_thread_init();
- if (dat_status != DAT_SUCCESS)
- return dat_status;
-
/* HCA name will be hostname or IP address */
if (getipaddr((char *)hca_name,
(char *)&hca_ptr->hca_address,
dapl_log(DAPL_DBG_TYPE_ERR,
" open_hca: rdma_bind ERR %s."
" Is %s configured?\n", strerror(errno), hca_name);
+ rdma_destroy_id(cm_id);
return DAT_INVALID_ADDRESS;
}
dapls_config_verbs(cm_id->verbs);
hca_ptr->port_num = cm_id->port_num;
hca_ptr->ib_trans.ib_dev = cm_id->verbs->device;
+ hca_ptr->ib_trans.ib_ctx = cm_id->verbs;
gid = &cm_id->route.addr.addr.ibaddr.sgid;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
(unsigned long long)ntohll(gid->global.subnet_prefix),
(unsigned long long)ntohll(gid->global.interface_id));
+ /* support for EVD's with CNO's: one channel via thread */
+ hca_ptr->ib_trans.ib_cq =
+ ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+ if (hca_ptr->ib_trans.ib_cq == NULL) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ibv_create_comp_channel ERR %s\n",
+ strerror(errno));
+ rdma_destroy_id(cm_id);
+ return DAT_INTERNAL_ERROR;
+ }
+ if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
+ ibv_destroy_comp_channel(hca_ptr->ib_trans.ib_cq);
+ rdma_destroy_id(cm_id);
+ return DAT_INTERNAL_ERROR;
+ }
+
/* set inline max with env or default, get local lid and gid 0 */
if (hca_ptr->ib_hca_handle->device->transport_type
== IBV_TRANSPORT_IWARP)
/* set default IB MTU */
hca_ptr->ib_trans.mtu = dapl_ib_mtu(2048);
+ dat_status = dapli_ib_thread_init();
+ if (dat_status != DAT_SUCCESS)
+ return dat_status;
/*
* Put new hca_transport on list for async and CQ event processing
* Wakeup work thread to add to polling list
*/
- dapl_llist_init_entry((DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry);
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry);
dapl_os_lock(&g_hca_lock);
dapl_llist_add_tail(&g_hca_list,
- (DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry,
+ (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
&hca_ptr->ib_trans.entry);
if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
hca_ptr->ib_trans.max_inline_send);
- hca_ptr->ib_trans.d_hca = hca_ptr;
return DAT_SUCCESS;
}
" ib_thread_destroy(%d) exit\n", dapl_os_getpid());
}
-void dapli_async_event_cb(struct _ib_hca_transport *hca)
-{
- struct ibv_async_event event;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
-
- if (hca->destroy)
- return;
-
- if (!ibv_get_async_event(hca->cm_id->verbs, &event)) {
-
- switch (event.event_type) {
- case IBV_EVENT_CQ_ERR:
- {
- struct dapl_ep *evd_ptr =
- event.element.cq->cq_context;
-
- dapl_log(DAPL_DBG_TYPE_ERR,
- "dapl async_event CQ (%p) ERR %d\n",
- evd_ptr, event.event_type);
-
- /* report up if async callback still setup */
- if (hca->async_cq_error)
- hca->async_cq_error(hca->cm_id->verbs,
- event.element.cq,
- &event,
- (void *)evd_ptr);
- break;
- }
- case IBV_EVENT_COMM_EST:
- {
- /* Received msgs on connected QP before RTU */
- dapl_log(DAPL_DBG_TYPE_UTIL,
- " async_event COMM_EST(%p) rdata beat RTU\n",
- event.element.qp);
-
- break;
- }
- case IBV_EVENT_QP_FATAL:
- case IBV_EVENT_QP_REQ_ERR:
- case IBV_EVENT_QP_ACCESS_ERR:
- case IBV_EVENT_QP_LAST_WQE_REACHED:
- case IBV_EVENT_SRQ_ERR:
- case IBV_EVENT_SRQ_LIMIT_REACHED:
- case IBV_EVENT_SQ_DRAINED:
- {
- struct dapl_ep *ep_ptr =
- event.element.qp->qp_context;
-
- dapl_log(DAPL_DBG_TYPE_ERR,
- "dapl async_event QP (%p) ERR %d\n",
- ep_ptr, event.event_type);
-
- /* report up if async callback still setup */
- if (hca->async_qp_error)
- hca->async_qp_error(hca->cm_id->verbs,
- ep_ptr->qp_handle,
- &event,
- (void *)ep_ptr);
- break;
- }
- case IBV_EVENT_PATH_MIG:
- case IBV_EVENT_PATH_MIG_ERR:
- case IBV_EVENT_DEVICE_FATAL:
- case IBV_EVENT_PORT_ACTIVE:
- case IBV_EVENT_PORT_ERR:
- case IBV_EVENT_LID_CHANGE:
- case IBV_EVENT_PKEY_CHANGE:
- case IBV_EVENT_SM_CHANGE:
- {
- dapl_log(DAPL_DBG_TYPE_WARN,
- "dapl async_event: DEV ERR %d\n",
- event.event_type);
-
- /* report up if async callback still setup */
- if (hca->async_unafiliated)
- hca->async_unafiliated(hca->cm_id->
- verbs, &event,
- hca->
- async_un_ctx);
- break;
- }
- case IBV_EVENT_CLIENT_REREGISTER:
- /* no need to report this event this time */
- dapl_log(DAPL_DBG_TYPE_UTIL,
- " async_event: IBV_CLIENT_REREGISTER\n");
- break;
-
- default:
- dapl_log(DAPL_DBG_TYPE_WARN,
- "dapl async_event: %d UNKNOWN\n",
- event.event_type);
- break;
-
- }
- ibv_ack_async_event(&event);
- }
-}
-
#if defined(_WIN64) || defined(_WIN32)
/* work thread for uAT, uCM, CQ, and async events */
void dapli_thread(void *arg)
dapl_os_unlock(&g_hca_lock);
uhca[idx]->destroy = 2;
} else {
+ dapli_cq_event_cb(uhca[idx]);
dapli_async_event_cb(uhca[idx]);
}
}
dapl_os_unlock(&g_hca_lock);
}
#else // _WIN64 || WIN32
+
/* work thread for uAT, uCM, CQ, and async events */
void dapli_thread(void *arg)
{
while (hca) {
/* uASYNC events */
- ufds[++idx].fd = hca->cm_id->verbs->async_fd;
+ ufds[++idx].fd = hca->ib_ctx->async_fd;
+ ufds[idx].events = POLLIN;
+ ufds[idx].revents = 0;
+ uhca[idx] = hca;
+
+ /* CQ events are non-direct with CNO's */
+ ufds[++idx].fd = hca->ib_cq->fd;
ufds[idx].events = POLLIN;
ufds[idx].revents = 0;
uhca[idx] = hca;
if (ufds[1].revents == POLLIN)
dapli_cma_event_cb();
- /* check and process ASYNC events, per device */
+ /* check and process CQ and ASYNC events, per device */
for (idx = 2; idx < fds; idx++) {
if (ufds[idx].revents == POLLIN) {
+ dapli_cq_event_cb(uhca[idx]);
dapli_async_event_cb(uhca[idx]);
}
}
strerror(errno));
/* cleanup any device on list marked for destroy */
- for (idx = 2; idx < fds; idx++) {
+ for (idx = 3; idx < fds; idx++) {
if (uhca[idx] && uhca[idx]->destroy == 1) {
dapl_os_lock(&g_hca_lock);
dapl_llist_remove_entry(
* DAT_INSUFFICIENT_RESOURCES
*
*/
-#if defined(_WIN32)
-
DAT_RETURN
dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
{
- OVERLAPPED *overlap;
+ struct ibv_comp_channel *channel;
DAT_RETURN ret;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
"dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
- evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
- *cqlen, evd_ptr, NULL, 0);
+ if (!evd_ptr->cno_ptr)
+ channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
+ else
+ channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
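+ /* CNO-attached EVDs share the per-device channel drained by the
+ * work thread; all others get a private channel for direct waits */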
- if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE)
+ if (!channel)
return DAT_INSUFFICIENT_RESOURCES;
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_object_create: (%p)\n", evd_ptr);
+ evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
+ *cqlen, evd_ptr, channel, 0);
- overlap = &evd_ptr->ib_cq_handle->comp_entry.Overlap;
- overlap->hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
- if (!overlap->hEvent) {
+ if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
ret = DAT_INSUFFICIENT_RESOURCES;
goto err;
}
- overlap->hEvent = (HANDLE) ((ULONG_PTR) overlap->hEvent | 1);
-
/* arm cq for events */
dapls_set_cq_notify(ia_ptr, evd_ptr);
+ /* update with returned cq entry size */
+ *cqlen = evd_ptr->ib_cq_handle->cqe;
+
return DAT_SUCCESS;
err:
- ibv_destroy_cq(evd_ptr->ib_cq_handle);
+ if (!evd_ptr->cno_ptr)
+ ibv_destroy_comp_channel(channel);
return ret;
}
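/*
 * For reference, a minimal sketch (not provider code) of the channel
 * protocol that dapls_set_cq_notify() and dapls_evd_dto_wait() rely
 * on: arm the CQ, block on its channel, ack, re-arm, then poll. All
 * calls are stock libibverbs.
 */
#include <infiniband/verbs.h>

static int wait_one_completion(struct ibv_cq *cq)
{
	struct ibv_comp_channel *ch = cq->channel;
	struct ibv_cq *ev_cq;
	void *ev_ctx;
	struct ibv_wc wc;

	if (ibv_req_notify_cq(cq, 0))		/* arm for next completion */
		return -1;
	if (ibv_get_cq_event(ch, &ev_cq, &ev_ctx))	/* block on channel fd */
		return -1;
	ibv_ack_cq_events(ev_cq, 1);	/* acks must balance before CQ destroy */
	if (ibv_req_notify_cq(ev_cq, 0))	/* re-arm before polling */
		return -1;
	return ibv_poll_cq(ev_cq, 1, &wc);	/* reap the completion */
}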
{
DAT_EVENT event;
ib_work_completion_t wc;
- HANDLE hevent;
+ struct ibv_comp_channel *channel;
if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
/* pull off CQ and EVD entries and toss */
while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
- hevent = evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent;
+ channel = evd_ptr->ib_cq_handle->channel;
if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
- CloseHandle(hevent);
+ if (!evd_ptr->cno_ptr)
+ ibv_destroy_comp_channel(channel);
evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
}
return DAT_SUCCESS;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" cq_object_wakeup: evd=%p\n", evd_ptr);
- if (!SetEvent(evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent))
- return DAT_INTERNAL_ERROR;
-
+ /* no wake up mechanism */
return DAT_SUCCESS;
}
-DAT_RETURN
-dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
+#if defined(_WIN32)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
{
- int status;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_object_wait: EVD %p time %d\n",
- evd_ptr, timeout);
-
- status = WaitForSingleObject(evd_ptr->ib_cq_handle->
- comp_entry.Overlap.hEvent,
- timeout / 1000);
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_object_wait: EVD %p status 0x%x\n",
- evd_ptr, status);
- if (status)
- return DAT_TIMEOUT_EXPIRED;
-
- InterlockedExchange(&evd_ptr->ib_cq_handle->comp_entry.Busy, 0);
- return DAT_SUCCESS;
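+ /* winverbs shim: record the timeout on the channel; the blocking
+ * happens later, inside ibv_get_cq_event() */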
+ channel->comp_channel.Milliseconds =
+ (timeout == DAT_TIMEOUT_INFINITE) ? INFINITE : timeout / 1000;
+ return 0;
}
#else // WIN32
-DAT_RETURN
-dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
- IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
-{
- struct ibv_comp_channel *channel;
- DAT_RETURN ret;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
-
- channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
- if (!channel)
- return DAT_INSUFFICIENT_RESOURCES;
-
- evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
- *cqlen, evd_ptr, channel, 0);
-
- if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
- ret = DAT_INSUFFICIENT_RESOURCES;
- goto err;
- }
-
- /* arm cq for events */
- dapls_set_cq_notify(ia_ptr, evd_ptr);
-
- /* update with returned cq entry size */
- *cqlen = evd_ptr->ib_cq_handle->cqe;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- "dapls_ib_cq_alloc: new_cq %p cqlen=%d \n",
- evd_ptr->ib_cq_handle, *cqlen);
-
- return DAT_SUCCESS;
-
-err:
- ibv_destroy_comp_channel(channel);
- return ret;
-}
-
-DAT_RETURN dapls_ib_cq_free(IN DAPL_IA * ia_ptr, IN DAPL_EVD * evd_ptr)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
{
- DAT_EVENT event;
- ib_work_completion_t wc;
- struct ibv_comp_channel *channel;
-
- if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
- /* pull off CQ and EVD entries and toss */
- while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
- while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
-
- channel = evd_ptr->ib_cq_handle->channel;
- if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
- return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
- ibv_destroy_comp_channel(channel);
- evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
- }
- return DAT_SUCCESS;
-}
-
-DAT_RETURN
-dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr)
-{
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " cq_object_wakeup: evd=%p\n", evd_ptr);
+ int status, timeout_ms;
+ struct pollfd cq_fd = {
+ .fd = channel->fd,
+ .events = POLLIN,
+ .revents = 0
+ };
- /* no wake up mechanism */
- return DAT_SUCCESS;
+ /* uDAPL timeout values in usecs */
+ timeout_ms = (timeout == DAT_TIMEOUT_INFINITE) ? -1 : timeout / 1000;
+ status = poll(&cq_fd, 1, timeout_ms);
+ if (status > 0)
+ return 0;
+ else if (status == 0)
+ return ETIMEDOUT;
+ else
+ return status;
}
+#endif
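/* dapls_evd_dto_wait() pairs the shim above with ibv_get_cq_event():
 * the shim does the waiting, the get consumes the pending event */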
DAT_RETURN
dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
struct ibv_comp_channel *channel = evd_ptr->ib_cq_handle->channel;
struct ibv_cq *ibv_cq = NULL;
void *context;
- int status = 0;
- int timeout_ms = -1;
- struct pollfd cq_fd = {
- .fd = channel->fd,
- .events = POLLIN,
- .revents = 0
- };
+ int status;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" cq_object_wait: EVD %p time %d\n",
evd_ptr, timeout);
- /* uDAPL timeout values in usecs */
- if (timeout != DAT_TIMEOUT_INFINITE)
- timeout_ms = timeout / 1000;
-
- status = poll(&cq_fd, 1, timeout_ms);
-
- /* returned event */
- if (status > 0) {
+ status = dapls_wait_comp_channel(channel, timeout);
+ if (!status) {
if (!ibv_get_cq_event(channel, &ibv_cq, &context)) {
ibv_ack_cq_events(ibv_cq, 1);
}
- status = 0;
-
- /* timeout */
- } else if (status == 0)
- status = ETIMEDOUT;
+ }
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" cq_object_wait: RET evd %p ibv_cq %p %s\n",
evd_ptr, ibv_cq, strerror(errno));
- return (dapl_convert_errno(status, "cq_wait_object_wait"));
+ return dapl_convert_errno(status, "cq_wait_object_wait");
+}
+void dapli_cq_event_cb(struct _ib_hca_transport *tp)
+{
+ /* check all comp events on this device */
+ struct dapl_evd *evd = NULL;
+ struct ibv_cq *ibv_cq = NULL;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_cq_event_cb(%p)\n", tp);
+
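+ /* channel fd is non-blocking (dapls_config_comp_channel), so this
+ * loop drains every pending event and returns without blocking */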
+ while (!ibv_get_cq_event(tp->ib_cq, &ibv_cq, (void **)&evd)) {
+
+ if (!DAPL_BAD_HANDLE(evd, DAPL_MAGIC_EVD)) {
+ /* Both EVD or EVD->CNO event via callback */
+ dapl_evd_dto_callback(tp->ib_ctx,
+ evd->ib_cq_handle, (void*)evd);
+ }
+
+ ibv_ack_cq_events(ibv_cq, 1);
+ }
}
-#endif
/*
* dapl_ib_cq_resize
/* prototypes */
int32_t dapls_ib_init(void);
int32_t dapls_ib_release(void);
+
+/* util.c */
enum ibv_mtu dapl_ib_mtu(int mtu);
char *dapl_ib_mtu_str(enum ibv_mtu mtu);
DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len);
return DAT_SUCCESS;
}
+void dapli_async_event_cb(struct _ib_hca_transport *hca)
+{
+ struct ibv_async_event event;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
+
+ if (hca->destroy)
+ return;
+
+ if (!ibv_get_async_event(hca->ib_ctx, &event)) {
+
+ switch (event.event_type) {
+ case IBV_EVENT_CQ_ERR:
+ {
+ struct dapl_evd *evd_ptr =
+ event.element.cq->cq_context;
+
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "dapl async_event CQ (%p) ERR %d\n",
+ evd_ptr, event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_cq_error)
+ hca->async_cq_error(hca->ib_ctx,
+ event.element.cq,
+ &event,
+ (void *)evd_ptr);
+ break;
+ }
+ case IBV_EVENT_COMM_EST:
+ {
+ /* Received msgs on connected QP before RTU */
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " async_event COMM_EST(%p) rdata beat RTU\n",
+ event.element.qp);
+
+ break;
+ }
+ case IBV_EVENT_QP_FATAL:
+ case IBV_EVENT_QP_REQ_ERR:
+ case IBV_EVENT_QP_ACCESS_ERR:
+ case IBV_EVENT_QP_LAST_WQE_REACHED:
+ case IBV_EVENT_SRQ_ERR:
+ case IBV_EVENT_SRQ_LIMIT_REACHED:
+ case IBV_EVENT_SQ_DRAINED:
+ {
+ struct dapl_ep *ep_ptr =
+ event.element.qp->qp_context;
+
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ "dapl async_event QP (%p) ERR %d\n",
+ ep_ptr, event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_qp_error)
+ hca->async_qp_error(hca->ib_ctx,
+ ep_ptr->qp_handle,
+ &event,
+ (void *)ep_ptr);
+ break;
+ }
+ case IBV_EVENT_PATH_MIG:
+ case IBV_EVENT_PATH_MIG_ERR:
+ case IBV_EVENT_DEVICE_FATAL:
+ case IBV_EVENT_PORT_ACTIVE:
+ case IBV_EVENT_PORT_ERR:
+ case IBV_EVENT_LID_CHANGE:
+ case IBV_EVENT_PKEY_CHANGE:
+ case IBV_EVENT_SM_CHANGE:
+ {
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ "dapl async_event: DEV ERR %d\n",
+ event.event_type);
+
+ /* report up if async callback still setup */
+ if (hca->async_unafiliated)
+ hca->async_unafiliated(hca->ib_ctx,
+ &event,
+ hca->async_un_ctx);
+ break;
+ }
+ case IBV_EVENT_CLIENT_REREGISTER:
+ /* no need to report this event this time */
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " async_event: IBV_CLIENT_REREGISTER\n");
+ break;
+
+ default:
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ "dapl async_event: %d UNKNOWN\n",
+ event.event_type);
+ break;
+
+ }
+ ibv_ack_async_event(&event);
+ }
+}
+
/*
* dapls_set_provider_specific_attr
*
/* ib_hca_transport_t, specific to this implementation */
typedef struct _ib_hca_transport
{
+ struct dapl_llist_entry entry; /* g_hca_list linkage */
+ int destroy; /* 1 = unlink from work thread, 2 = unlinked */
union ibv_gid gid;
struct ibv_device *ib_dev;
+ struct ibv_context *ib_ctx;
ib_cq_handle_t ib_cq_empty;
DAPL_OS_LOCK cq_lock;
int max_inline_send;
void cr_thread(void *arg);
int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr);
void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
#include <stdlib.h>
+ib_thread_state_t g_ib_thread_state = 0;
+DAPL_OS_THREAD g_ib_thread;
+DAPL_OS_LOCK g_hca_lock; /* guards g_hca_list and g_ib_thread_state */
+struct dapl_llist_entry *g_hca_list;
+
+void dapli_thread(void *arg);
+DAT_RETURN dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+
+#if defined(_WIN64) || defined(_WIN32)
+#include "..\..\..\..\..\etc\user\comp_channel.cpp"
+#include <rdma\winverbs.h>
+
+struct ibvw_windata windata;
+
+static int dapls_os_init(void)
+{
+ return ibvw_get_windata(&windata, IBVW_WINDATA_VERSION);
+}
+
+static void dapls_os_release(void)
+{
+ if (windata.comp_mgr)
+ ibvw_release_windata(&windata, IBVW_WINDATA_VERSION);
+ windata.comp_mgr = NULL;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+ verbs->channel.Milliseconds = 0;
+ return 0;
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ channel->comp_channel.Milliseconds = 0;
+ return 0;
+}
+
+static int dapls_thread_signal(void)
+{
+ CompManagerCancel(windata.comp_mgr);
+ return 0;
+}
+#else // _WIN64 || WIN32
+int g_ib_pipe[2];
+
+static int dapls_os_init(void)
+{
+ /* create pipe for waking up work thread */
+ return pipe(g_ib_pipe);
+}
+
+static void dapls_os_release(void)
+{
+ /* close work thread wakeup pipe */
+ close(g_ib_pipe[0]);
+ close(g_ib_pipe[1]);
+}
+
+static int dapls_config_fd(int fd)
+{
+ int opts;
+
+ opts = fcntl(fd, F_GETFL);
+ if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " dapls_config_fd: fcntl on fd %d ERR %d %s\n",
+ fd, opts, strerror(errno));
+ return errno;
+ }
+
+ return 0;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+ return dapls_config_fd(verbs->async_fd);
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ return dapls_config_fd(channel->fd);
+}
+
+static int dapls_thread_signal(void)
+{
+ return write(g_ib_pipe[1], "w", sizeof "w");
+}
+#endif
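/*
 * Illustrative self-pipe sketch (standalone, not provider code): the
 * work thread polls g_ib_pipe[0] alongside the device fds, so one
 * byte written to g_ib_pipe[1] breaks poll() and lets the thread
 * rebuild its fd set or notice IB_THREAD_CANCEL.
 */
#include <poll.h>
#include <unistd.h>

static int wait_for_work(int pipe_rd, int timeout_ms)
{
	struct pollfd pfd = { .fd = pipe_rd, .events = POLLIN, .revents = 0 };
	char buf[2];

	if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN))
		return read(pipe_rd, buf, sizeof(buf));	/* drain wakeup byte */
	return 0;
}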
+
static int32_t create_cr_pipe(IN DAPL_HCA * hca_ptr)
{
DAPL_SOCKET listen_socket;
*/
int32_t dapls_ib_init(void)
{
- return 0;
-}
+ /* initialize hca_list */
+ dapl_os_lock_init(&g_hca_lock);
+ dapl_llist_init_head(&g_hca_list);
-int32_t dapls_ib_release(void)
-{
- return 0;
-}
+ if (dapls_os_init())
+ return 1;
-#if defined(_WIN64) || defined(_WIN32)
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
return 0;
}
-#else // _WIN64 || WIN32
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
- int opts;
-
- opts = fcntl(channel->fd, F_GETFL); /* uCQ */
- if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " dapls_create_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
- channel->fd, opts, strerror(errno));
- return errno;
- }
+int32_t dapls_ib_release(void)
+{
+ dapli_ib_thread_destroy();
+ dapls_os_release();
return 0;
}
-#endif
/*
* dapls_ib_open_hca
" open_hca: device %s not found\n", hca_name);
goto err;
- found:
+found:
dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " open_hca: Found dev %s %016llx\n",
ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
(unsigned long long)
strerror(errno));
goto err;
}
+ hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
+ dapls_config_verbs(hca_ptr->ib_hca_handle);
/* get lid for this hca-port, network order */
if (ibv_query_port(hca_ptr->ib_hca_handle,
hca_ptr->ib_trans.mtu =
dapl_ib_mtu(dapl_os_get_env_val("DAPL_IB_MTU", SCM_IB_MTU));
-#ifndef CQ_WAIT_OBJECT
- /* initialize cq_lock */
- dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
- if (dat_status != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: failed to init cq_lock\n");
- goto bail;
- }
- /* EVD events without direct CQ channels, non-blocking */
+
+ /* EVD events without direct CQ channels, CNO support */
hca_ptr->ib_trans.ib_cq =
ibv_create_comp_channel(hca_ptr->ib_hca_handle);
if (hca_ptr->ib_trans.ib_cq == NULL) {
strerror(errno));
goto bail;
}
-
- if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
- goto bail;
- }
-
- if (dapli_cq_thread_init(hca_ptr)) {
+ if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq))
+ goto bail;
+
+ dat_status = dapli_ib_thread_init();
+ if (dat_status != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: cq_thread_init failed for %s\n",
- ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
+ " open_hca: failed to init cq thread lock\n");
goto bail;
}
-#endif /* CQ_WAIT_OBJECT */
+ /*
+ * Put new hca_transport on list for async and CQ event processing
+ * Wakeup work thread to add to polling list
+ */
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&hca_ptr->ib_trans.entry);
+ dapl_os_lock(&g_hca_lock);
+ dapl_llist_add_tail(&g_hca_list,
+ (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
+ &hca_ptr->ib_trans.entry);
+ if (dapls_thread_signal() == -1)
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " open_hca: thread wakeup error = %s\n",
+ strerror(errno));
+ dapl_os_unlock(&g_hca_lock);
/* initialize cr_list lock */
dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
/* wait for thread */
while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
- dapl_os_sleep_usec(2000);
+ dapl_os_sleep_usec(1000);
}
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
{
dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " close_hca: %p\n", hca_ptr);
-#ifndef CQ_WAIT_OBJECT
- dapli_cq_thread_destroy(hca_ptr);
- dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
-#endif /* CQ_WAIT_OBJECT */
-
if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
if (ibv_close_device(hca_ptr->ib_hca_handle))
return (dapl_convert_errno(errno, "ib_close_device"));
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
}
+ dapl_os_lock(&g_hca_lock);
+ if (g_ib_thread_state != IB_THREAD_RUN) {
+ dapl_os_unlock(&g_hca_lock);
+ return (DAT_SUCCESS);
+ }
+ dapl_os_unlock(&g_hca_lock);
+
/* destroy cr_thread and lock */
hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
- if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
- dapl_log(DAPL_DBG_TYPE_UTIL,
- " thread_destroy: thread wakeup err = %s\n",
- strerror(errno));
+ send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" close_hca: waiting for cr_thread\n");
- if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
- dapl_log(DAPL_DBG_TYPE_UTIL,
- " thread_destroy: thread wakeup err = %s\n",
- strerror(errno));
- dapl_os_sleep_usec(2000);
+ send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
+ dapl_os_sleep_usec(1000);
}
dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
destroy_cr_pipe(hca_ptr); /* no longer need pipe */
+
+ /*
+ * Remove hca from async event processing list
+ * Wakeup work thread to remove from polling list
+ */
+ hca_ptr->ib_trans.destroy = 1;
+ if (dapls_thread_signal() == -1)
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " destroy: thread wakeup error = %s\n",
+ strerror(errno));
+
+ /* wait for thread to remove HCA references */
+ while (hca_ptr->ib_trans.destroy != 2) {
+ if (dapls_thread_signal() == -1)
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " destroy: thread wakeup error = %s\n",
+ strerror(errno));
+ dapl_os_sleep_usec(1000);
+ }
+
return (DAT_SUCCESS);
}
+
+DAT_RETURN dapli_ib_thread_init(void)
+{
+ DAT_RETURN dat_status;
+
+ dapl_os_lock(&g_hca_lock);
+ if (g_ib_thread_state != IB_THREAD_INIT) {
+ dapl_os_unlock(&g_hca_lock);
+ return DAT_SUCCESS;
+ }
+
+ g_ib_thread_state = IB_THREAD_CREATE;
+ dapl_os_unlock(&g_hca_lock);
+
+ /* create thread to process inbound connect request */
+ dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+ if (dat_status != DAT_SUCCESS)
+ return (dapl_convert_errno(errno,
+ "create_thread ERR:"
+ " check resource limits"));
+
+ /* wait for thread to start */
+ dapl_os_lock(&g_hca_lock);
+ while (g_ib_thread_state != IB_THREAD_RUN) {
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_init: waiting for ib_thread\n");
+ dapl_os_unlock(&g_hca_lock);
+ dapl_os_sleep_usec(1000);
+ dapl_os_lock(&g_hca_lock);
+ }
+ dapl_os_unlock(&g_hca_lock);
+
+ return DAT_SUCCESS;
+}
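+/* note: creation is serialized by g_hca_lock and happens at most once
+ * per process; any state other than IB_THREAD_INIT returns success */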
+
+void dapli_ib_thread_destroy(void)
+{
+ int retries = 10;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy(%d)\n", dapl_os_getpid());
+ /*
+ * wait for async thread to terminate.
+ * pthread_join would be the correct method
+ * but some applications have some issues
+ */
+
+ /* destroy ib_thread, wait for termination, if not already */
+ dapl_os_lock(&g_hca_lock);
+ if (g_ib_thread_state != IB_THREAD_RUN)
+ goto bail;
+
+ g_ib_thread_state = IB_THREAD_CANCEL;
+ if (dapls_thread_signal() == -1)
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " destroy: thread wakeup error = %s\n",
+ strerror(errno));
+ while ((g_ib_thread_state != IB_THREAD_EXIT) && (retries--)) {
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy: waiting for ib_thread\n");
+ if (dapls_thread_signal() == -1)
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " destroy: thread wakeup error = %s\n",
+ strerror(errno));
+ dapl_os_unlock(&g_hca_lock);
+ dapl_os_sleep_usec(2000);
+ dapl_os_lock(&g_hca_lock);
+ }
+bail:
+ dapl_os_unlock(&g_hca_lock);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread_destroy(%d) exit\n", dapl_os_getpid());
+}
+
+#if defined(_WIN64) || defined(_WIN32)
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+ struct _ib_hca_transport *hca;
+ struct _ib_hca_transport *uhca[8];
+ COMP_CHANNEL *channel;
+ int ret, idx, cnt;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d,0x%x): ENTER: \n",
+ dapl_os_getpid(), g_ib_thread);
+
+ dapl_os_lock(&g_hca_lock);
+ for (g_ib_thread_state = IB_THREAD_RUN;
+ g_ib_thread_state == IB_THREAD_RUN;
+ dapl_os_lock(&g_hca_lock)) {
+
+ idx = 0;
+ hca = dapl_llist_is_empty(&g_hca_list) ? NULL :
+ dapl_llist_peek_head(&g_hca_list);
+
+ while (hca) {
+ uhca[idx++] = hca;
+ hca = dapl_llist_next_entry(&g_hca_list,
+ (DAPL_LLIST_ENTRY *)
+ &hca->entry);
+ }
+ cnt = idx;
+
+ dapl_os_unlock(&g_hca_lock);
+ ret = CompManagerPoll(windata.comp_mgr, INFINITE, &channel);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread(%d) poll_event 0x%x\n",
+ dapl_os_getpid(), ret);
+
+ /* check and process ASYNC events, per device */
+ for (idx = 0; idx < cnt; idx++) {
+ if (uhca[idx]->destroy == 1) {
+ dapl_os_lock(&g_hca_lock);
+ dapl_llist_remove_entry(&g_hca_list,
+ (DAPL_LLIST_ENTRY *)
+ &uhca[idx]->entry);
+ dapl_os_unlock(&g_hca_lock);
+ uhca[idx]->destroy = 2;
+ } else {
+ dapli_cq_event_cb(uhca[idx]);
+ dapli_async_event_cb(uhca[idx]);
+ }
+ }
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d) EXIT\n",
+ dapl_os_getpid());
+ g_ib_thread_state = IB_THREAD_EXIT;
+ dapl_os_unlock(&g_hca_lock);
+}
+#else // _WIN64 || WIN32
+
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+ struct pollfd ufds[__FD_SETSIZE];
+ struct _ib_hca_transport *uhca[__FD_SETSIZE] = { NULL };
+ struct _ib_hca_transport *hca;
+ int ret, idx, fds;
+ char rbuf[2];
+
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+ " ib_thread(%d,0x%x): ENTER: pipe %d \n",
+ dapl_os_getpid(), g_ib_thread, g_ib_pipe[0]);
+
+ /* Poll across pipe, CM, AT never changes */
+ dapl_os_lock(&g_hca_lock);
+ g_ib_thread_state = IB_THREAD_RUN;
+
+ ufds[0].fd = g_ib_pipe[0]; /* pipe */
+ ufds[0].events = POLLIN;
+
+ while (g_ib_thread_state == IB_THREAD_RUN) {
+
+ /* rebuild ufds: pipe wakeup fd first, then per-HCA fds */
+ ufds[0].revents = 0;
+ idx = 0;
+
+ /* Walk HCA list and setup async and CQ events */
+ if (!dapl_llist_is_empty(&g_hca_list))
+ hca = dapl_llist_peek_head(&g_hca_list);
+ else
+ hca = NULL;
+
+ while (hca) {
+
+ /* uASYNC events */
+ ufds[++idx].fd = hca->ib_ctx->async_fd;
+ ufds[idx].events = POLLIN;
+ ufds[idx].revents = 0;
+ uhca[idx] = hca;
+
+ /* CQ events are non-direct with CNO's */
+ ufds[++idx].fd = hca->ib_cq->fd;
+ ufds[idx].events = POLLIN;
+ ufds[idx].revents = 0;
+ uhca[idx] = hca;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+ " ib_thread(%d) poll_fd: hca=%p,"
+ " async=%d cq=%d pipe=%d \n",
+ dapl_os_getpid(), hca, ufds[idx - 1].fd,
+ ufds[idx].fd, ufds[0].fd);
+
+ hca = dapl_llist_next_entry(&g_hca_list,
+ (DAPL_LLIST_ENTRY *)
+ &hca->entry);
+ }
+
+ /* unlock, and setup poll */
+ fds = idx + 1;
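+ /* one wakeup pipe fd plus two fds (async, CQ) per HCA */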
+ dapl_os_unlock(&g_hca_lock);
+ ret = poll(ufds, fds, -1);
+ if (ret <= 0) {
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+ " ib_thread(%d): ERR %s poll\n",
+ dapl_os_getpid(), strerror(errno));
+ dapl_os_lock(&g_hca_lock);
+ continue;
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+ " ib_thread(%d) poll_event: "
+ " async=0x%x pipe=0x%x \n",
+ dapl_os_getpid(), ufds[idx].revents,
+ ufds[0].revents);
+
+ /* check and process CQ and ASYNC events, per device */
+ for (idx = 1; idx < fds; idx++) {
+ if (ufds[idx].revents == POLLIN) {
+ dapli_cq_event_cb(uhca[idx]);
+ dapli_async_event_cb(uhca[idx]);
+ }
+ }
+
+ /* check and process user events, PIPE */
+ if (ufds[0].revents == POLLIN) {
+ if (read(g_ib_pipe[0], rbuf, 2) == -1)
+ dapl_log(DAPL_DBG_TYPE_THREAD,
+ " cr_thread: pipe rd err= %s\n",
+ strerror(errno));
+
+ /* cleanup any device on list marked for destroy */
+ for (idx = 1; idx < fds; idx++) {
+ if (uhca[idx] && uhca[idx]->destroy == 1) {
+ dapl_os_lock(&g_hca_lock);
+ dapl_llist_remove_entry(
+ &g_hca_list,
+ (DAPL_LLIST_ENTRY*)
+ &uhca[idx]->entry);
+ dapl_os_unlock(&g_hca_lock);
+ uhca[idx]->destroy = 2;
+ }
+ }
+ }
+ dapl_os_lock(&g_hca_lock);
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " ib_thread(%d) EXIT\n",
+ dapl_os_getpid());
+ g_ib_thread_state = IB_THREAD_EXIT;
+ dapl_os_unlock(&g_hca_lock);
+}
+#endif
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_req_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
- getpid());
- return (DAT_ABORT);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_rcv_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
- getpid());
- return (DAT_ABORT);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_rcv_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: "
- "evd != h_dto_rcv_evd\n", getpid());
- return (ret);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_req_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
- getpid());
- return (DAT_ABORT);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */
*/
printf("%d Sending RDMA read completion message\n", getpid());
+ /* give remote chance to process read completes */
+ if (use_cno) {
+#if defined(_WIN32) || defined(_WIN64)
+ Sleep(1000);
+#else
+ sleep(1);
+#endif
+ }
+
ret = send_msg(&rmr_send_msg,
sizeof(DAT_RMR_TRIPLET),
lmr_context_send_msg,
LOGPRINTF("%d waiting for message receive event\n", getpid());
if (use_cno) {
DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
- ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
+
+ ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_rcv_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
- getpid());
- return (ret);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */
LOGPRINTF("%d cno wait return evd_handle=%p\n",
getpid(), evd);
if (evd != h_dto_rcv_evd) {
- fprintf(stderr,
- "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
- getpid());
- return (ret);
+ /* CNO timeout, already on EVD */
+ if (evd != NULL)
+ return (ret);
}
}
/* use wait to dequeue */