]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
r3326: Changes to support async events. Also consolidated the uAT,uCM,uCQ threads...
authorArlin Davis <ardavis@ichips.intel.com>
Tue, 6 Sep 2005 19:34:46 +0000 (19:34 +0000)
committerJames Lentini <jlentini@netapp.com>
Tue, 6 Sep 2005 19:34:46 +0000 (19:34 +0000)
Signed-off-by: Arlin Davis <ardavis@ichips.intel.com>
Signed-off-by: James Lentini <jlentini@netapp.com>
dapl/openib/dapl_ib_cm.c
dapl/openib/dapl_ib_cq.c
dapl/openib/dapl_ib_util.c
dapl/openib/dapl_ib_util.h

index 99a475ff38b9997cb56b058d14b59f8e4977eae0..dcdaf785e05190a441dee337628e16aa76e68778 100644 (file)
@@ -70,90 +70,6 @@ static inline uint64_t cpu_to_be64(uint64_t x) { return bswap_64(x); }
 static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
 #endif
 
-static int                     g_at_destroy;
-static DAPL_OS_THREAD          g_at_thread;
-static int                     g_cm_destroy;
-static DAPL_OS_THREAD          g_cm_thread;
-static DAPL_OS_LOCK            g_cm_lock;
-static struct dapl_llist_entry *g_cm_list;     
-
-int dapli_cm_thread_init(void)
-{
-       DAT_RETURN dat_status;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_init(%d)\n", getpid());
-
-        /* initialize cr_list lock */
-       dapl_os_lock_init(&g_cm_lock);
-       
-       /* initialize CM list for listens on this HCA */
-       dapl_llist_init_head(&g_cm_list);
-
-       /* create thread to process inbound connect request */
-       dat_status = dapl_os_thread_create(cm_thread, NULL, &g_cm_thread);
-       if (dat_status != DAT_SUCCESS)
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " cm_thread_init: failed to create thread\n");
-               return 1;
-       }
-       return 0;
-}
-
-void dapli_cm_thread_destroy(void)
-{
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d)\n", getpid());
-
-       /* destroy cr_thread and lock */
-       g_cm_destroy = 1;
-       pthread_kill( g_cm_thread, SIGUSR1 );
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) SIGUSR1 sent\n",getpid());
-       while (g_cm_destroy) {
-               struct timespec sleep, remain;
-               sleep.tv_sec = 0;
-               sleep.tv_nsec = 10000000; /* 10 ms */
-               dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                            " cm_thread_destroy: waiting for cm_thread\n");
-               nanosleep (&sleep, &remain);
-       }
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
-}
-
-int dapli_at_thread_init(void)
-{
-       DAT_RETURN dat_status;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
-
-       /* create thread to process AT async requests */
-       dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
-       if (dat_status != DAT_SUCCESS)
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " at_thread_init: failed to create thread\n");
-               return 1;
-       }
-       return 0;
-}
-
-void dapli_at_thread_destroy(void)
-{
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
-
-       /* destroy cr_thread and lock */
-       g_at_destroy = 1;
-       pthread_kill( g_at_thread, SIGUSR1 );
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid());
-       while (g_at_destroy) {
-               struct timespec sleep, remain;
-               sleep.tv_sec = 0;
-               sleep.tv_nsec = 10000000; /* 10 ms */
-               dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                            " at_thread_destroy: waiting for at_thread\n");
-               nanosleep (&sleep, &remain);
-       }
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
-}
 
 void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
 {
@@ -348,12 +264,6 @@ static void dapli_destroy_cm_id(struct dapl_cm_id *conn)
                if (conn->ep)
                        conn->ep->cm_handle = IB_INVALID_HANDLE;
 
-               /* take off the CM thread work queue and free */
-               dapl_os_lock( &g_cm_lock );
-               dapl_llist_remove_entry(&g_cm_list, 
-                                       (DAPL_LLIST_ENTRY*)&conn->entry);
-               dapl_os_unlock(&g_cm_lock);
-
                dapl_os_free(conn, sizeof(*conn));
        }
 }
@@ -426,8 +336,8 @@ static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id  *conn,
        if (new_conn) {
 
                (void)dapl_os_memzero(new_conn, sizeof(*new_conn));
-               dapl_os_lock_init(&new_conn->lock);
                new_conn->cm_id = event->cm_id; /* provided by uCM */
+               event->cm_id->context = new_conn; /* update CM_ID context */
                new_conn->sp = conn->sp;
                new_conn->hca = conn->hca;
                new_conn->service_id = conn->service_id;
@@ -444,13 +354,6 @@ static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id  *conn,
                                event->param.req_rcvd.primary_path,
                                sizeof(struct ib_sa_path_rec));
                                
-               /* put new CR on CM thread event work queue */
-               dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&new_conn->entry);
-               dapl_os_lock( &g_cm_lock );
-               dapl_llist_add_tail(&g_cm_list, 
-                               (DAPL_LLIST_ENTRY*)&new_conn->entry, new_conn);
-               dapl_os_unlock(&g_cm_lock);
-
                dapl_dbg_log(DAPL_DBG_TYPE_CM, " passive_cb: "
                                "REQ on HCA %p SP %p SID %d L_ID %d new_id %d p_data %p\n",
                                new_conn->hca, new_conn->sp, 
@@ -521,18 +424,13 @@ static int dapli_cm_active_cb(struct dapl_cm_id *conn,
                if (conn->ep)
                        conn->ep->cm_handle = IB_INVALID_HANDLE;
                
-               /* take off the CM thread work queue and free */
-               dapl_os_lock( &g_cm_lock );
-               dapl_llist_remove_entry(&g_cm_list, 
-                                       (DAPL_LLIST_ENTRY*)&conn->entry);
-               dapl_os_unlock(&g_cm_lock);
                dapl_os_free(conn, sizeof(*conn));
        }
        return(destroy);
 }
 
 static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
-                               struct ib_cm_event *event)
+                              struct ib_cm_event *event)
 {
        int destroy;
        struct dapl_cm_id *new_conn;
@@ -541,9 +439,6 @@ static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
                     " passive_cb: conn %p id %d event %d\n",
                     conn, conn->cm_id, event->event );
 
-       if (conn->cm_id == 0)
-               return 0;
-
        dapl_os_lock(&conn->lock);
        if (conn->destroy) {
                dapl_os_unlock(&conn->lock);
@@ -608,155 +503,11 @@ static int dapli_cm_passive_cb(struct dapl_cm_id *conn,
                if (conn->ep)
                        conn->ep->cm_handle = IB_INVALID_HANDLE;
 
-               /* take off the CM thread work queue and free */
-               dapl_os_lock( &g_cm_lock );
-               dapl_llist_remove_entry(&g_cm_list, 
-                                       (DAPL_LLIST_ENTRY*)&conn->entry);
-               dapl_os_unlock(&g_cm_lock);
-
                dapl_os_free(conn, sizeof(*conn));
        }
        return(destroy);
 }
 
-/* something to catch the signal */
-static void ib_sig_handler(int signum)
-{
-       return;
-}
-
-/* async CM processing thread */
-void cm_thread(void *arg) 
-{
-       struct dapl_cm_id       *conn, *next_conn;
-       struct ib_cm_event      *event;
-       struct pollfd           ufds;
-       sigset_t                sigset;
-
-       dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                     " cm_thread(%d,0x%x): ENTER: cm_fd %d\n",
-                     getpid(), g_cm_thread, ib_cm_get_fd());
-
-       sigemptyset(&sigset);
-       sigaddset(&sigset, SIGUSR1);
-       pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-       signal( SIGUSR1, ib_sig_handler); 
-       
-       dapl_os_lock( &g_cm_lock );
-       while (!g_cm_destroy) {
-               struct ib_cm_id *cm_id;
-               int ret;
-
-               /* select for CM event, all events process via cm_fd */
-                ufds.fd      = ib_cm_get_fd();
-                ufds.events  = POLLIN;
-                ufds.revents = 0;
-
-               dapl_os_unlock(&g_cm_lock);
-                ret = poll(&ufds, 1, -1); 
-               if (ret <= 0) {
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                                    " cm_thread(%d): ERR %s poll\n",
-                                    getpid(),strerror(errno));
-                       dapl_os_lock(&g_cm_lock);
-                       continue;
-               }
-
-               dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                       " cm_thread: GET EVENT fd=%d n=%d\n",
-                       ib_cm_get_fd(),ret);
-
-               if (ib_cm_event_get_timed(0,&event)) { 
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                               " cm_thread: ERR %s event_get on %d\n", 
-                               strerror(errno), ib_cm_get_fd() );
-                       dapl_os_lock(&g_cm_lock);
-                       continue;
-               }
-               dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                       " cm_thread: GET EVENT fd=%d woke\n",ib_cm_get_fd());
-               dapl_os_lock(&g_cm_lock);
-
-               /* set proper cm_id */
-               if (event->event == IB_CM_REQ_RECEIVED ||
-                       event->event == IB_CM_SIDR_REQ_RECEIVED)
-                       cm_id = event->param.req_rcvd.listen_id;
-               else
-                       cm_id = event->cm_id;
-
-               dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                       " cm_thread: EVENT event(%d) cm_id=%d (%d)\n",
-                       event->event, event->cm_id, cm_id );
-
-               /* 
-                * Walk cm_list looking for connection id in event
-                * no need to walk if uCM would provide context with event
-                */
-               if (!dapl_llist_is_empty(&g_cm_list))
-                       next_conn = dapl_llist_peek_head(&g_cm_list);
-               else
-                       next_conn = NULL;
-
-               ret = 0;
-               while (next_conn) {
-                       conn = next_conn;
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                            " cm_thread: LIST cm %p c_id %d e_id %d)\n", 
-                            conn, conn->cm_id, cm_id );
-                               
-                       next_conn = dapl_llist_next_entry(
-                                       &g_cm_list,
-                                       (DAPL_LLIST_ENTRY*)&conn->entry );
-
-                       if (cm_id == conn->cm_id) {
-                               dapl_os_unlock(&g_cm_lock);
-                               if (conn->sp)
-                                       ret = dapli_cm_passive_cb(conn,event);
-                               else
-                                       ret = dapli_cm_active_cb(conn,event);
-                               dapl_os_lock(&g_cm_lock);
-                               break;
-                       }
-               } 
-               ib_cm_event_put(event);
-               if (ret) {
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                            " cm_thread: destroy cm_id %d\n",cm_id);
-                       ib_cm_destroy_id(cm_id);
-               }
-       }
-       dapl_os_unlock(&g_cm_lock);     
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread(%d) EXIT, cm_list=%s\n",
-                    getpid(),dapl_llist_is_empty(&g_cm_list) ? "EMPTY":"NOT EMPTY");
-       g_cm_destroy = 0;
-}
-
-/* async AT processing thread */
-void at_thread(void *arg) 
-{
-       sigset_t sigset;
-
-       dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                     " at_thread(%d,0x%x): ENTER: at_fd %d\n",
-                     getpid(), g_at_thread, ib_at_get_fd());
-
-       sigemptyset(&sigset);
-       sigaddset(&sigset, SIGUSR1);
-       pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-       signal(SIGUSR1, ib_sig_handler); 
-       
-       while (!g_at_destroy) {
-               /* poll forever until callback or signal */
-               if (ib_at_callback_get_timed(-1) < 0) { 
-                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                               " at_thread: SIG? ret=%s, destroy=%d\n", 
-                               strerror(errno), g_at_destroy );
-               }
-               dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
-       }
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
-       g_at_destroy = 0;
-}
 
 /************************ DAPL provider entry points **********************/
 
@@ -853,13 +604,6 @@ dapls_ib_connect (
        conn->retries = 0;
        dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
        
-       /* put on CM thread work queue */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-       dapl_os_lock( &g_cm_lock );
-       dapl_llist_add_tail(&g_cm_list, 
-                           (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-       dapl_os_unlock(&g_cm_lock);
-
        status = ib_at_route_by_ip(
                ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr, 
                ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr, 
@@ -1019,13 +763,6 @@ dapls_ib_setup_conn_listener (
        conn->hca = ia_ptr->hca_ptr;
        conn->service_id = ServiceID;
 
-       /* put on CM thread work queue */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-       dapl_os_lock( &g_cm_lock );
-       dapl_llist_add_tail(&g_cm_list, 
-                       (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-       dapl_os_unlock(&g_cm_lock);
-
        dapl_dbg_log(DAPL_DBG_TYPE_EP,
                     " setup_listener(conn=%p cm_id=%d)\n",
                     sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1345,8 +1082,6 @@ int dapls_ib_private_data_size (
        return size;
 }
 
-#ifndef SOCKET_CM
-
 /*
  * Map all socket CM event codes to the DAT equivelent.
  */
@@ -1457,7 +1192,44 @@ dapls_ib_get_cm_event (
     return ib_cm_event;
 }
 
-#endif
+void dapli_cm_event_cb()
+{
+       struct ib_cm_event *event;
+               
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_cm_event()\n");
+
+       /* process one CM event, fairness */
+       if(!ib_cm_event_get_timed(0,&event)) {
+               struct dapl_cm_id       *conn;
+               int                     ret;
+               dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                            " dapli_cm_event: EVENT=%p ID=%p CTX=%p\n",
+                            event->event, event->cm_id, 
+                            event->cm_id->context);
+               
+               /* set proper conn from cm_id context*/
+               conn = (struct dapl_cm_id*)event->cm_id->context;
+
+               if (conn->sp) 
+                       ret = dapli_cm_passive_cb(conn,event);
+               else 
+                       ret = dapli_cm_active_cb(conn,event);
+               
+               ib_cm_event_put(event);
+
+               if (ret) 
+                       ib_cm_destroy_id(conn->cm_id);
+       }
+}
+
+void dapli_at_event_cb()
+{
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_at_event_cb()\n");
+
+       /* process one AT event, fairness */
+       ib_at_callback_get_timed(0);
+}
+
 
 /*
  * Local variables:
index ae325781b4fbb0c655c748d8553f3b5b337a9c93..43808681e8989649e30f06c12e0d62fa253d1e50 100644 (file)
 #include "dapl_evd_util.h"
 #include "dapl_ring_buffer_util.h"
 #include <sys/poll.h>
-#include <signal.h>
 
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+void dapli_cq_event_cb(struct _ib_hca_transport *hca)
 {
-        DAT_RETURN dat_status;
-
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
-
-        /* create thread to process inbound connect request */
-        dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr,&hca_ptr->ib_trans.cq_thread);
-        if (dat_status != DAT_SUCCESS)
-        {
-                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                             " cq_thread_init: failed to create thread\n");
-                return 1;
-        }
-        return 0;
-}
-
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
-{
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
-
-        /* destroy cr_thread and lock */
-        hca_ptr->ib_trans.cq_destroy = 1;
-        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
-        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
-        while (hca_ptr->ib_trans.cq_destroy != 2) {
-                struct timespec sleep, remain;
-                sleep.tv_sec = 0;
-                sleep.tv_nsec = 10000000; /* 10 ms */
-                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                             " cq_thread_destroy: waiting for cq_thread\n");
-                nanosleep (&sleep, &remain);
-        }
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
-       return;
-}
-
-/* something to catch the signal */
-static void ib_cq_handler(int signum)
-{
-        return;
-}
-
-void cq_thread( void *arg )
-{
-       struct dapl_hca *hca_ptr = arg;
-       struct dapl_evd *evd_ptr;
-       struct ibv_cq   *ibv_cq = NULL;
-       sigset_t        sigset;
-       int             status = 0; 
-
-       dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
-  
-        sigemptyset(&sigset);
-        sigaddset(&sigset,SIGUSR1);
-        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-        signal(SIGUSR1, ib_cq_handler);
-
-       /* wait on DTO event, or signal to abort */
-       while (!hca_ptr->ib_trans.cq_destroy) {
-
-               struct pollfd cq_poll = {
-                       .fd      = hca_ptr->ib_hca_handle->cq_fd[0],
+       int i;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca);
+
+       /* check all comp events on this device */
+       for(i=0;i<hca->ib_ctx->num_comp;i++) {
+               struct dapl_evd *evd_ptr = NULL;
+               struct ibv_cq   *ibv_cq = NULL;
+               struct pollfd   cq_fd = {
+                       .fd      = hca->ib_ctx->cq_fd[i],
                        .events  = POLLIN,
                        .revents = 0
                };
-
-               status = poll(&cq_poll, 1, -1);
-               if ((status == 1) &&
-                       (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
-       
+               if ((poll(&cq_fd, 1, 0) == 1) &&
+                       (!ibv_get_cq_event(hca->ib_ctx, i, 
+                                          &ibv_cq, (void*)&evd_ptr))) {
+
+                       /* 
+                        * TODO: ibv put event to protect against
+                        * destroy CQ race conditions?
+                        */
                        if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
                                continue;
 
                        /* process DTO event via callback */
-                       dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+                       dapl_evd_dto_callback ( hca->ib_ctx,
                                                evd_ptr->ib_cq_handle,
                                                (void*)evd_ptr );
-               } else {
-
-               }
-       } 
-       hca_ptr->ib_trans.cq_destroy = 2;
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
-       return;
+               } 
+       }
 }
+
 /*
  * Map all verbs DTO completion codes to the DAT equivelent.
  *
index 5cc3b607f27952ffd9f7fe9014ce818c73acb1e9..09fb32611c878e0be2516216b6e870692cb7c3d8 100644 (file)
@@ -55,13 +55,14 @@ static const char rcsid[] = "$Id:  $";
 
 #include <stdlib.h>
 #include <netinet/tcp.h>
-#include <sys/utsname.h>
-#include <unistd.h>    
-#include <fcntl.h>
-#include <strings.h>
-
-int g_dapl_loopback_connection = 0;
+#include <sys/poll.h>
 
+int                    g_dapl_loopback_connection = 0;
+int                    g_ib_destroy = 0;
+int                    g_ib_pipe[2];
+DAPL_OS_THREAD         g_ib_thread;
+DAPL_OS_LOCK           g_hca_lock;
+struct dapl_llist_entry        *g_hca_list;    
 
 /* just get IP address, IPv4 only for now  */
 int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
@@ -130,7 +131,18 @@ int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
 int32_t dapls_ib_init (void)
 {      
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
-       if (dapli_cm_thread_init() || dapli_at_thread_init()) 
+
+       /* initialize hca_list lock */
+       dapl_os_lock_init(&g_hca_lock);
+       
+       /* initialize hca list for CQ events */
+       dapl_llist_init_head(&g_hca_list);
+
+       /* create pipe for waking up work thread */
+       if (pipe(g_ib_pipe))
+               return 1;
+
+       if (dapli_ib_thread_init()) 
                return 1;
 
        return 0;
@@ -139,8 +151,7 @@ int32_t dapls_ib_init (void)
 int32_t dapls_ib_release (void)
 {
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
-       dapli_at_thread_destroy();
-       dapli_cm_thread_destroy();
+       dapli_ib_thread_destroy();
        return 0;
 }
 
@@ -196,6 +207,7 @@ DAT_RETURN dapls_ib_open_hca (
                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
                return DAT_INTERNAL_ERROR;
        }
+       hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
 
        /* set inline max with enviromment or default, get local lid and gid 0 */
        hca_ptr->ib_trans.max_inline_send = 
@@ -223,19 +235,22 @@ DAT_RETURN dapls_ib_open_hca (
                goto bail;
        }
 
-       /* one thread for each device open */
-       if (dapli_cq_thread_init(hca_ptr)) {
-               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                             " open_hca: cq_thread_init failed for %s\n", 
-                             ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-               goto bail;
-       }
-
-       /* initialize cq_lock and wait object */
-       dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
-       dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
-  
-       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
+       /* initialize hca wait object for uAT event */
+       dapl_os_wait_object_init(&hca_ptr->ib_trans.wait_object);
+
+       /* 
+        * Put new hca_transport on list for async and CQ event processing 
+        * Wakeup work thread to add to polling list
+        */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry);
+       dapl_os_lock( &g_hca_lock );
+       dapl_llist_add_tail(&g_hca_list, 
+                           (DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry, 
+                           &hca_ptr->ib_trans.entry);
+       write(g_ib_pipe[1], "w", sizeof "w");
+       dapl_os_unlock(&g_hca_lock);
+       
+       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
                      " open_hca: %s, port %d, %s  %d.%d.%d.%d INLINE_MAX=%d\n", 
                      ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
                      ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ?  "AF_INET":"AF_INET6",
@@ -245,7 +260,6 @@ DAT_RETURN dapls_ib_open_hca (
                      ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
                      hca_ptr->ib_trans.max_inline_send );
 
-
        return DAT_SUCCESS;
 
 bail:
@@ -276,16 +290,28 @@ DAT_RETURN dapls_ib_close_hca (   IN   DAPL_HCA   *hca_ptr )
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p->%p\n",
                        hca_ptr,hca_ptr->ib_hca_handle);
 
-       dapli_cq_thread_destroy(hca_ptr);
-
        if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
                if (ibv_close_device(hca_ptr->ib_hca_handle)) 
                        return(dapl_convert_errno(errno,"ib_close_device"));
                hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
        }
-       
-       dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
 
+       /* 
+        * Remove hca from async and CQ event processing list
+        * Wakeup work thread to remove from polling list
+        */
+       hca_ptr->ib_trans.destroy = 1;
+       write(g_ib_pipe[1], "w", sizeof "w");
+
+       /* wait for thread to remove HCA references */
+       while (hca_ptr->ib_trans.destroy != 2) {
+               struct timespec sleep, remain;
+               sleep.tv_sec = 0;
+               sleep.tv_nsec = 10000000; /* 10 ms */
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                            " ib_thread_destroy: waiting on hca %p destroy\n");
+               nanosleep (&sleep, &remain);
+       }
        return (DAT_SUCCESS);
 }
   
@@ -432,31 +458,285 @@ DAT_RETURN dapls_ib_setup_async_callback (
        IN  void                        *context )
 
 {
-    ib_hca_transport_t *hca_ptr;
-
-    dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
-                 " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
-                 ia_ptr, handler_type, evd_ptr, callback, context);
-
-    hca_ptr = &ia_ptr->hca_ptr->ib_trans;
-    switch(handler_type)
-    {
-       case DAPL_ASYNC_UNAFILIATED:
-           hca_ptr->async_unafiliated = callback;
-           break;
-       case DAPL_ASYNC_CQ_ERROR:
-           hca_ptr->async_cq_error = callback;
-           break;
-       case DAPL_ASYNC_CQ_COMPLETION:
-          hca_ptr->async_cq = callback;
-          break;
-       case DAPL_ASYNC_QP_ERROR:
-           hca_ptr->async_qp_error = callback;
-           break;
-       default:
-           break;
-    }
-    return DAT_SUCCESS;
+       ib_hca_transport_t      *hca_ptr;
+
+       dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+                       " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n",
+                       ia_ptr, handler_type, evd_ptr, callback, context);
+
+       hca_ptr = &ia_ptr->hca_ptr->ib_trans;
+       switch(handler_type)
+       {
+               case DAPL_ASYNC_UNAFILIATED:
+                       hca_ptr->async_unafiliated = callback;
+                       hca_ptr->async_un_ctx = context;
+                       break;
+               case DAPL_ASYNC_CQ_ERROR:
+                       hca_ptr->async_cq_error = callback;
+                       hca_ptr->async_cq_ctx = context;
+                       break;
+               case DAPL_ASYNC_CQ_COMPLETION:
+                       hca_ptr->async_cq = callback;
+                       hca_ptr->async_ctx = context;
+                       break;
+               case DAPL_ASYNC_QP_ERROR:
+                       hca_ptr->async_qp_error = callback;
+                       hca_ptr->async_qp_ctx = context;
+                       break;
+               default:
+                       break;
+       }
+       return DAT_SUCCESS;
 }
 
+int dapli_ib_thread_init(void)
+{
+       DAT_RETURN dat_status;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                    " ib_thread_init(%d)\n", getpid());
+
+       /* create thread to process inbound connect request */
+       dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread);
+       if (dat_status != DAT_SUCCESS)
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            " ib_thread_init: failed to create thread\n");
+               return 1;
+       }
+       return 0;
+}
+
+void dapli_ib_thread_destroy(void)
+{
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                    " ib_thread_destroy(%d)\n", getpid());
+
+       /* destroy ib_thread, wait for termination */
+       g_ib_destroy = 1;
+       write(g_ib_pipe[1], "w", sizeof "w");
+       while (g_ib_destroy != 2) {
+               struct timespec sleep, remain;
+               sleep.tv_sec = 0;
+               sleep.tv_nsec = 10000000; /* 10 ms */
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                            " ib_thread_destroy: waiting for ib_thread\n");
+               nanosleep(&sleep, &remain);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                    " ib_thread_destroy(%d) exit\n",getpid());
+}
+
+void dapli_async_event_cb(struct _ib_hca_transport *hca)
+{
+       struct ibv_async_event  event;
+       struct pollfd   async_fd = {
+               .fd      = hca->ib_ctx->async_fd,
+               .events  = POLLIN,
+               .revents = 0
+       };
+       
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_async_event_cb(%p)\n",hca);
+
+       if (hca->destroy)
+               return;
+
+       if ((poll(&async_fd, 1, 0)==1) &&
+               (!ibv_get_async_event(hca->ib_ctx, &event))) {
+
+               switch (event.event_type) {
+
+                       case    IBV_EVENT_CQ_ERR:
+                       {
+                               dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                            " dapli_async_event CQ ERR %d\n",
+                                            event.event_type);                         
+                               
+                               /* report up if async callback still setup */
+                               if (hca->async_cq_error)
+                                       hca->async_cq_error(hca->ib_ctx,
+                                                           &event,
+                                                           hca->async_cq_ctx);
+                               break;
+                       }
+                       case    IBV_EVENT_COMM_EST:
+                       {
+                               /* Received messages on connected QP before RTU */
+                               struct dapl_ep *ep_ptr = event.element.qp->qp_context;
+                               
+                               /* TODO: cannot process COMM_EST until ibv  
+                                * guarantees valid QP context for events. 
+                                * Race conditions exist with QP destroy call. 
+                                * For now, assume the RTU will arrive.
+                                */
+                               dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                                            " dapli_async_event COMM_EST (qp=%p)\n",
+                                            event.element.qp); 
+
+                               if (!DAPL_BAD_HANDLE(ep_ptr, DAPL_MAGIC_EP) &&
+                                   ep_ptr->cm_handle != IB_INVALID_HANDLE)
+                                       ib_cm_establish(ep_ptr->cm_handle->cm_id);
+                       
+                               break;
+                       }
+                       case    IBV_EVENT_QP_FATAL:
+                       case    IBV_EVENT_QP_REQ_ERR:
+                       case    IBV_EVENT_QP_ACCESS_ERR:
+                       case    IBV_EVENT_QP_LAST_WQE_REACHED:
+                       case    IBV_EVENT_SRQ_ERR:
+                       case    IBV_EVENT_SRQ_LIMIT_REACHED:
+                       case    IBV_EVENT_SQ_DRAINED:
+                       {
+                               dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                            " dapli_async_event QP ERR %d\n",
+                                            event.event_type); 
+                               
+                               /* report up if async callback still setup */
+                               if (hca->async_qp_error)
+                                       hca->async_qp_error(hca->ib_ctx,
+                                                           &event,
+                                                           hca->async_qp_ctx);
+                               break;
+                       }
+                       case    IBV_EVENT_PATH_MIG:
+                       case    IBV_EVENT_PATH_MIG_ERR:
+                       case    IBV_EVENT_DEVICE_FATAL:
+                       case    IBV_EVENT_PORT_ACTIVE:
+                       case    IBV_EVENT_PORT_ERR:
+                       case    IBV_EVENT_LID_CHANGE:
+                       case    IBV_EVENT_PKEY_CHANGE:
+                       case    IBV_EVENT_SM_CHANGE:
+                       {
+                               dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                            " dapli_async_event DEV ERR %d\n",
+                                            event.event_type); 
+
+                               /* report up if async callback still setup */
+                               if (hca->async_unafiliated)
+                                       hca->async_unafiliated( 
+                                                       hca->ib_ctx,
+                                                       &event,
+                                                       hca->async_un_ctx);
+                               break;
+                       }
+                       default:
+                       {
+                               dapl_dbg_log (DAPL_DBG_TYPE_WARN,
+                                             "--> DsEventCb: UNKNOWN\n");
+                               break;
+                       }
+               }
+               ibv_put_async_event(&event);
+       }
+}
+
+
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg) 
+{
+       struct pollfd            ufds[__FD_SETSIZE];
+       struct _ib_hca_transport *uhca[__FD_SETSIZE]={NULL};
+       struct _ib_hca_transport *hca;
+       int                      ret,idx,fds;
+       char                     rbuf[2];
+
+       dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
+                     " ib_thread(%d,0x%x): ENTER: pipe %d cm %d at %d\n",
+                     getpid(), g_ib_thread, 
+                     g_ib_pipe[0], ib_cm_get_fd(), 
+                     ib_at_get_fd());
+
+       /* Poll across pipe, CM, AT never changes */
+       dapl_os_lock( &g_hca_lock );
+       
+       ufds[0].fd = g_ib_pipe[0];      /* pipe */
+       ufds[0].events = POLLIN;
+       ufds[1].fd = ib_cm_get_fd();    /* uCM */
+       ufds[1].events = POLLIN;
+       ufds[2].fd = ib_at_get_fd();    /* uAT */
+       ufds[2].events = POLLIN;
+               
+       while (!g_ib_destroy) {
+               
+               /* build ufds after pipe, cm, at events */
+               ufds[0].revents = 0;
+               ufds[1].revents = 0;
+               ufds[2].revents = 0;
+               idx=2;
+
+               /*  Walk HCA list and setup async and CQ events */
+               if (!dapl_llist_is_empty(&g_hca_list))
+                       hca = dapl_llist_peek_head(&g_hca_list);
+               else
+                       hca = NULL;
+               
+               while(hca) {
+                       int i;
+                       ufds[++idx].fd = hca->ib_ctx->async_fd; /* uASYNC */
+                       ufds[idx].events = POLLIN;
+                       ufds[idx].revents = 0;
+                       uhca[idx] = hca;
+                       for (i=0;i<hca->ib_ctx->num_comp;i++) {  /* uCQ */
+                               ufds[++idx].fd = hca->ib_ctx->cq_fd[i]; 
+                               ufds[idx].events = POLLIN;
+                               ufds[idx].revents = 0;
+                               uhca[idx] = hca;
+                       }
+                       hca = dapl_llist_next_entry(
+                                       &g_hca_list,
+                                       (DAPL_LLIST_ENTRY*)&hca->entry);
+               }
+               
+               /* unlock, and setup poll */
+               fds = idx+1;
+               dapl_os_unlock(&g_hca_lock);
+                ret = poll(ufds, fds, -1); 
+               if (ret <= 0) {
+                       dapl_dbg_log(DAPL_DBG_TYPE_WARN,
+                                    " ib_thread(%d): ERR %s poll\n",
+                                    getpid(),strerror(errno));
+                       dapl_os_lock(&g_hca_lock);
+                       continue;
+               }
+
+               /* check and process CQ and ASYNC events, each open device */
+               for(idx=3;idx<fds;idx++) {
+                       if (ufds[idx].revents == POLLIN) {
+                               dapli_cq_event_cb(uhca[idx]);
+                               dapli_async_event_cb(uhca[idx]);
+                       }
+               }
+               
+               /* check and process user events */
+               if (ufds[0].revents == POLLIN) {
+
+                       read(g_ib_pipe[0], rbuf, 2);
+                       
+                       /* cleanup any device on list marked for destroy */
+                       for(idx=3;idx<fds;idx++) {
+                               if(uhca[idx] && uhca[idx]->destroy == 1) {
+                                       dapl_os_lock(&g_hca_lock);
+                                       dapl_llist_remove_entry(
+                                               &g_hca_list, 
+                                               (DAPL_LLIST_ENTRY*)
+                                                       &uhca[idx]->entry);
+                                       dapl_os_unlock(&g_hca_lock);
+                                       uhca[idx]->destroy = 2;
+                               }
+                       }
+               }
+
+               /* CM and AT events */
+               if (ufds[1].revents == POLLIN)
+                       dapli_cm_event_cb();
+
+               if (ufds[2].revents == POLLIN)
+                       dapli_at_event_cb();
+
+               dapl_os_lock(&g_hca_lock);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
+       g_ib_destroy = 2;
+       dapl_os_unlock(&g_hca_lock);    
+}
 
index 6f976a726f426e9a7e83a9f434b1a9746fc5675d..867a2751ceb6d454aa4db01dd4e020ee65ef74a9 100644 (file)
@@ -231,18 +231,22 @@ typedef void (*ib_async_handler_t)(
 /* ib_hca_transport_t, specific to this implementation */
 typedef struct _ib_hca_transport
 { 
-       struct  ibv_device      *ib_dev;
+       struct ib_llist_entry   entry;
+       int                     destroy;
+       struct ibv_device       *ib_dev;
+       struct ibv_context      *ib_ctx;
        ib_cq_handle_t          ib_cq_empty;
-       DAPL_OS_LOCK            cq_lock;
        DAPL_OS_WAIT_OBJECT     wait_object;
-       int                     cq_destroy;
-       DAPL_OS_THREAD          cq_thread;
        int                     max_inline_send;
        union ibv_gid           gid;
        ib_async_handler_t      async_unafiliated;
+       void                    *async_un_ctx;
        ib_async_handler_t      async_cq_error;
+       void                    *async_ctx;
        ib_async_handler_t      async_cq;
+       void                    *async_cq_ctx;
        ib_async_handler_t      async_qp_error;
+       void                    *async_qp_ctx;
 
 } ib_hca_transport_t;
 
@@ -252,21 +256,15 @@ typedef uint32_t ib_shm_transport_t;
 /* prototypes */
 int32_t        dapls_ib_init (void);
 int32_t        dapls_ib_release (void);
-void cm_thread (void *arg);
-int dapli_cm_thread_init(void);
-void dapli_cm_thread_destroy(void);
-void at_thread (void *arg);
-int dapli_at_thread_init(void);
-void dapli_at_thread_destroy(void);
-void cq_thread (void *arg);
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
-void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
-
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
-int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index, 
-                 union ibv_gid *gid);
-int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_thread(void *arg);
+int  dapli_ib_thread_init(void);
+void dapli_ib_thread_destroy(void);
+int  dapli_get_hca_addr(struct dapl_hca *hca_ptr);
 void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
+void dapli_cm_event_cb(void);
+void dapli_at_event_cb(void);
+void dapli_cq_event_cb(struct _ib_hca_transport *hca);
+void dapli_async_event_cb(struct _ib_hca_transport *hca);
 
 DAT_RETURN
 dapls_modify_qp_state ( IN ib_qp_handle_t      qp_handle,