]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
CNO support broken in both CMA and SCM providers.
authorArlin Davis <arlin.r.davis@intel.com>
Sun, 2 Aug 2009 21:21:09 +0000 (14:21 -0700)
committerArlin Davis <arlin.r.davis@intel.com>
Sun, 2 Aug 2009 21:21:09 +0000 (14:21 -0700)
CQ thread/callback mechanism was removed by mistake. Still
need indirect DTO callbacks when CNO is attached to EVD's.

Add CQ event channel to cma provider's thread and add
to select for rdma_cm and async channels.

For the scm provider there is no easy way to add this channel
to the select across sockets on Windows. So, for portability
reasons a 2nd thread is started to process the ASYNC and
CQ channels for events.

Must disable the EVD (evd_enabled=FALSE) during destroy
to prevent EVD events firing for CNOs and re-arming the CQ while
the CQ is being destroyed.

Change dtest to check the EVD after the CNO times out.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
dapl/common/dapl_evd_util.c
dapl/openib_cma/dapl_ib_util.h
dapl/openib_cma/device.c
dapl/openib_common/cq.c
dapl/openib_common/dapl_ib_common.h
dapl/openib_common/util.c
dapl/openib_scm/dapl_ib_util.h
dapl/openib_scm/device.c
test/dtest/dtest.c

index 88c3f8f9ff1d8148097dc51ed21d13d56c834eb9..02909e9ae73194e8feb9eff02556b800bc40a555 100644 (file)
@@ -469,6 +469,7 @@ DAT_RETURN dapls_evd_dealloc(IN DAPL_EVD * evd_ptr)
         * Destroy the CQ first, to keep any more callbacks from coming
         * up from it.
         */
+       evd_ptr->evd_enabled = DAT_FALSE;
        if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
                ia_ptr = evd_ptr->header.owner_ia;
 
index f466c0680813be0c83dc388c91b6b1923da487bb..c9ab4d61bff7e27574d3fb95732bc67e09fbf754 100755 (executable)
@@ -84,7 +84,6 @@ typedef struct _ib_hca_transport
 { 
        struct dapl_llist_entry entry;
        int                     destroy;
-       struct dapl_hca         *d_hca;
        struct rdma_cm_id       *cm_id;
        struct ibv_comp_channel *ib_cq;
        ib_cq_handle_t          ib_cq_empty;
@@ -99,6 +98,7 @@ typedef struct _ib_hca_transport
        /* device attributes */
        int                     rd_atom_in;
        int                     rd_atom_out;
+       struct  ibv_context     *ib_ctx;
        struct  ibv_device      *ib_dev;
        /* dapls_modify_qp_state */
        uint16_t                lid;
@@ -119,7 +119,8 @@ void dapli_thread(void *arg);
 DAT_RETURN  dapli_ib_thread_init(void);
 void dapli_ib_thread_destroy(void);
 void dapli_cma_event_cb(void);
-void dapli_async_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
 dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
 void dapls_ib_cm_free(dp_ib_cm_handle_t cm, DAPL_EP *ep);
 DAT_RETURN dapls_modify_qp_state(IN ib_qp_handle_t qp_handle,
index 81203bf7859694431d1916886cc71e5f0beabf6c..743e8fa91b073353677efb9dade04024c73f157b 100644 (file)
@@ -123,6 +123,12 @@ static int dapls_config_verbs(struct ibv_context *verbs)
        return 0;
 }
 
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       channel->comp_channel.Milliseconds = 0;
+       return 0;
+}
+
 static int dapls_thread_signal(void)
 {
        CompManagerCancel(windata.comp_mgr);
@@ -205,6 +211,11 @@ static int dapls_config_verbs(struct ibv_context *verbs)
        return dapls_config_fd(verbs->async_fd);
 }
 
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       return dapls_config_fd(channel->fd);
+}
+
 static int dapls_thread_signal(void)
 {
        return write(g_ib_pipe[1], "w", sizeof "w");
@@ -334,10 +345,6 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                     " open_hca: RDMA channel created (%p)\n", g_cm_events);
 
-       dat_status = dapli_ib_thread_init();
-       if (dat_status != DAT_SUCCESS)
-               return dat_status;
-
        /* HCA name will be hostname or IP address */
        if (getipaddr((char *)hca_name,
                      (char *)&hca_ptr->hca_address, 
@@ -357,6 +364,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                dapl_log(DAPL_DBG_TYPE_ERR,
                         " open_hca: rdma_bind ERR %s."
                         " Is %s configured?\n", strerror(errno), hca_name);
+               rdma_destroy_id(cm_id);
                return DAT_INVALID_ADDRESS;
        }
 
@@ -366,6 +374,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
        dapls_config_verbs(cm_id->verbs);
        hca_ptr->port_num = cm_id->port_num;
        hca_ptr->ib_trans.ib_dev = cm_id->verbs->device;
+       hca_ptr->ib_trans.ib_ctx = cm_id->verbs;
        gid = &cm_id->route.addr.addr.ibaddr.sgid;
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
@@ -374,6 +383,21 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                     (unsigned long long)ntohll(gid->global.subnet_prefix),
                     (unsigned long long)ntohll(gid->global.interface_id));
 
+       /* support for EVD's with CNO's: one channel via thread */
+       hca_ptr->ib_trans.ib_cq =
+           ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+       if (hca_ptr->ib_trans.ib_cq == NULL) {
+               dapl_log(DAPL_DBG_TYPE_ERR,
+                        " open_hca: ibv_create_comp_channel ERR %s\n",
+                        strerror(errno));
+               rdma_destroy_id(cm_id);
+               return DAT_INTERNAL_ERROR;
+       }
+       if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
+               rdma_destroy_id(cm_id);
+               return DAT_INTERNAL_ERROR;
+       }
+
        /* set inline max with env or default, get local lid and gid 0 */
        if (hca_ptr->ib_hca_handle->device->transport_type
            == IBV_TRANSPORT_IWARP)
@@ -395,14 +419,17 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
        /* set default IB MTU */
        hca_ptr->ib_trans.mtu = dapl_ib_mtu(2048);
 
+       dat_status = dapli_ib_thread_init();
+       if (dat_status != DAT_SUCCESS)
+               return dat_status;
        /* 
         * Put new hca_transport on list for async and CQ event processing 
         * Wakeup work thread to add to polling list
         */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry);
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry);
        dapl_os_lock(&g_hca_lock);
        dapl_llist_add_tail(&g_hca_list,
-                           (DAPL_LLIST_ENTRY *) & hca_ptr->ib_trans.entry,
+                           (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
                            &hca_ptr->ib_trans.entry);
        if (dapls_thread_signal() == -1)
                dapl_log(DAPL_DBG_TYPE_UTIL,
@@ -425,7 +452,6 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                     &hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff, 
                     hca_ptr->ib_trans.max_inline_send);
 
-       hca_ptr->ib_trans.d_hca = hca_ptr;
        return DAT_SUCCESS;
 }
 
@@ -574,105 +600,6 @@ bail:
                     " ib_thread_destroy(%d) exit\n", dapl_os_getpid());
 }
 
-void dapli_async_event_cb(struct _ib_hca_transport *hca)
-{
-       struct ibv_async_event event;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
-
-       if (hca->destroy)
-               return;
-
-       if (!ibv_get_async_event(hca->cm_id->verbs, &event)) {
-
-               switch (event.event_type) {
-               case IBV_EVENT_CQ_ERR:
-               {
-                       struct dapl_ep *evd_ptr =
-                               event.element.cq->cq_context;
-
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                                "dapl async_event CQ (%p) ERR %d\n",
-                                evd_ptr, event.event_type);
-
-                       /* report up if async callback still setup */
-                       if (hca->async_cq_error)
-                               hca->async_cq_error(hca->cm_id->verbs,
-                                                       event.element.cq,
-                                                       &event,
-                                                       (void *)evd_ptr);
-                       break;
-               }
-               case IBV_EVENT_COMM_EST:
-               {
-                       /* Received msgs on connected QP before RTU */
-                       dapl_log(DAPL_DBG_TYPE_UTIL,
-                                " async_event COMM_EST(%p) rdata beat RTU\n",
-                                event.element.qp);
-
-                       break;
-               }
-               case IBV_EVENT_QP_FATAL:
-               case IBV_EVENT_QP_REQ_ERR:
-               case IBV_EVENT_QP_ACCESS_ERR:
-               case IBV_EVENT_QP_LAST_WQE_REACHED:
-               case IBV_EVENT_SRQ_ERR:
-               case IBV_EVENT_SRQ_LIMIT_REACHED:
-               case IBV_EVENT_SQ_DRAINED:
-               {
-                       struct dapl_ep *ep_ptr =
-                               event.element.qp->qp_context;
-
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                                "dapl async_event QP (%p) ERR %d\n",
-                                ep_ptr, event.event_type);
-
-                       /* report up if async callback still setup */
-                       if (hca->async_qp_error)
-                               hca->async_qp_error(hca->cm_id->verbs,
-                                                   ep_ptr->qp_handle,
-                                                   &event,
-                                                   (void *)ep_ptr);
-                       break;
-               }
-               case IBV_EVENT_PATH_MIG:
-               case IBV_EVENT_PATH_MIG_ERR:
-               case IBV_EVENT_DEVICE_FATAL:
-               case IBV_EVENT_PORT_ACTIVE:
-               case IBV_EVENT_PORT_ERR:
-               case IBV_EVENT_LID_CHANGE:
-               case IBV_EVENT_PKEY_CHANGE:
-               case IBV_EVENT_SM_CHANGE:
-               {
-                       dapl_log(DAPL_DBG_TYPE_WARN,
-                                "dapl async_event: DEV ERR %d\n",
-                                event.event_type);
-
-                       /* report up if async callback still setup */
-                       if (hca->async_unafiliated)
-                               hca->async_unafiliated(hca->cm_id->
-                                                       verbs, &event,
-                                                       hca->
-                                                       async_un_ctx);
-                       break;
-               }
-               case IBV_EVENT_CLIENT_REREGISTER:
-                       /* no need to report this event this time */
-                       dapl_log(DAPL_DBG_TYPE_UTIL,
-                                " async_event: IBV_CLIENT_REREGISTER\n");
-                       break;
-
-               default:
-                       dapl_log(DAPL_DBG_TYPE_WARN,
-                                "dapl async_event: %d UNKNOWN\n",
-                                event.event_type);
-                       break;
-
-               }
-               ibv_ack_async_event(&event);
-       }
-}
-
 #if defined(_WIN64) || defined(_WIN32)
 /* work thread for uAT, uCM, CQ, and async events */
 void dapli_thread(void *arg)
@@ -721,6 +648,7 @@ void dapli_thread(void *arg)
                                dapl_os_unlock(&g_hca_lock);
                                uhca[idx]->destroy = 2;
                        } else {
+                               dapli_cq_event_cb(uhca[idx]);
                                dapli_async_event_cb(uhca[idx]);
                        }
                }
@@ -732,6 +660,7 @@ void dapli_thread(void *arg)
        dapl_os_unlock(&g_hca_lock);
 }
 #else                          // _WIN64 || WIN32
+
 /* work thread for uAT, uCM, CQ, and async events */
 void dapli_thread(void *arg)
 {
@@ -771,7 +700,13 @@ void dapli_thread(void *arg)
                while (hca) {
 
                        /* uASYNC events */
-                       ufds[++idx].fd = hca->cm_id->verbs->async_fd;
+                       ufds[++idx].fd = hca->ib_ctx->async_fd;
+                       ufds[idx].events = POLLIN;
+                       ufds[idx].revents = 0;
+                       uhca[idx] = hca;
+
+                       /* CQ events are non-direct with CNO's */
+                       ufds[++idx].fd = hca->ib_cq->fd;
                        ufds[idx].events = POLLIN;
                        ufds[idx].revents = 0;
                        uhca[idx] = hca;
@@ -809,9 +744,10 @@ void dapli_thread(void *arg)
                if (ufds[1].revents == POLLIN)
                        dapli_cma_event_cb();
 
-               /* check and process ASYNC events, per device */
+               /* check and process CQ and ASYNC events, per device */
                for (idx = 2; idx < fds; idx++) {
                        if (ufds[idx].revents == POLLIN) {
+                               dapli_cq_event_cb(uhca[idx]);
                                dapli_async_event_cb(uhca[idx]);
                        }
                }
@@ -824,7 +760,7 @@ void dapli_thread(void *arg)
                                         strerror(errno));
 
                        /* cleanup any device on list marked for destroy */
-                       for (idx = 2; idx < fds; idx++) {
+                       for (idx = 3; idx < fds; idx++) {
                                if (uhca[idx] && uhca[idx]->destroy == 1) {
                                        dapl_os_lock(&g_hca_lock);
                                        dapl_llist_remove_entry(
index 096167cb561b64ccae1e2593e834b07a936ddad3..16d4f18e07bc1d92d5354243986e5b4a4f1d79b9 100644 (file)
@@ -171,36 +171,32 @@ DAT_RETURN dapls_ib_get_async_event(IN ib_error_record_t * err_record,
  *     DAT_INSUFFICIENT_RESOURCES
  *
  */
-#if defined(_WIN32)
-
 DAT_RETURN
 dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
                  IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
 {
-       OVERLAPPED *overlap;
+       struct ibv_comp_channel *channel;
        DAT_RETURN ret;
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                     "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
 
-       evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
-                                             *cqlen, evd_ptr, NULL, 0);
+       if (!evd_ptr->cno_ptr)
+               channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
+       else
+               channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
 
-       if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE)
+       if (!channel)
                return DAT_INSUFFICIENT_RESOURCES;
 
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    " cq_object_create: (%p)\n", evd_ptr);
+       evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
+                                             *cqlen, evd_ptr, channel, 0);
 
-       overlap = &evd_ptr->ib_cq_handle->comp_entry.Overlap;
-       overlap->hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
-       if (!overlap->hEvent) {
+       if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
                ret = DAT_INSUFFICIENT_RESOURCES;
                goto err;
        }
 
-       overlap->hEvent = (HANDLE) ((ULONG_PTR) overlap->hEvent | 1);
-
        /* arm cq for events */
        dapls_set_cq_notify(ia_ptr, evd_ptr);
 
@@ -214,7 +210,8 @@ dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
        return DAT_SUCCESS;
 
 err:
-       ibv_destroy_cq(evd_ptr->ib_cq_handle);
+       if (!evd_ptr->cno_ptr)
+               ibv_destroy_comp_channel(channel);
        return ret;
 }
 
@@ -239,18 +236,18 @@ DAT_RETURN dapls_ib_cq_free(IN DAPL_IA * ia_ptr, IN DAPL_EVD * evd_ptr)
 {
        DAT_EVENT event;
        ib_work_completion_t wc;
-       HANDLE hevent;
+       struct ibv_comp_channel *channel;
 
        if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
                /* pull off CQ and EVD entries and toss */
                while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
                while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
 
-               hevent = evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent;
+               channel = evd_ptr->ib_cq_handle->channel;
                if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
                        return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
-               CloseHandle(hevent);
+               if (!evd_ptr->cno_ptr)
+                       ibv_destroy_comp_channel(channel);
                evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
        }
        return DAT_SUCCESS;
@@ -262,105 +259,42 @@ dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr)
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                     " cq_object_wakeup: evd=%p\n", evd_ptr);
 
-       if (!SetEvent(evd_ptr->ib_cq_handle->comp_entry.Overlap.hEvent))
-               return DAT_INTERNAL_ERROR;
-
+       /* no wake up mechanism */
        return DAT_SUCCESS;
 }
 
-DAT_RETURN
-dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
+#if defined(_WIN32)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
 {
-       int status;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    " cq_object_wait: EVD %p time %d\n",
-                    evd_ptr, timeout);
-
-       status = WaitForSingleObject(evd_ptr->ib_cq_handle->
-                                    comp_entry.Overlap.hEvent,
-                                    timeout / 1000);
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    " cq_object_wait: EVD %p status 0x%x\n",
-                    evd_ptr, status);
-       if (status)
-               return DAT_TIMEOUT_EXPIRED;
-
-       InterlockedExchange(&evd_ptr->ib_cq_handle->comp_entry.Busy, 0);
-       return DAT_SUCCESS;
+       channel->comp_channel.Milliseconds =
+               (timeout == DAT_TIMEOUT_INFINITE) ? INFINITE : timeout / 1000;
+       return 0;
 }
 
 #else // WIN32
 
-DAT_RETURN
-dapls_ib_cq_alloc(IN DAPL_IA * ia_ptr,
-                 IN DAPL_EVD * evd_ptr, IN DAT_COUNT * cqlen)
-{
-       struct ibv_comp_channel *channel;
-       DAT_RETURN ret;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen);
-
-       channel = ibv_create_comp_channel(ia_ptr->hca_ptr->ib_hca_handle);
-       if (!channel)
-               return DAT_INSUFFICIENT_RESOURCES;
-
-       evd_ptr->ib_cq_handle = ibv_create_cq(ia_ptr->hca_ptr->ib_hca_handle,
-                                             *cqlen, evd_ptr, channel, 0);
-
-       if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE) {
-               ret = DAT_INSUFFICIENT_RESOURCES;
-               goto err;
-       }
-
-       /* arm cq for events */
-       dapls_set_cq_notify(ia_ptr, evd_ptr);
-
-       /* update with returned cq entry size */
-       *cqlen = evd_ptr->ib_cq_handle->cqe;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    "dapls_ib_cq_alloc: new_cq %p cqlen=%d \n",
-                    evd_ptr->ib_cq_handle, *cqlen);
-
-       return DAT_SUCCESS;
-
-err:
-       ibv_destroy_comp_channel(channel);
-       return ret;
-}
-
-DAT_RETURN dapls_ib_cq_free(IN DAPL_IA * ia_ptr, IN DAPL_EVD * evd_ptr)
+static int
+dapls_wait_comp_channel(IN struct ibv_comp_channel *channel, IN uint32_t timeout)
 {
-       DAT_EVENT event;
-       ib_work_completion_t wc;
-       struct ibv_comp_channel *channel;
-
-       if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
-               /* pull off CQ and EVD entries and toss */
-               while (ibv_poll_cq(evd_ptr->ib_cq_handle, 1, &wc) == 1) ;
-               while (dapl_evd_dequeue(evd_ptr, &event) == DAT_SUCCESS) ;
-
-               channel = evd_ptr->ib_cq_handle->channel;
-               if (ibv_destroy_cq(evd_ptr->ib_cq_handle))
-                       return (dapl_convert_errno(errno, "ibv_destroy_cq"));
-
-               ibv_destroy_comp_channel(channel);
-               evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
-       }
-       return DAT_SUCCESS;
-}
-
-DAT_RETURN
-dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr)
-{
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                    " cq_object_wakeup: evd=%p\n", evd_ptr);
+       int status, timeout_ms;
+       struct pollfd cq_fd = {
+               .fd = channel->fd,
+               .events = POLLIN,
+               .revents = 0
+       };
 
-       /* no wake up mechanism */
-       return DAT_SUCCESS;
+       /* uDAPL timeout values in usecs */
+       timeout_ms = (timeout == DAT_TIMEOUT_INFINITE) ? -1 : timeout / 1000;
+       status = poll(&cq_fd, 1, timeout_ms);
+       if (status > 0)
+               return 0;
+       else if (status == 0)
+               return ETIMEDOUT;
+       else
+               return status;
 }
+#endif
 
 DAT_RETURN
 dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
@@ -368,43 +302,45 @@ dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout)
        struct ibv_comp_channel *channel = evd_ptr->ib_cq_handle->channel;
        struct ibv_cq *ibv_cq = NULL;
        void *context;
-       int status = 0;
-       int timeout_ms = -1;
-       struct pollfd cq_fd = {
-               .fd = channel->fd,
-               .events = POLLIN,
-               .revents = 0
-       };
+       int status;
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                     " cq_object_wait: EVD %p time %d\n",
                     evd_ptr, timeout);
 
-       /* uDAPL timeout values in usecs */
-       if (timeout != DAT_TIMEOUT_INFINITE)
-               timeout_ms = timeout / 1000;
-
-       status = poll(&cq_fd, 1, timeout_ms);
-
-       /* returned event */
-       if (status > 0) {
+       status = dapls_wait_comp_channel(channel, timeout);
+       if (!status) {
                if (!ibv_get_cq_event(channel, &ibv_cq, &context)) {
                        ibv_ack_cq_events(ibv_cq, 1);
                }
-               status = 0;
-
-               /* timeout */
-       } else if (status == 0)
-               status = ETIMEDOUT;
+       }
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                     " cq_object_wait: RET evd %p ibv_cq %p %s\n",
                     evd_ptr, ibv_cq, strerror(errno));
 
-       return (dapl_convert_errno(status, "cq_wait_object_wait"));
+       return dapl_convert_errno(status, "cq_wait_object_wait");
+}
 
+void dapli_cq_event_cb(struct _ib_hca_transport *tp)
+{
+       /* check all comp events on this device */
+       struct dapl_evd *evd = NULL;
+       struct ibv_cq   *ibv_cq = NULL;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", tp);
+
+       while (!ibv_get_cq_event(tp->ib_cq, &ibv_cq, (void*)&evd)) {
+
+               if (!DAPL_BAD_HANDLE(evd, DAPL_MAGIC_EVD)) {
+                       /* Both EVD or EVD->CNO event via callback */
+                       dapl_evd_dto_callback(tp->ib_ctx, 
+                                             evd->ib_cq_handle, (void*)evd);
+               }
+
+               ibv_ack_cq_events(ibv_cq, 1);
+       } 
 }
-#endif
 
 /*
  * dapl_ib_cq_resize
index 0b417b855cf6c29c0aac0f795f7716d763ee3498..2195767cee4096115e6c49bd8e8994d2272ca6f9 100644 (file)
@@ -208,6 +208,8 @@ typedef uint32_t ib_shm_transport_t;
 /* prototypes */
 int32_t        dapls_ib_init(void);
 int32_t        dapls_ib_release(void);
+
+/* util.c */
 enum ibv_mtu dapl_ib_mtu(int mtu);
 char *dapl_ib_mtu_str(enum ibv_mtu mtu);
 DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len);
index da913c56a0c424229d3a5785c60a0d5685d56483..3963e1f9b1bc9755c9cbae580076fbb46c8b42cd 100644 (file)
@@ -320,6 +320,104 @@ DAT_RETURN dapls_ib_setup_async_callback(IN DAPL_IA * ia_ptr,
        return DAT_SUCCESS;
 }
 
+void dapli_async_event_cb(struct _ib_hca_transport *hca)
+{
+       struct ibv_async_event event;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n", hca);
+
+       if (hca->destroy)
+               return;
+
+       if (!ibv_get_async_event(hca->ib_ctx, &event)) {
+
+               switch (event.event_type) {
+               case IBV_EVENT_CQ_ERR:
+               {
+                       struct dapl_ep *evd_ptr =
+                               event.element.cq->cq_context;
+
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                                "dapl async_event CQ (%p) ERR %d\n",
+                                evd_ptr, event.event_type);
+
+                       /* report up if async callback still setup */
+                       if (hca->async_cq_error)
+                               hca->async_cq_error(hca->ib_ctx,
+                                                   event.element.cq,
+                                                   &event,
+                                                   (void *)evd_ptr);
+                       break;
+               }
+               case IBV_EVENT_COMM_EST:
+               {
+                       /* Received msgs on connected QP before RTU */
+                       dapl_log(DAPL_DBG_TYPE_UTIL,
+                                " async_event COMM_EST(%p) rdata beat RTU\n",
+                                event.element.qp);
+
+                       break;
+               }
+               case IBV_EVENT_QP_FATAL:
+               case IBV_EVENT_QP_REQ_ERR:
+               case IBV_EVENT_QP_ACCESS_ERR:
+               case IBV_EVENT_QP_LAST_WQE_REACHED:
+               case IBV_EVENT_SRQ_ERR:
+               case IBV_EVENT_SRQ_LIMIT_REACHED:
+               case IBV_EVENT_SQ_DRAINED:
+               {
+                       struct dapl_ep *ep_ptr =
+                               event.element.qp->qp_context;
+
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                                "dapl async_event QP (%p) ERR %d\n",
+                                ep_ptr, event.event_type);
+
+                       /* report up if async callback still setup */
+                       if (hca->async_qp_error)
+                               hca->async_qp_error(hca->ib_ctx,
+                                                   ep_ptr->qp_handle,
+                                                   &event,
+                                                   (void *)ep_ptr);
+                       break;
+               }
+               case IBV_EVENT_PATH_MIG:
+               case IBV_EVENT_PATH_MIG_ERR:
+               case IBV_EVENT_DEVICE_FATAL:
+               case IBV_EVENT_PORT_ACTIVE:
+               case IBV_EVENT_PORT_ERR:
+               case IBV_EVENT_LID_CHANGE:
+               case IBV_EVENT_PKEY_CHANGE:
+               case IBV_EVENT_SM_CHANGE:
+               {
+                       dapl_log(DAPL_DBG_TYPE_WARN,
+                                "dapl async_event: DEV ERR %d\n",
+                                event.event_type);
+
+                       /* report up if async callback still setup */
+                       if (hca->async_unafiliated)
+                               hca->async_unafiliated(hca->ib_ctx, 
+                                                      &event,  
+                                                      hca->async_un_ctx);
+                       break;
+               }
+               case IBV_EVENT_CLIENT_REREGISTER:
+                       /* no need to report this event this time */
+                       dapl_log(DAPL_DBG_TYPE_UTIL,
+                                " async_event: IBV_CLIENT_REREGISTER\n");
+                       break;
+
+               default:
+                       dapl_log(DAPL_DBG_TYPE_WARN,
+                                "dapl async_event: %d UNKNOWN\n",
+                                event.event_type);
+                       break;
+
+               }
+               ibv_ack_async_event(&event);
+       }
+}
+
 /*
  * dapls_set_provider_specific_attr
  *
index a5e734e5c29a0b9cee5a2438df127b4c4422138f..933364cac9b73951274bdd1c4588bffe24925d6e 100644 (file)
@@ -78,8 +78,11 @@ typedef dp_ib_cm_handle_t    ib_cm_srvc_handle_t;
 /* ib_hca_transport_t, specific to this implementation */
 typedef struct _ib_hca_transport
 { 
+       struct dapl_llist_entry entry;
+       int                     destroy;
        union ibv_gid           gid;
        struct  ibv_device      *ib_dev;
+       struct  ibv_context     *ib_ctx;
        ib_cq_handle_t          ib_cq_empty;
        DAPL_OS_LOCK            cq_lock;        
        int                     max_inline_send;
@@ -114,6 +117,8 @@ typedef struct _ib_hca_transport
 void cr_thread(void *arg);
 int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
 void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
 DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr);
 void dapls_print_cm_list(IN DAPL_IA *ia_ptr);
 dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep);
index d5089aac16207283ecaa7a015dbf71545a4a6d7e..9c91b78f4c389d0762052829581ea3c136aa8e72 100644 (file)
@@ -57,6 +57,96 @@ static const char rcsid[] = "$Id:  $";
 
 #include <stdlib.h>
 
+ib_thread_state_t g_ib_thread_state = 0;
+DAPL_OS_THREAD g_ib_thread;
+DAPL_OS_LOCK g_hca_lock;
+struct dapl_llist_entry *g_hca_list;
+
+void dapli_thread(void *arg);
+DAT_RETURN  dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+
+#if defined(_WIN64) || defined(_WIN32)
+#include "..\..\..\..\..\etc\user\comp_channel.cpp"
+#include <rdma\winverbs.h>
+
+struct ibvw_windata windata;
+
+static int dapls_os_init(void)
+{
+       return ibvw_get_windata(&windata, IBVW_WINDATA_VERSION);
+}
+
+static void dapls_os_release(void)
+{
+       if (windata.comp_mgr)
+               ibvw_release_windata(&windata, IBVW_WINDATA_VERSION);
+       windata.comp_mgr = NULL;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+       verbs->channel.Milliseconds = 0;
+       return 0;
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       channel->comp_channel.Milliseconds = 0;
+       return 0;
+}
+
+static int dapls_thread_signal(void)
+{
+       CompManagerCancel(windata.comp_mgr);
+       return 0;
+}
+#else                          // _WIN64 || WIN32
+int g_ib_pipe[2];
+
+static int dapls_os_init(void)
+{
+       /* create pipe for waking up work thread */
+       return pipe(g_ib_pipe);
+}
+
+static void dapls_os_release(void)
+{
+       /* close pipe? */
+}
+
+static int dapls_config_fd(int fd)
+{
+       int opts;
+
+       opts = fcntl(fd, F_GETFL);
+       if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+               dapl_log(DAPL_DBG_TYPE_ERR,
+                        " dapls_config_fd: fcntl on fd %d ERR %d %s\n",
+                        fd, opts, strerror(errno));
+               return errno;
+       }
+
+       return 0;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+       return dapls_config_fd(verbs->async_fd);
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       return dapls_config_fd(channel->fd);
+}
+
+static int dapls_thread_signal(void)
+{
+       return write(g_ib_pipe[1], "w", sizeof "w");
+}
+#endif
+
+
 static int32_t create_cr_pipe(IN DAPL_HCA * hca_ptr)
 {
        DAPL_SOCKET listen_socket;
@@ -130,35 +220,22 @@ static void destroy_cr_pipe(IN DAPL_HCA * hca_ptr)
  */
 int32_t dapls_ib_init(void)
 {
-       return 0;
-}
+       /* one-time provider init: HCA list + OS event plumbing; the shared
+        * work thread itself starts lazily from open_hca */
+       /* initialize hca_list */
+       dapl_os_lock_init(&g_hca_lock);
+       dapl_llist_init_head(&g_hca_list);
 
-int32_t dapls_ib_release(void)
-{
-       return 0;
-}
+       /* wakeup pipe (POSIX) or winverbs data (Windows); nonzero aborts
+        * provider load */
+       if (dapls_os_init())
+               return 1;
 
-#if defined(_WIN64) || defined(_WIN32)
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
        return 0;
 }
-#else                          // _WIN64 || WIN32
-int dapls_config_comp_channel(struct ibv_comp_channel *channel)
-{
-       int opts;
-
-       opts = fcntl(channel->fd, F_GETFL);     /* uCQ */
-       if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " dapls_create_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
-                        channel->fd, opts, strerror(errno));
-               return errno;
-       }
 
+/* Provider-wide release: stop the shared work thread, then free the OS
+ * wakeup/notification resources acquired in dapls_ib_init(). */
+int32_t dapls_ib_release(void)
+{
+       dapli_ib_thread_destroy();
+       dapls_os_release();
        return 0;
 }
-#endif
 
 /*
  * dapls_ib_open_hca
@@ -213,7 +290,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                 " open_hca: device %s not found\n", hca_name);
        goto err;
 
-      found:
+found:
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " open_hca: Found dev %s %016llx\n",
                     ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
                     (unsigned long long)
@@ -227,6 +304,8 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                         strerror(errno));
                goto err;
        }
+       hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
+       dapls_config_verbs(hca_ptr->ib_hca_handle);
 
        /* get lid for this hca-port, network order */
        if (ibv_query_port(hca_ptr->ib_hca_handle,
@@ -271,15 +350,8 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
        hca_ptr->ib_trans.mtu =
            dapl_ib_mtu(dapl_os_get_env_val("DAPL_IB_MTU", SCM_IB_MTU));
 
-#ifndef CQ_WAIT_OBJECT
-       /* initialize cq_lock */
-       dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
-       if (dat_status != DAT_SUCCESS) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " open_hca: failed to init cq_lock\n");
-               goto bail;
-       }
-       /* EVD events without direct CQ channels, non-blocking */
+
+       /* EVD events without direct CQ channels, CNO support */
        hca_ptr->ib_trans.ib_cq =
            ibv_create_comp_channel(hca_ptr->ib_hca_handle);
        if (hca_ptr->ib_trans.ib_cq == NULL) {
@@ -288,18 +360,28 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
                         strerror(errno));
                goto bail;
        }
-
-       if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
-               goto bail;
-       }
-
-       if (dapli_cq_thread_init(hca_ptr)) {
+       dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq);
+       
+       dat_status = dapli_ib_thread_init();
+       if (dat_status != DAT_SUCCESS) {
                dapl_log(DAPL_DBG_TYPE_ERR,
-                        " open_hca: cq_thread_init failed for %s\n",
-                        ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
+                        " open_hca: failed to init cq thread lock\n");
                goto bail;
        }
-#endif                         /* CQ_WAIT_OBJECT */
+       /* 
+        * Put new hca_transport on list for async and CQ event processing 
+        * Wakeup work thread to add to polling list
+        */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&hca_ptr->ib_trans.entry);
+       dapl_os_lock(&g_hca_lock);
+       dapl_llist_add_tail(&g_hca_list,
+                           (DAPL_LLIST_ENTRY *) &hca_ptr->ib_trans.entry,
+                           &hca_ptr->ib_trans.entry);
+       if (dapls_thread_signal() == -1)
+               dapl_log(DAPL_DBG_TYPE_UTIL,
+                        " open_hca: thread wakeup error = %s\n",
+                        strerror(errno));
+       dapl_os_unlock(&g_hca_lock);
 
        /* initialize cr_list lock */
        dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
@@ -333,7 +415,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
 
        /* wait for thread */
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-               dapl_os_sleep_usec(2000);
+               dapl_os_sleep_usec(1000);
        }
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
@@ -380,33 +462,297 @@ DAT_RETURN dapls_ib_close_hca(IN DAPL_HCA * hca_ptr)
 {
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " close_hca: %p\n", hca_ptr);
 
-#ifndef CQ_WAIT_OBJECT
-       dapli_cq_thread_destroy(hca_ptr);
-       dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
-#endif                         /* CQ_WAIT_OBJECT */
-
        if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
                if (ibv_close_device(hca_ptr->ib_hca_handle))
                        return (dapl_convert_errno(errno, "ib_close_device"));
                hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
        }
 
+       dapl_os_lock(&g_hca_lock);
+       if (g_ib_thread_state != IB_THREAD_RUN) {
+               dapl_os_unlock(&g_hca_lock);
+               return (DAT_SUCCESS);
+       }
+       dapl_os_unlock(&g_hca_lock);
+
        /* destroy cr_thread and lock */
        hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
-       if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
-               dapl_log(DAPL_DBG_TYPE_UTIL,
-                        " thread_destroy: thread wakeup err = %s\n",
-                        strerror(errno));
+       send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                             " close_hca: waiting for cr_thread\n");
-               if (send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
-                       dapl_log(DAPL_DBG_TYPE_UTIL,
-                                " thread_destroy: thread wakeup err = %s\n",
-                                strerror(errno));
-               dapl_os_sleep_usec(2000);
+               send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
+               dapl_os_sleep_usec(1000);
        }
        dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
        destroy_cr_pipe(hca_ptr); /* no longer need pipe */
+       
+       /* 
+        * Remove hca from async event processing list
+        * Wakeup work thread to remove from polling list
+        */
+       hca_ptr->ib_trans.destroy = 1;
+       if (dapls_thread_signal() == -1)
+               dapl_log(DAPL_DBG_TYPE_UTIL,
+                        " destroy: thread wakeup error = %s\n",
+                        strerror(errno));
+
+       /* wait for thread to remove HCA references */
+       while (hca_ptr->ib_trans.destroy != 2) {
+               if (dapls_thread_signal() == -1)
+                       dapl_log(DAPL_DBG_TYPE_UTIL,
+                                " destroy: thread wakeup error = %s\n",
+                                strerror(errno));
+               dapl_os_sleep_usec(1000);
+       }
+
        return (DAT_SUCCESS);
 }
+
+/*
+ * Create the shared event work thread on first call; later calls return
+ * DAT_SUCCESS immediately.
+ * NOTE(review): if a prior dapli_ib_thread_destroy() left the state at
+ * IB_THREAD_EXIT, this also returns DAT_SUCCESS without restarting the
+ * thread — confirm the intended init/destroy lifecycle resets the state.
+ */
+DAT_RETURN dapli_ib_thread_init(void)
+{
+       DAT_RETURN dat_status;
+
+       dapl_os_lock(&g_hca_lock);
+       if (g_ib_thread_state != IB_THREAD_INIT) {
+               dapl_os_unlock(&g_hca_lock);
+               return DAT_SUCCESS;
+       }
+
+       g_ib_thread_state = IB_THREAD_CREATE;
+       dapl_os_unlock(&g_hca_lock);
+
+       /* create thread to process inbound connect request */
+       dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+       if (dat_status != DAT_SUCCESS)
+               return (dapl_convert_errno(errno,
+                                          "create_thread ERR:"
+                                          " check resource limits"));
+
+       /* wait for thread to start: 1ms poll, lock dropped while sleeping */
+       dapl_os_lock(&g_hca_lock);
+       while (g_ib_thread_state != IB_THREAD_RUN) {
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                            " ib_thread_init: waiting for ib_thread\n");
+               dapl_os_unlock(&g_hca_lock);
+               dapl_os_sleep_usec(1000);
+               dapl_os_lock(&g_hca_lock);
+       }
+       dapl_os_unlock(&g_hca_lock);
+
+       return DAT_SUCCESS;
+}
+
+/*
+ * Stop the shared work thread and wait (bounded) for it to exit; called
+ * from dapls_ib_release().
+ */
+void dapli_ib_thread_destroy(void)
+{
+       int retries = 10;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                    " ib_thread_destroy(%d)\n", dapl_os_getpid());
+       /* 
+        * wait for async thread to terminate. 
+        * pthread_join would be the correct method
+        * but some applications have some issues
+        */
+
+       /* destroy ib_thread, wait for termination, if not already */
+       dapl_os_lock(&g_hca_lock);
+       if (g_ib_thread_state != IB_THREAD_RUN)
+               goto bail;
+
+       g_ib_thread_state = IB_THREAD_CANCEL;
+       if (dapls_thread_signal() == -1)
+               dapl_log(DAPL_DBG_TYPE_UTIL,
+                        " destroy: thread wakeup error = %s\n",
+                        strerror(errno));
+       /* NOTE(review): bounded wait (~10 x 2ms); on expiry we return with
+        * the thread possibly still running — confirm safe on unload */
+       while ((g_ib_thread_state != IB_THREAD_EXIT) && (retries--)) {
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                            " ib_thread_destroy: waiting for ib_thread\n");
+               if (dapls_thread_signal() == -1)
+                       dapl_log(DAPL_DBG_TYPE_UTIL,
+                                " destroy: thread wakeup error = %s\n",
+                                strerror(errno));
+               dapl_os_unlock(&g_hca_lock);
+               dapl_os_sleep_usec(2000);
+               dapl_os_lock(&g_hca_lock);
+       }
+bail:
+       dapl_os_unlock(&g_hca_lock);
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                    " ib_thread_destroy(%d) exit\n", dapl_os_getpid());
+}
+
+
+#if defined(_WIN64) || defined(_WIN32)
+/* work thread for uAT, uCM, CQ, and async events (Windows variant):
+ * blocks in CompManagerPoll() and dispatches CQ/async callbacks per HCA */
+void dapli_thread(void *arg)
+{
+       struct _ib_hca_transport *hca;
+       struct _ib_hca_transport *uhca[8];
+       COMP_CHANNEL *channel;
+       int ret, idx, cnt;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d,0x%x): ENTER: \n",
+                    dapl_os_getpid(), g_ib_thread);
+
+       dapl_os_lock(&g_hca_lock);
+       for (g_ib_thread_state = IB_THREAD_RUN;
+            g_ib_thread_state == IB_THREAD_RUN; 
+            dapl_os_lock(&g_hca_lock)) {
+
+               /* snapshot HCA list under g_hca_lock.
+                * NOTE(review): uhca[] holds 8 entries and idx is not
+                * bounds-checked — confirm >8 open HCAs cannot occur */
+               idx = 0;
+               hca = dapl_llist_is_empty(&g_hca_list) ? NULL :
+                     dapl_llist_peek_head(&g_hca_list);
+
+               while (hca) {
+                       uhca[idx++] = hca;
+                       hca = dapl_llist_next_entry(&g_hca_list,
+                                                   (DAPL_LLIST_ENTRY *)
+                                                   &hca->entry);
+               }
+               cnt = idx;
+
+               /* block until a channel fires or CompManagerCancel() wakes
+                * us (see dapls_thread_signal) */
+               dapl_os_unlock(&g_hca_lock);
+               ret = CompManagerPoll(windata.comp_mgr, INFINITE, &channel);
+
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                            " ib_thread(%d) poll_event 0x%x\n",
+                            dapl_os_getpid(), ret);
+
+
+               /* check and process ASYNC events, per device; the woken
+                * channel is not matched to a device, so callbacks run for
+                * every HCA in the snapshot */
+               for (idx = 0; idx < cnt; idx++) {
+                       if (uhca[idx]->destroy == 1) {
+                               /* reap device marked by close_hca */
+                               dapl_os_lock(&g_hca_lock);
+                               dapl_llist_remove_entry(&g_hca_list,
+                                                       (DAPL_LLIST_ENTRY *)
+                                                       &uhca[idx]->entry);
+                               dapl_os_unlock(&g_hca_lock);
+                               uhca[idx]->destroy = 2;
+                       } else {
+                               dapli_cq_event_cb(uhca[idx]);
+                               dapli_async_event_cb(uhca[idx]);
+                       }
+               }
+       }
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d) EXIT\n",
+                    dapl_os_getpid());
+       g_ib_thread_state = IB_THREAD_EXIT;
+       dapl_os_unlock(&g_hca_lock);
+}
+#else                          // _WIN64 || WIN32
+
+/*
+ * Shared work thread (POSIX): polls the wakeup pipe plus, per open HCA,
+ * the verbs async-event fd and the CQ completion-channel fd, dispatching
+ * indirect CQ callbacks (CNO support) and async-event callbacks.
+ *
+ * Fixes vs. previous revision:
+ *  - "poll_fd" debug log had 5 conversion specifiers for 4 mismatched
+ *    arguments (pointer passed to %d, int to %p) — undefined behavior.
+ *  - post-poll debug log read ufds[idx].revents with a stale idx.
+ *  - revents tested with '==' missed events delivered as POLLIN combined
+ *    with POLLERR/POLLHUP; now tested with '&'.
+ */
+void dapli_thread(void *arg)
+{
+       struct pollfd ufds[__FD_SETSIZE];
+       struct _ib_hca_transport *uhca[__FD_SETSIZE] = { NULL };
+       struct _ib_hca_transport *hca;
+       int ret, idx, fds;
+       char rbuf[2];
+
+       dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+                    " ib_thread(%d,0x%x): ENTER: pipe %d \n",
+                    dapl_os_getpid(), g_ib_thread, g_ib_pipe[0]);
+
+       /* ufds[0] is always the wakeup pipe; HCA fds rebuilt each pass */
+       dapl_os_lock(&g_hca_lock);
+       g_ib_thread_state = IB_THREAD_RUN;
+
+       ufds[0].fd = g_ib_pipe[0];      /* pipe */
+       ufds[0].events = POLLIN;
+
+       while (g_ib_thread_state == IB_THREAD_RUN) {
+
+               ufds[0].revents = 0;
+               idx = 0;
+
+               /* walk HCA list (lock held) and add async + CQ fds */
+               if (!dapl_llist_is_empty(&g_hca_list))
+                       hca = dapl_llist_peek_head(&g_hca_list);
+               else
+                       hca = NULL;
+
+               while (hca) {
+
+                       /* uASYNC events */
+                       ufds[++idx].fd = hca->ib_ctx->async_fd;
+                       ufds[idx].events = POLLIN;
+                       ufds[idx].revents = 0;
+                       uhca[idx] = hca;
+
+                       /* CQ events are non-direct with CNO's */
+                       ufds[++idx].fd = hca->ib_cq->fd;
+                       ufds[idx].events = POLLIN;
+                       ufds[idx].revents = 0;
+                       uhca[idx] = hca;
+
+                       dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+                                    " ib_thread(%d) poll_fd: hca=%p,"
+                                    " async=%d cq=%d pipe=%d \n",
+                                    dapl_os_getpid(), hca,
+                                    ufds[idx - 1].fd, ufds[idx].fd,
+                                    ufds[0].fd);
+
+                       hca = dapl_llist_next_entry(&g_hca_list,
+                                                   (DAPL_LLIST_ENTRY *)
+                                                   &hca->entry);
+               }
+
+               /* unlock, and setup poll */
+               fds = idx + 1;
+               dapl_os_unlock(&g_hca_lock);
+               ret = poll(ufds, fds, -1);
+               if (ret <= 0) {
+                       dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+                                    " ib_thread(%d): ERR %s poll\n",
+                                    dapl_os_getpid(), strerror(errno));
+                       dapl_os_lock(&g_hca_lock);
+                       continue;
+               }
+
+               dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+                            " ib_thread(%d) poll_event: pipe=0x%x \n",
+                            dapl_os_getpid(), ufds[0].revents);
+
+               /* check and process CQ and ASYNC events, per device */
+               for (idx = 1; idx < fds; idx++) {
+                       if (ufds[idx].revents & POLLIN) {
+                               dapli_cq_event_cb(uhca[idx]);
+                               dapli_async_event_cb(uhca[idx]);
+                       }
+               }
+
+               /* user wakeup via pipe: drain token, reap HCAs marked
+                * for destroy by close_hca */
+               if (ufds[0].revents & POLLIN) {
+                       if (read(g_ib_pipe[0], rbuf, 2) == -1)
+                               dapl_log(DAPL_DBG_TYPE_THREAD,
+                                        " cr_thread: pipe rd err= %s\n",
+                                        strerror(errno));
+
+                       /* cleanup any device on list marked for destroy */
+                       for (idx = 1; idx < fds; idx++) {
+                               if (uhca[idx] && uhca[idx]->destroy == 1) {
+                                       dapl_os_lock(&g_hca_lock);
+                                       dapl_llist_remove_entry(
+                                               &g_hca_list,
+                                               (DAPL_LLIST_ENTRY*)
+                                               &uhca[idx]->entry);
+                                       dapl_os_unlock(&g_hca_lock);
+                                       uhca[idx]->destroy = 2;
+                               }
+                       }
+               }
+               dapl_os_lock(&g_hca_lock);
+       }
+
+       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " ib_thread(%d) EXIT\n",
+                    dapl_os_getpid());
+       g_ib_thread_state = IB_THREAD_EXIT;
+       dapl_os_unlock(&g_hca_lock);
+}
+#endif
index 77d78b2bbf900724a9c9f767ab1b6a278ab680ec..739ccca32d00c1dc9596d7c53b1c6c5fd366cc34 100755 (executable)
@@ -689,10 +689,9 @@ send_msg(void *data,
                                LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                          getpid(), evd);
                                if (evd != h_dto_req_evd) {
-                                       fprintf(stderr,
-                                               "%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
-                                               getpid());
-                                       return (DAT_ABORT);
+                                       /* CNO timeout, already on EVD */
+                                       if (evd != NULL)
+                                               return (ret);
                                }
                        }
                        /* use wait to dequeue */
@@ -1085,10 +1084,9 @@ DAT_RETURN connect_ep(char *hostname, DAT_CONN_QUAL conn_id)
                        LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                  getpid(), evd);
                        if (evd != h_dto_rcv_evd) {
-                               fprintf(stderr,
-                                       "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-                                       getpid());
-                               return (DAT_ABORT);
+                               /* CNO timeout, already on EVD */
+                               if (evd != NULL)
+                                       return (ret);
                        }
                }
                /* use wait to dequeue */
@@ -1319,10 +1317,9 @@ DAT_RETURN do_rdma_write_with_msg(void)
                        LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                  getpid(), evd);
                        if (evd != h_dto_rcv_evd) {
-                               fprintf(stderr,
-                                       "%d Error waiting on h_dto_cno: "
-                                       "evd != h_dto_rcv_evd\n", getpid());
-                               return (ret);
+                               /* CNO timeout, already on EVD */
+                               if (evd != NULL)
+                                       return (ret);
                        }
                }
                /* use wait to dequeue */
@@ -1446,10 +1443,9 @@ DAT_RETURN do_rdma_read_with_msg(void)
                                LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                          getpid(), evd);
                                if (evd != h_dto_req_evd) {
-                                       fprintf(stderr,
-                                               "%d Error waiting on h_dto_cno: evd != h_dto_req_evd\n",
-                                               getpid());
-                                       return (DAT_ABORT);
+                                       /* CNO timeout, already on EVD */
+                                       if (evd != NULL)
+                                               return (ret);
                                }
                        }
                        /* use wait to dequeue */
@@ -1501,6 +1497,15 @@ DAT_RETURN do_rdma_read_with_msg(void)
         */
        printf("%d Sending RDMA read completion message\n", getpid());
 
+       /* give remote chance to process read completes */
+       if (use_cno) {
+#if defined(_WIN32) || defined(_WIN64)
+               Sleep(1000);
+#else
+               sleep(1);
+#endif
+       }
+
        ret = send_msg(&rmr_send_msg,
                       sizeof(DAT_RMR_TRIPLET),
                       lmr_context_send_msg,
@@ -1525,14 +1530,14 @@ DAT_RETURN do_rdma_read_with_msg(void)
                LOGPRINTF("%d waiting for message receive event\n", getpid());
                if (use_cno) {
                        DAT_EVD_HANDLE evd = DAT_HANDLE_NULL;
-                       ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
+                       
+                       ret = dat_cno_wait(h_dto_cno, DTO_TIMEOUT, &evd);
                        LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                  getpid(), evd);
                        if (evd != h_dto_rcv_evd) {
-                               fprintf(stderr,
-                                       "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-                                       getpid());
-                               return (ret);
+                               /* CNO timeout, already on EVD */
+                               if (evd != NULL)
+                                       return (ret);
                        }
                }
                /* use wait to dequeue */
@@ -1693,10 +1698,9 @@ DAT_RETURN do_ping_pong_msg()
                                LOGPRINTF("%d cno wait return evd_handle=%p\n",
                                          getpid(), evd);
                                if (evd != h_dto_rcv_evd) {
-                                       fprintf(stderr,
-                                               "%d Error waiting on h_dto_cno: evd != h_dto_rcv_evd\n",
-                                               getpid());
-                                       return (ret);
+                                       /* CNO timeout, already on EVD */
+                                       if (evd != NULL)
+                                               return (ret);
                                }
                        }
                        /* use wait to dequeue */