]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
dapl scm: Adding socket cm provider for better scalability on large homogeneous clusters.
authorArlin Davis <arlin.r.davis@intel.com>
Wed, 11 Jun 2008 18:11:48 +0000 (11:11 -0700)
committerArlin Davis <arlin.r.davis@intel.com>
Mon, 16 Jun 2008 21:50:02 +0000 (14:50 -0700)
Bring socket cm provider back to life with some changes:

better threading support for exchanging QP information.
Avoid blocking during connect to support dynamic connection
model with MPI implementations.

consumer control of ack timeout/retries.

disconnect/reject capabilities via socket exchange.

version support for wire protocol to insure compatibility
with peer scm provider. Add gids to exchange.

validated with Intel MPI on a 14,000+ core fabric using IB DDR.

Makefile.am
dapl.spec.in
dapl/openib_scm/dapl_ib_cm.c
dapl/openib_scm/dapl_ib_cq.c
dapl/openib_scm/dapl_ib_dto.h
dapl/openib_scm/dapl_ib_mem.c
dapl/openib_scm/dapl_ib_qp.c
dapl/openib_scm/dapl_ib_util.c
dapl/openib_scm/dapl_ib_util.h

index 079ad7ffdbb2d7819723fd250b30a54e21e40d57..bc7926d650c5d09185ebff14895a44608bf86094 100644 (file)
@@ -19,9 +19,11 @@ endif
 
 datlibdir = $(libdir)
 dapllibcmadir = $(libdir)
+dapllibscmdir = $(libdir)
 
 datlib_LTLIBRARIES = dat/udat/libdat.la
 dapllibcma_LTLIBRARIES = dapl/udapl/libdaplcma.la
+dapllibscm_LTLIBRARIES = dapl/udapl/libdaplscm.la
 
 dat_udat_libdat_la_CFLAGS = -Wall $(DBGFLAGS) -D_GNU_SOURCE $(OSFLAGS) \
                                -I$(srcdir)/dat/include/ -I$(srcdir)/dat/udat/ \
@@ -32,15 +34,22 @@ dapl_udapl_libdaplcma_la_CFLAGS = -Wall $(DBGFLAGS) -D_GNU_SOURCE $(OSFLAGS) \
                                -I$(srcdir)/dat/include/ -I$(srcdir)/dapl/include/ \
                                -I$(srcdir)/dapl/common -I$(srcdir)/dapl/udapl/linux \
                                -I$(srcdir)/dapl/openib_cma 
+
+dapl_udapl_libdaplscm_la_CFLAGS = -Wall $(DBGFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAGS) \
+                                -DOPENIB -DCQ_WAIT_OBJECT \
+                                -I$(srcdir)/dat/include/ -I$(srcdir)/dapl/include/ \
+                                -I$(srcdir)/dapl/common -I$(srcdir)/dapl/udapl/linux \
+                                -I$(srcdir)/dapl/openib_scm
+
                
 if HAVE_LD_VERSION_SCRIPT
     dat_version_script = -Wl,--version-script=$(srcdir)/dat/udat/libdat.map
     daplcma_version_script = -Wl,--version-script=$(srcdir)/dapl/udapl/libdaplcma.map
-                       
+    daplscm_version_script = -Wl,--version-script=$(srcdir)/dapl/udapl/libdaplscm.map
 else
-    dat_version_script = 
-    daplcma_version_script = 
-    
+    dat_version_script =
+    daplofa_version_script =
+    daplscm_version_script =
 endif
 
 #
@@ -168,6 +177,115 @@ dapl_udapl_libdaplcma_la_LDFLAGS = -version-info 1:2:0 $(daplcma_version_script)
                                   -Wl,-init,dapl_init -Wl,-fini,dapl_fini \
                                   -lpthread -libverbs -lrdmacm 
                                
+#
+# uDAPL OpenFabrics Socket CM version: libdaplscm.so
+#
+dapl_udapl_libdaplscm_la_SOURCES = dapl/udapl/dapl_init.c \
+        dapl/udapl/dapl_evd_create.c               \
+        dapl/udapl/dapl_evd_query.c                \
+        dapl/udapl/dapl_cno_create.c               \
+        dapl/udapl/dapl_cno_modify_agent.c         \
+        dapl/udapl/dapl_cno_free.c                 \
+        dapl/udapl/dapl_cno_wait.c                 \
+        dapl/udapl/dapl_cno_query.c                \
+        dapl/udapl/dapl_lmr_create.c               \
+        dapl/udapl/dapl_evd_wait.c                 \
+        dapl/udapl/dapl_evd_disable.c              \
+        dapl/udapl/dapl_evd_enable.c               \
+        dapl/udapl/dapl_evd_modify_cno.c           \
+        dapl/udapl/dapl_evd_set_unwaitable.c       \
+        dapl/udapl/dapl_evd_clear_unwaitable.c     \
+        dapl/udapl/linux/dapl_osd.c                \
+        dapl/common/dapl_cookie.c                   \
+        dapl/common/dapl_cr_accept.c                \
+        dapl/common/dapl_cr_query.c                 \
+        dapl/common/dapl_cr_reject.c                \
+        dapl/common/dapl_cr_util.c                  \
+        dapl/common/dapl_cr_callback.c              \
+        dapl/common/dapl_cr_handoff.c               \
+        dapl/common/dapl_ep_connect.c               \
+        dapl/common/dapl_ep_create.c                \
+        dapl/common/dapl_ep_disconnect.c            \
+        dapl/common/dapl_ep_dup_connect.c           \
+        dapl/common/dapl_ep_free.c                  \
+        dapl/common/dapl_ep_reset.c                 \
+        dapl/common/dapl_ep_get_status.c            \
+        dapl/common/dapl_ep_modify.c                \
+        dapl/common/dapl_ep_post_rdma_read.c        \
+        dapl/common/dapl_ep_post_rdma_write.c       \
+        dapl/common/dapl_ep_post_recv.c             \
+        dapl/common/dapl_ep_post_send.c             \
+        dapl/common/dapl_ep_query.c                 \
+        dapl/common/dapl_ep_util.c                  \
+        dapl/common/dapl_evd_dequeue.c              \
+        dapl/common/dapl_evd_free.c                 \
+        dapl/common/dapl_evd_post_se.c              \
+        dapl/common/dapl_evd_resize.c               \
+        dapl/common/dapl_evd_util.c                 \
+        dapl/common/dapl_evd_cq_async_error_callb.c \
+        dapl/common/dapl_evd_qp_async_error_callb.c \
+        dapl/common/dapl_evd_un_async_error_callb.c \
+        dapl/common/dapl_evd_connection_callb.c     \
+        dapl/common/dapl_evd_dto_callb.c            \
+        dapl/common/dapl_get_consumer_context.c     \
+        dapl/common/dapl_get_handle_type.c          \
+        dapl/common/dapl_hash.c                     \
+        dapl/common/dapl_hca_util.c                 \
+        dapl/common/dapl_ia_close.c                 \
+        dapl/common/dapl_ia_open.c                  \
+        dapl/common/dapl_ia_query.c                 \
+        dapl/common/dapl_ia_util.c                  \
+        dapl/common/dapl_llist.c                    \
+        dapl/common/dapl_lmr_free.c                 \
+        dapl/common/dapl_lmr_query.c                \
+        dapl/common/dapl_lmr_util.c                 \
+        dapl/common/dapl_lmr_sync_rdma_read.c       \
+        dapl/common/dapl_lmr_sync_rdma_write.c      \
+        dapl/common/dapl_mr_util.c                  \
+        dapl/common/dapl_provider.c                 \
+        dapl/common/dapl_sp_util.c                  \
+        dapl/common/dapl_psp_create.c               \
+        dapl/common/dapl_psp_create_any.c           \
+        dapl/common/dapl_psp_free.c                 \
+        dapl/common/dapl_psp_query.c                \
+        dapl/common/dapl_pz_create.c                \
+        dapl/common/dapl_pz_free.c                  \
+        dapl/common/dapl_pz_query.c                 \
+        dapl/common/dapl_pz_util.c                  \
+        dapl/common/dapl_rmr_create.c               \
+        dapl/common/dapl_rmr_free.c                 \
+        dapl/common/dapl_rmr_bind.c                 \
+        dapl/common/dapl_rmr_query.c                \
+        dapl/common/dapl_rmr_util.c                 \
+        dapl/common/dapl_rsp_create.c               \
+        dapl/common/dapl_rsp_free.c                 \
+        dapl/common/dapl_rsp_query.c                \
+        dapl/common/dapl_cno_util.c                 \
+        dapl/common/dapl_set_consumer_context.c     \
+        dapl/common/dapl_ring_buffer_util.c         \
+        dapl/common/dapl_name_service.c             \
+        dapl/common/dapl_timer_util.c               \
+        dapl/common/dapl_ep_create_with_srq.c       \
+        dapl/common/dapl_ep_recv_query.c            \
+        dapl/common/dapl_ep_set_watermark.c         \
+        dapl/common/dapl_srq_create.c               \
+        dapl/common/dapl_srq_free.c                 \
+        dapl/common/dapl_srq_query.c                \
+        dapl/common/dapl_srq_resize.c               \
+        dapl/common/dapl_srq_post_recv.c            \
+        dapl/common/dapl_srq_set_lw.c               \
+        dapl/common/dapl_srq_util.c                 \
+        dapl/common/dapl_debug.c                    \
+        dapl/openib_scm/dapl_ib_util.c              \
+        dapl/openib_scm/dapl_ib_cq.c                \
+        dapl/openib_scm/dapl_ib_qp.c                \
+        dapl/openib_scm/dapl_ib_cm.c                \
+        dapl/openib_scm/dapl_ib_mem.c
+
+dapl_udapl_libdaplscm_la_LDFLAGS = -version-info 1:2:0 $(daplscm_version_script) \
+                                   -Wl,-init,dapl_init -Wl,-fini,dapl_fini \
+                                   -lpthread -libverbs
+
 libdatincludedir = $(includedir)/dat
 
 libdatinclude_HEADERS = dat/include/dat/dat.h \
@@ -230,6 +348,7 @@ EXTRA_DIST = dat/common/dat_dictionary.h \
             dapl/openib_scm/dapl_ib_util.h \
             dat/udat/libdat.map \
             dapl/udapl/libdaplcma.map \
+             dapl/udapl/libdaplscm.map \
             dapl.spec.in \
             $(man_MANS) \
             test/dapltest/include/dapl_bpool.h \
@@ -271,9 +390,10 @@ install-exec-hook:
        fi; \
        echo OpenIB-cma u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib0 0" ""' >> $(sysconfdir)/dat.conf; \
        echo OpenIB-cma-1 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib1 0" ""' >> $(sysconfdir)/dat.conf; \
-       echo OpenIB-cma-2 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib2 0" ""' >> $(sysconfdir)/dat.conf; \
-       echo OpenIB-cma-3 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib3 0" ""' >> $(sysconfdir)/dat.conf; \
-       echo OpenIB-bond u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"bond0 0" ""' >> $(sysconfdir)/dat.conf;
+        echo OpenIB-mthca0-1 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mthca0 1" ""' >> $(sysconfdir)/dat.conf; \
+        echo OpenIB-mthca0-2 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mthca0 2" ""' >> $(sysconfdir)/dat.conf; \
+        echo OpenIB-mlx4_0-1 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mlx4_0 1" ""' >> $(sysconfdir)/dat.conf; \
+        echo OpenIB-mlx4_0-2 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mlx4_0 2" ""' >> $(sysconfdir)/dat.conf;
 
 uninstall-hook:
        if test -e $(sysconfdir)/dat.conf; then \
index 2153e278e1482e00f840e85c1eea9988c24b4c23..3e64bdc35ad50520b205f06a1148910f82182d78 100644 (file)
@@ -95,10 +95,10 @@ if [ -e %{_sysconfdir}/dat.conf ]; then
 fi
 echo OpenIB-cma u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib0 0" ""'  >> %{_sysconfdir}/dat.conf
 echo OpenIB-cma-1 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib1 0" ""'  >> %{_sysconfdir}/dat.conf
-echo OpenIB-cma-2 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib2 0" ""'  >> %{_sysconfdir}/dat.conf
-echo OpenIB-cma-3 u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"ib3 0" ""'  >> %{_sysconfdir}/dat.conf
-echo OpenIB-bond u1.2 nonthreadsafe default libdaplcma.so.1 dapl.1.2 '"bond0 0" ""'  >> %{_sysconfdir}/dat.conf
-
+echo OpenIB-mthca0-1 u2.0 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mthca0 1" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mthca0-2 u2.0 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mthca0 2" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mlx4_0-1 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mlx4_0 1" ""' >> %{_sysconfdir}/dat.conf
+echo OpenIB-mlx4_0-2 u1.2 nonthreadsafe default libdaplscm.so.1 dapl.1.2 '"mlx4_0 2" ""' >> %{_sysconfdir}/dat.conf
 
 %postun
 /sbin/ldconfig
@@ -130,7 +130,7 @@ fi
 
 %changelog
 * Tue May 20 2008 Arlin Davis <ardavis@ichips.intel.com> - 1.2.7
-- DAT/DAPL Version 1.2.7 Release 1, OFED 1.3.1 GA 
+- DAT/DAPL Version 1.2.7 Release 1, OFED 1.3.1 GA
 
 * Thu May 1 2008 Arlin Davis <ardavis@ichips.intel.com> - 1.2.6
 - DAT/DAPL Version 1.2.6 Release 1, OFED 1.3.1 
index f534e8d984c16994b4f58c8041135619e5703e46..9e686d632fd0e5a1d205c7cb1cdfa057ab1bdfab 100644 (file)
 #include <unistd.h>
 #include <fcntl.h>
 #include <netinet/tcp.h>
-#include <sysfs/libsysfs.h>
+#include <byteswap.h>
+#include <poll.h>
 
-/* prototypes */
-static uint16_t dapli_get_lid( struct ibv_device *dev, int port );
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
 
-static DAT_RETURN dapli_socket_connect ( DAPL_EP               *ep_ptr,
-                                        DAT_IA_ADDRESS_PTR     r_addr,
-                                        DAT_CONN_QUAL          r_qual,
-                                        DAT_COUNT              p_size,
-                                        DAT_PVOID              p_data );
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) {return bswap_64(x);}
+#elif __BYTE_ORDER == __BIG_ENDIAN
+static inline uint64_t cpu_to_be64(uint64_t x) {return x;}
+#endif
 
-static DAT_RETURN dapli_socket_listen ( DAPL_IA                        *ia_ptr,
-                                       DAT_CONN_QUAL           serviceID,
-                                       DAPL_SP                 *sp_ptr );
+extern int g_scm_pipe[2];
 
-static DAT_RETURN dapli_socket_accept( ib_cm_srvc_handle_t cm_ptr );
+static struct ib_cm_handle *dapli_cm_create(void)
+{ 
+       struct ib_cm_handle *cm_ptr;
 
-static DAT_RETURN dapli_socket_accept_final(   DAPL_EP         *ep_ptr,
-                                               DAPL_CR         *cr_ptr,
-                                               DAT_COUNT       p_size,
-                                               DAT_PVOID       p_data );
+       /* Allocate CM, init lock, and initialize */
+       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) 
+               return NULL;
+
+        if (dapl_os_lock_init(&cm_ptr->lock)) 
+               goto bail;
 
-/* XXX temporary hack to get lid */
-static uint16_t dapli_get_lid(IN struct ibv_device *dev, IN int port)
+       (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
+       cm_ptr->dst.ver = htons(DSCM_VER);
+       cm_ptr->socket = -1;
+       return cm_ptr;
+bail:
+       dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+       return NULL;
+}
+
+/* mark for destroy, remove all references, schedule cleanup */
+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
 {
-       char path[128];
-       char val[16];
-       char name[256];
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
+       
+       /* cleanup, never made it to work queue */
+       if (cm_ptr->state == SCM_INIT) {
+               if (cm_ptr->socket >= 0)  
+                       close(cm_ptr->socket);
+               dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+               return;
+       }
 
-       if (sysfs_get_mnt_path(path, sizeof path)) {
-               fprintf(stderr, "Couldn't find sysfs mount.\n");
-               return 0;
+       dapl_os_lock(&cm_ptr->lock);
+       cm_ptr->state = SCM_DESTROY;
+       if (cm_ptr->ep) {
+               cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+               cm_ptr->ep->qp_handle = IB_INVALID_HANDLE;
        }
-       sprintf(name, "%s/class/infiniband/%s/ports/%d/lid", path,
-                ibv_get_device_name(dev), port);
 
-       if (sysfs_read_attribute_value(name, val, sizeof val)) {
-               fprintf(stderr, "Couldn't read LID at %s\n", name);
-               return 0;
+       /* close socket if still active */
+       if (cm_ptr->socket >= 0) {
+               close(cm_ptr->socket);
+               cm_ptr->socket = -1;
        }
-       return strtol(val, NULL, 0);
+       dapl_os_unlock(&cm_ptr->lock);
+
+       /* wakeup work thread */
+        write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+/* queue socket for processing CM work */
+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
+{
+       /* add to work queue for cr thread processing */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+       dapl_os_lock(&cm_ptr->hca->ib_trans.lock);
+       dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list, 
+                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
+       dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
+
+        /* wakeup CM work thread */
+        write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+static uint16_t dapli_get_lid(IN struct ibv_context *ctx, IN uint8_t port)
+{
+       struct ibv_port_attr port_attr;
+
+       if(ibv_query_port(ctx, port,&port_attr))
+               return(0xffff);
+       else
+               return(port_attr.lid);
 }
 
 /*
- * ACTIVE: Create socket, connect, and exchange QP information 
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
  */
 static DAT_RETURN 
-dapli_socket_connect ( DAPL_EP                 *ep_ptr,
-                       DAT_IA_ADDRESS_PTR      r_addr,
-                       DAT_CONN_QUAL           r_qual,
-                       DAT_COUNT               p_size,
-                       DAT_PVOID               p_data )
+dapli_socket_disconnect(ib_cm_handle_t cm_ptr)
 {
-       ib_cm_handle_t  cm_ptr;
-       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
+       DAPL_EP *ep_ptr = cm_ptr->ep;
+       DAT_UINT32 disc_data = htonl(0xdead);
+
+       if (ep_ptr == NULL)
+               return DAT_SUCCESS;
+       
+       dapl_os_lock(&cm_ptr->lock);
+       if ((cm_ptr->state == SCM_INIT) ||
+           (cm_ptr->state == SCM_DISCONNECTED)) {
+               dapl_os_unlock(&cm_ptr->lock);
+               return DAT_SUCCESS;
+       } else {
+               /* send disc date, close socket, schedule destroy */
+               if (cm_ptr->socket >= 0) { 
+                       write(cm_ptr->socket, &disc_data, sizeof(disc_data));
+                       close(cm_ptr->socket);
+                       cm_ptr->socket = -1;
+               }
+               cm_ptr->state = SCM_DISCONNECTED;
+               write(g_scm_pipe[1], "w", sizeof "w");
+       }
+       dapl_os_unlock(&cm_ptr->lock);
+
+
+       if (ep_ptr->cr_ptr) {
+               dapls_cr_callback(cm_ptr,
+                                 IB_CME_DISCONNECTED,
+                                 NULL,
+                                 ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
+       } else {
+               dapl_evd_connection_callback(ep_ptr->cm_handle,
+                                            IB_CME_DISCONNECTED,
+                                            NULL,
+                                            ep_ptr);
+       }       
+
+       /* remove reference from endpoint */
+       ep_ptr->cm_handle = NULL;
+       
+       /* schedule destroy */
+
+
+       return DAT_SUCCESS;
+}
+
+
+/*
+ * ACTIVE: Create socket, connect, defer exchange QP information to CR thread
+ * to avoid blocking. 
+ */
+DAT_RETURN 
+dapli_socket_connect(DAPL_EP           *ep_ptr,
+                    DAT_IA_ADDRESS_PTR r_addr,
+                    DAT_CONN_QUAL      r_qual,
+                    DAT_COUNT          p_size,
+                    DAT_PVOID          p_data)
+{
+       ib_cm_handle_t cm_ptr;
        int             len, opt = 1;
        struct iovec    iovec[2];
-       short           rtu_data = htons(0x0E0F);
-       
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
+       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n", 
+                    r_qual,p_size);
                        
-       /*
-        *  Allocate CM and initialize
-        */
-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
+       cm_ptr = dapli_cm_create();
+       if (cm_ptr == NULL)
                return DAT_INSUFFICIENT_RESOURCES;
-       }
-
-       (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-       cm_ptr->socket = -1;
 
        /* create, connect, sockopt, and exchange QP information */
        if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
@@ -136,197 +231,252 @@ dapli_socket_connect (  DAPL_EP                 *ep_ptr,
 
        ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
 
-       if ( connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0 ) {
+       if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) < 0) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
                             " connect: %s on r_qual %d\n",
                             strerror(errno), (unsigned int)r_qual);
-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+               dapli_cm_destroy(cm_ptr);
                return DAT_INVALID_ADDRESS;
        }
        setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
-       
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
+
+
        /* Send QP info, IA address, and private data */
-       cm_ptr->dst.qpn = ep_ptr->qp_handle->qp_num;
-       cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
-       cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr->ib_trans.ib_dev, 
-                                        ia_ptr->hca_ptr->port_num );
+       cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+       cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
+       cm_ptr->dst.lid = 
+               htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle, 
+                                   (uint8_t)ia_ptr->hca_ptr->port_num));
+       if (cm_ptr->dst.lid == 0xffff)
+               goto bail;
+
+        /* in network order */
+        if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
+                                   (uint8_t)ia_ptr->hca_ptr->port_num,
+                                   0,
+                                    &cm_ptr->dst.gid))
+               goto bail;
+
        cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
-       cm_ptr->dst.p_size = p_size;
+       cm_ptr->dst.p_size = htonl(p_size);
        iovec[0].iov_base = &cm_ptr->dst;
        iovec[0].iov_len  = sizeof(ib_qp_cm_t);
-       if ( p_size ) {
+       if (p_size) {
                iovec[1].iov_base = p_data;
                iovec[1].iov_len  = p_size;
        }
-       len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
-       if ( len != (p_size + sizeof(ib_qp_cm_t)) ) {
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n"); 
+       len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
+       if (len != (p_size + sizeof(ib_qp_cm_t))) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
                             " connect write: ERR %s, wcnt=%d\n",
                             strerror(errno), len); 
                goto bail;
        }
-       dapl_dbg_log(DAPL_DBG_TYPE_EP
+       dapl_dbg_log(DAPL_DBG_TYPE_CM
                     " connect: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-                    cm_ptr->dst.port, cm_ptr->dst.lid, 
-                    cm_ptr->dst.qpn, cm_ptr->dst.p_size ); 
+                    ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+                    ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+        dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                     " connect SRC GID subnet %016llx id %016llx\n",
+                     (unsigned long long) 
+                       cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+                     (unsigned long long) 
+                       cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+       /* queue up to work thread to avoid blocking consumer */
+       cm_ptr->state = SCM_CONN_PENDING;
+       cm_ptr->hca = ia_ptr->hca_ptr;
+       cm_ptr->ep = ep_ptr;
+       dapli_cm_queue(cm_ptr);
+       return DAT_SUCCESS;
+bail:
+       /* close socket, free cm structure */
+       dapli_cm_destroy(cm_ptr);
+       return DAT_INTERNAL_ERROR;
+}
+       
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+void 
+dapli_socket_connect_rtu(ib_cm_handle_t        cm_ptr)
+{
+       DAPL_EP         *ep_ptr = cm_ptr->ep;
+       int             len;
+       struct iovec    iovec[2];
+       short           rtu_data = htons(0x0E0F);
+       ib_cm_events_t  event = IB_CME_DESTINATION_REJECT;
 
        /* read DST information into cm_ptr, overwrite SRC info */
-       len = readv( cm_ptr->socket, iovec, 1 );
-       if ( len != sizeof(ib_qp_cm_t) ) {
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n"); 
+
+       iovec[0].iov_base = &cm_ptr->dst;
+       iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+       len = readv(cm_ptr->socket, iovec, 1);
+       if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DSCM_VER) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " connect read: ERR %s, rcnt=%d\n",
-                            strerror(errno), len); 
+                            " connect_rtu read: ERR %s, rcnt=%d, ver=%d\n",
+                            strerror(errno), len, cm_ptr->dst.ver); 
+               goto bail;
+       }
+       /* check for consumer reject */
+       if (cm_ptr->dst.rej) {
+               dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                            " connect_rtu read: PEER REJ reason=0x%x\n",
+                            ntohs(cm_ptr->dst.rej)); 
+               event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
                goto bail;
        }
+
+       /* convert peer response values to host order */
+       cm_ptr->dst.port = ntohs(cm_ptr->dst.port);
+       cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
+       cm_ptr->dst.qpn = ntohl(cm_ptr->dst.qpn);
+       cm_ptr->dst.p_size = ntohl(cm_ptr->dst.p_size);
+
+       /* save remote address information */
+       dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+                       &cm_ptr->dst.ia_address, 
+                       sizeof(ep_ptr->remote_ia_address));
+
        dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-                    " connect: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+                    " connect_rtu: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+                    inet_ntoa(((struct sockaddr_in *)&cm_ptr->dst.ia_address)->sin_addr),
                     cm_ptr->dst.port, cm_ptr->dst.lid, 
-                    cm_ptr->dst.qpn, cm_ptr->dst.p_size ); 
+                    cm_ptr->dst.qpn, cm_ptr->dst.p_size); 
 
        /* validate private data size before reading */
-       if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
+       if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " connect read: psize (%d) wrong\n",
+                            " connect_rtu read: psize (%d) wrong\n",
                             cm_ptr->dst.p_size ); 
                goto bail;
        }
 
        /* read private data into cm_handle if any present */
-       if ( cm_ptr->dst.p_size ) {
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n"); 
+       if (cm_ptr->dst.p_size) {
                iovec[0].iov_base = cm_ptr->p_data;
                iovec[0].iov_len  = cm_ptr->dst.p_size;
-               len = readv( cm_ptr->socket, iovec, 1 );
-               if ( len != cm_ptr->dst.p_size ) {
+               len = readv(cm_ptr->socket, iovec, 1);
+               if (len != cm_ptr->dst.p_size) {
                        dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                               " connect read pdata: ERR %s, rcnt=%d\n",
+                               " connect_rtu read pdata: ERR %s, rcnt=%d\n",
                                strerror(errno), len); 
                        goto bail;
                }
        }
 
        /* modify QP to RTR and then to RTS with remote info */
-       if ( dapls_modify_qp_state( ep_ptr->qp_handle, 
-                                   IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
+       if (dapls_modify_qp_state(ep_ptr->qp_handle, 
+                                 IBV_QPS_RTR, &cm_ptr->dst) != DAT_SUCCESS)
                goto bail;
 
-       if ( dapls_modify_qp_state( ep_ptr->qp_handle, 
-                                   IBV_QPS_RTS, &cm_ptr->dst ) != DAT_SUCCESS )
+       if (dapls_modify_qp_state(ep_ptr->qp_handle, 
+                                  IBV_QPS_RTS, &cm_ptr->dst) != DAT_SUCCESS)
                goto bail;
                 
        ep_ptr->qp_state = IB_QP_STATE_RTS;
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n"); 
+
        /* complete handshake after final QP state change */
-       write(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
+       write(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
 
        /* init cm_handle and post the event with private data */
        ep_ptr->cm_handle = cm_ptr;
-       dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" ); 
-       dapl_evd_connection_callback(   ep_ptr->cm_handle, 
-                                       IB_CME_CONNECTED
-                                       cm_ptr->p_data
-                                       ep_ptr );       
-       return DAT_SUCCESS;
-
+       cm_ptr->state = SCM_CONNECTED;
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," ACTIVE: connected!\n"); 
+       dapl_evd_connection_callback(cm_ptr
+                                    IB_CME_CONNECTED
+                                    cm_ptr->p_data, 
+                                    ep_ptr);   
+       return;
 bail:
        /* close socket, free cm structure and post error event */
-       if ( cm_ptr->socket >= 0 ) 
-               close(cm_ptr->socket);
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
-       dapl_evd_connection_callback(   ep_ptr->cm_handle, 
-                                       IB_CME_LOCAL_FAILURE, 
-                                       NULL, 
-                                       ep_ptr );
-       return DAT_INTERNAL_ERROR;
+       dapli_cm_destroy(cm_ptr);
+       dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+       dapl_evd_connection_callback(NULL, event, NULL, ep_ptr);
 }
 
-
 /*
  * PASSIVE: Create socket, listen, accept, exchange QP information 
  */
-static DAT_RETURN 
-dapli_socket_listen (  DAPL_IA         *ia_ptr,
-                       DAT_CONN_QUAL   serviceID,
-                       DAPL_SP         *sp_ptr )
+DAT_RETURN 
+dapli_socket_listen(DAPL_IA            *ia_ptr,
+                   DAT_CONN_QUAL       serviceID,
+                   DAPL_SP             *sp_ptr )
 {
        struct sockaddr_in      addr;
        ib_cm_srvc_handle_t     cm_ptr = NULL;
        int                     opt = 1;
        DAT_RETURN              dat_status = DAT_SUCCESS;
 
-       dapl_dbg_log (  DAPL_DBG_TYPE_EP,
-                       " listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
-                       ia_ptr, serviceID, sp_ptr);
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
+                    ia_ptr, serviceID, sp_ptr);
 
-       /* Allocate CM and initialize */
-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) 
+       cm_ptr = dapli_cm_create();
+       if (cm_ptr == NULL)
                return DAT_INSUFFICIENT_RESOURCES;
 
-       (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-       
-       cm_ptr->socket = cm_ptr->l_socket = -1;
        cm_ptr->sp = sp_ptr;
-       cm_ptr->hca_ptr = ia_ptr->hca_ptr;
+       cm_ptr->hca = ia_ptr->hca_ptr;
        
        /* bind, listen, set sockopt, accept, exchange data */
-       if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+       if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
                                "socket for listen returned %d\n", errno);
                dat_status = DAT_INSUFFICIENT_RESOURCES;
                goto bail;
        }
 
-       setsockopt(cm_ptr->l_socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+       setsockopt(cm_ptr->socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
        addr.sin_port        = htons(serviceID);
        addr.sin_family      = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
 
-       if (( bind( cm_ptr->l_socket,(struct sockaddr*)&addr, sizeof(addr) ) < 0) ||
-                  (listen( cm_ptr->l_socket, 128 ) < 0) ) {
-       
+       if ((bind(cm_ptr->socket,(struct sockaddr*)&addr, sizeof(addr)) < 0) ||
+           (listen(cm_ptr->socket, 128) < 0)) {
                dapl_dbg_log( DAPL_DBG_TYPE_CM,
                                " listen: ERROR %s on conn_qual 0x%x\n",
                                strerror(errno),serviceID); 
-
-               if ( errno == EADDRINUSE )
+               if (errno == EADDRINUSE)
                        dat_status = DAT_CONN_QUAL_IN_USE;
                else
                        dat_status = DAT_CONN_QUAL_UNAVAILABLE;
-
                goto bail;
        }
        
        /* set cm_handle for this service point, save listen socket */
        sp_ptr->cm_srvc_handle = cm_ptr;
 
-       /* add to SP->CR thread list */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
-       dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
-       dapl_llist_add_tail(&cm_ptr->hca_ptr->ib_trans.list, 
-                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry, cm_ptr);
-       dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+       /* queue up listen socket to process inbound CR's */
+       cm_ptr->state = SCM_LISTEN;
+       dapli_cm_queue(cm_ptr);
+
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                    " listen: qual 0x%x cr %p s_fd %d\n",
+                    ntohs(serviceID), cm_ptr, cm_ptr->socket ); 
 
-       dapl_dbg_log( DAPL_DBG_TYPE_CM,
-                       " listen: qual 0x%x cr %p s_fd %d\n",
-                       ntohs(serviceID), cm_ptr, cm_ptr->l_socket ); 
-       
        return dat_status;
 bail:
        dapl_dbg_log( DAPL_DBG_TYPE_CM,
                        " listen: ERROR on conn_qual 0x%x\n",serviceID); 
-       if ( cm_ptr->l_socket >= 0 )
-               close( cm_ptr->l_socket );
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+       dapli_cm_destroy(cm_ptr);
        return dat_status;
 }
 
 
 /*
- * PASSIVE: send local QP information, private data, and wait for 
- *         active side to respond with QP RTS/RTR status 
+ * PASSIVE: accept socket, receive peer QP information, private data, post cr_event 
  */
-static DAT_RETURN 
+DAT_RETURN 
 dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 {
        ib_cm_handle_t  acm_ptr;
@@ -334,6 +484,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        int             len;
        DAT_RETURN      dat_status = DAT_SUCCESS;
                
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n"); 
+
        /* Allocate accept CM and initialize */
        if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
                return DAT_INSUFFICIENT_RESOURCES;
@@ -342,155 +494,221 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        
        acm_ptr->socket = -1;
        acm_ptr->sp = cm_ptr->sp;
-       acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+       acm_ptr->hca = cm_ptr->hca;
 
        len = sizeof(acm_ptr->dst.ia_address);
-       acm_ptr->socket = accept(cm_ptr->l_socket, 
+       acm_ptr->socket = accept(cm_ptr->socket, 
                                (struct sockaddr*)&acm_ptr->dst.ia_address, 
-                               (socklen_t*)&len );
+                               (socklen_t*)&len);
 
-       if ( acm_ptr->socket < 0 ) {
+       if (acm_ptr->socket < 0) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
                        " accept: ERR %s on FD %d l_cr %p\n",
-                       strerror(errno),cm_ptr->l_socket,cm_ptr); 
+                       strerror(errno),cm_ptr->socket,cm_ptr); 
                dat_status = DAT_INTERNAL_ERROR;
                goto bail;
        }
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
+
        /* read in DST QP info, IA address. check for private data */
-       len = read( acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t) );
-       if ( len != sizeof(ib_qp_cm_t) ) {
+       len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
+       if (len != sizeof(ib_qp_cm_t) || 
+           ntohs(acm_ptr->dst.ver) != DSCM_VER) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                       " accept read: ERR %s, rcnt=%d\n",
-                       strerror(errno), len); 
+                            " accept read: ERR %s, rcnt=%d, ver=%d\n",
+                            strerror(errno), len, acm_ptr->dst.ver); 
                dat_status = DAT_INTERNAL_ERROR;
                goto bail;
-
        }
+
+       /* convert accepted values to host order */
+       acm_ptr->dst.port = ntohs(acm_ptr->dst.port);
+       acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
+       acm_ptr->dst.qpn = ntohl(acm_ptr->dst.qpn);
+       acm_ptr->dst.p_size = ntohl(acm_ptr->dst.p_size);
+
        dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-               " accept: DST port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-               acm_ptr->dst.port, acm_ptr->dst.lid, 
-               acm_ptr->dst.qpn, acm_ptr->dst.p_size ); 
+                    " accept: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+                    inet_ntoa(((struct sockaddr_in *)&acm_ptr->dst.ia_address)->sin_addr),
+                    acm_ptr->dst.port, acm_ptr->dst.lid, 
+                    acm_ptr->dst.qpn, acm_ptr->dst.p_size); 
 
        /* validate private data size before reading */
-       if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
+       if (acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                       " accept read: psize (%d) wrong\n",
-                       acm_ptr->dst.p_size ); 
+                            " accept read: psize (%d) wrong\n",
+                            acm_ptr->dst.p_size); 
                dat_status = DAT_INTERNAL_ERROR;
                goto bail;
        }
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read private data\n"); 
+
        /* read private data into cm_handle if any present */
-       if ( acm_ptr->dst.p_size ) {
+       if (acm_ptr->dst.p_size) {
                len = read( acm_ptr->socket, 
-                           acm_ptr->p_data, acm_ptr->dst.p_size );
-               if ( len != acm_ptr->dst.p_size ) {
+                           acm_ptr->p_data, acm_ptr->dst.p_size);
+               if (len != acm_ptr->dst.p_size) {
                        dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                               " accept read pdata: ERR %s, rcnt=%d\n",
-                               strerror(errno), len ); 
+                                    " accept read pdata: ERR %s, rcnt=%d\n",
+                                    strerror(errno), len); 
                        dat_status = DAT_INTERNAL_ERROR;
                        goto bail;
                }
-               dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-                               " accept: psize=%d read\n",
-                               acm_ptr->dst.p_size); 
+               dapl_dbg_log(DAPL_DBG_TYPE_EP," accept: psize=%d read\n",len);
                p_data = acm_ptr->p_data;
        }
        
-       /* trigger CR event and return SUCCESS */
-       dapls_cr_callback(  acm_ptr,
-                           IB_CME_CONNECTION_REQUEST_PENDING,
-                           p_data,
-                           acm_ptr->sp );
+       acm_ptr->state = SCM_ACCEPTING;
 
+       /* trigger CR event and return SUCCESS */
+       dapls_cr_callback(acm_ptr,
+                         IB_CME_CONNECTION_REQUEST_PENDING,
+                         p_data,
+                         acm_ptr->sp );
        return DAT_SUCCESS;
-
 bail:
-       if ( acm_ptr->socket >=0 )
-               close( acm_ptr->socket );
-       dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
+       dapli_cm_destroy(acm_ptr);
        return DAT_INTERNAL_ERROR;
 }
 
-
-static DAT_RETURN 
-dapli_socket_accept_final( DAPL_EP             *ep_ptr,
-                          DAPL_CR              *cr_ptr,
-                          DAT_COUNT            p_size,
-                          DAT_PVOID            p_data )
+/*
+ * PASSIVE: consumer accept, send local QP information, private data, 
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread. 
+ */
+DAT_RETURN 
+dapli_socket_accept_usr(DAPL_EP                *ep_ptr,
+                       DAPL_CR         *cr_ptr,
+                       DAT_COUNT       p_size,
+                       DAT_PVOID       p_data)
 {
        DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
-       ib_cm_handle_t  cm_ptr = cr_ptr->ib_cm_handle;
-       ib_qp_cm_t      qp_cm;
+       ib_cm_handle_t  cm_ptr = cr_ptr->ib_cm_handle;
        struct iovec    iovec[2];
        int             len;
-       short           rtu_data = 0;
 
-       if (p_size >  IB_MAX_REP_PDATA_SIZE) 
+       if (p_size > IB_MAX_REP_PDATA_SIZE) 
                return DAT_LENGTH_ERROR;
 
        /* must have a accepted socket */
-       if ( cm_ptr->socket < 0 )
+       if (cm_ptr->socket < 0)
                return DAT_INTERNAL_ERROR;
        
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                    " accept_usr: remote port=0x%x lid=0x%x"
+                    " qpn=0x%x psize=%d\n",
+                    cm_ptr->dst.port, cm_ptr->dst.lid,
+                    cm_ptr->dst.qpn, cm_ptr->dst.p_size); 
+
        /* modify QP to RTR and then to RTS with remote info already read */
-       if ( dapls_modify_qp_state( ep_ptr->qp_handle, 
-                                   IBV_QPS_RTR, &cm_ptr->dst ) != DAT_SUCCESS )
+       if (dapls_modify_qp_state(ep_ptr->qp_handle, 
+                                 IBV_QPS_RTR, &cm_ptr->dst) != DAT_SUCCESS)
                goto bail;
 
-       if ( dapls_modify_qp_state( ep_ptr->qp_handle, 
-                                   IBV_QPS_RTS, &cm_ptr->dst ) != DAT_SUCCESS )
+       if (dapls_modify_qp_state(ep_ptr->qp_handle, 
+                                 IBV_QPS_RTS, &cm_ptr->dst) != DAT_SUCCESS)
                goto bail;
 
        ep_ptr->qp_state = IB_QP_STATE_RTS;
        
-       /* Send QP info, IA address, and private data */
-       qp_cm.qpn = ep_ptr->qp_handle->qp_num;
-       qp_cm.port = ia_ptr->hca_ptr->port_num;
-       qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr->ib_trans.ib_dev, 
-                                  ia_ptr->hca_ptr->port_num );
-       qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
-       qp_cm.p_size = p_size;
-       iovec[0].iov_base = &qp_cm;
+       /* save remote address information */
+       dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+                       &cm_ptr->dst.ia_address, 
+                       sizeof(ep_ptr->remote_ia_address));
+
+       /* send our QP info, IA address, and private data */
+       cm_ptr->dst.qpn = htonl(ep_ptr->qp_handle->qp_num);
+       cm_ptr->dst.port = htons(ia_ptr->hca_ptr->port_num);
+       cm_ptr->dst.lid = htons(dapli_get_lid(ia_ptr->hca_ptr->ib_hca_handle, 
+                                       (uint8_t)ia_ptr->hca_ptr->port_num));
+       if (cm_ptr->dst.lid == 0xffff)
+               goto bail;
+
+        /* in network order */
+       if (ibv_query_gid(ia_ptr->hca_ptr->ib_hca_handle,
+                         (uint8_t)ia_ptr->hca_ptr->port_num,
+                         0,
+                         &cm_ptr->dst.gid))
+               goto bail;
+
+       cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+       cm_ptr->dst.p_size = htonl(p_size);
+       iovec[0].iov_base = &cm_ptr->dst;
        iovec[0].iov_len  = sizeof(ib_qp_cm_t);
        if (p_size) {
                iovec[1].iov_base = p_data;
                iovec[1].iov_len  = p_size;
        }
-       len = writev( cm_ptr->socket, iovec, (p_size ? 2:1) );
+       len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
        if (len != (p_size + sizeof(ib_qp_cm_t))) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " accept_final: ERR %s, wcnt=%d\n",
+                            " accept_rtu: ERR %s, wcnt=%d\n",
                             strerror(errno), len); 
                goto bail;
        }
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-                    " accept_final: SRC port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
-                    qp_cm.port, qp_cm.lid, qp_cm.qpn, qp_cm.p_size ); 
-       
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " accept_usr: local port=0x%x lid=0x%x"
+                    " qpn=0x%x psize=%d\n",
+                    ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+                    ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+        dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                     " accept_usr SRC GID subnet %016llx id %016llx\n",
+                     (unsigned long long) 
+                       cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+                     (unsigned long long) 
+                       cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+
+       /* save state and reference to EP, queue for RTU data */
+       cm_ptr->ep = ep_ptr;
+       cm_ptr->hca = ia_ptr->hca_ptr;
+       cm_ptr->state = SCM_ACCEPTED;
+
+       /* restore remote address information for query */
+       dapl_os_memcpy( &cm_ptr->dst.ia_address, 
+                       &ep_ptr->remote_ia_address,
+                       sizeof(cm_ptr->dst.ia_address));
+
+       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); 
+       dapli_cm_queue(cm_ptr);
+       return DAT_SUCCESS;
+bail:
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR," accept_rtu: ERR !QP_RTR_RTS \n"); 
+       dapli_cm_destroy(cm_ptr);
+       dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+       return DAT_INTERNAL_ERROR;
+}
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event
+ */
+void 
+dapli_socket_accept_rtu(ib_cm_handle_t cm_ptr)
+{
+       int             len;
+       short           rtu_data = 0;
+
        /* complete handshake after final QP state change */
-       len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data) );
-       if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
+       len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
+       if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " accept_final: ERR %s, rcnt=%d rdata=%x\n",
-                            strerror(errno), len, ntohs(rtu_data) ); 
+                            " accept_rtu: ERR %s, rcnt=%d rdata=%x\n",
+                            strerror(errno), len, ntohs(rtu_data)); 
                goto bail;
        }
 
+       /* save state and reference to EP, queue for disc event */
+       cm_ptr->state = SCM_CONNECTED;
+
        /* final data exchange if remote QP state is good to go */
        dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
-       dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
-       return DAT_SUCCESS;
-
+       dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
+       return;
 bail:
-       dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n"); 
-       if ( cm_ptr >= 0 )
-               close( cm_ptr->socket );
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
-       return DAT_INTERNAL_ERROR;
+       dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
+       dapli_cm_destroy(cm_ptr);
+       dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
 }
 
 
@@ -528,18 +746,13 @@ dapls_ib_connect (
        
        dapl_dbg_log ( DAPL_DBG_TYPE_EP,
                        " connect(ep_handle %p ....)\n", ep_handle);
-       /*
-        *  Sanity check
-        */
-       if ( NULL == ep_handle ) 
-               return DAT_SUCCESS;
 
        ep_ptr = (DAPL_EP*)ep_handle;
        qp_ptr = ep_ptr->qp_handle;
 
-       return (dapli_socket_connect(   ep_ptr, remote_ia_address, 
-                                       remote_conn_qual,
-                                       private_data_size, private_data ));
+       return (dapli_socket_connect(ep_ptr, remote_ia_address, 
+                                    remote_conn_qual,
+                                    private_data_size, private_data));
 }
 
 /*
@@ -556,12 +769,11 @@ dapls_ib_connect (
  *
  * Returns:
  *     DAT_SUCCESS
- *
  */
 DAT_RETURN
-dapls_ib_disconnect (
+dapls_ib_disconnect(
        IN      DAPL_EP                 *ep_ptr,
-       IN      DAT_CLOSE_FLAGS         close_flags )
+       IN      DAT_CLOSE_FLAGS         close_flags)
 {
        ib_cm_handle_t  cm_ptr = ep_ptr->cm_handle;
 
@@ -569,28 +781,13 @@ dapls_ib_disconnect (
                        "dapls_ib_disconnect(ep_handle %p ....)\n",
                        ep_ptr);
 
-       if ( cm_ptr->socket >= 0 ) {
-               close( cm_ptr->socket );
-               cm_ptr->socket = -1;
-       }
-       
        /* reinit to modify QP state */
        dapls_ib_reinit_ep(ep_ptr);
 
-       if ( ep_ptr->cr_ptr ) {
-               dapls_cr_callback ( ep_ptr->cm_handle,
-                                   IB_CME_DISCONNECTED,
-                                   NULL,
-                                   ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr );
-       } else {
-               dapl_evd_connection_callback ( ep_ptr->cm_handle,
-                                               IB_CME_DISCONNECTED,
-                                               NULL,
-                                               ep_ptr );
-               ep_ptr->cm_handle = NULL;
-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-       }       
-       return DAT_SUCCESS;
+       if (cm_ptr == NULL) 
+               return DAT_SUCCESS;
+       else
+               return(dapli_socket_disconnect(cm_ptr));
 }
 
 /*
@@ -679,13 +876,14 @@ dapls_ib_remove_conn_listener (
                        ia_ptr, sp_ptr, cm_ptr );
 
        /* close accepted socket, free cm_srvc_handle and return */
-       if ( cm_ptr != NULL ) {
-               if ( cm_ptr->l_socket >= 0 ) {
-                       close( cm_ptr->l_socket );
+       if (cm_ptr != NULL) {
+               if (cm_ptr->socket >= 0) {
+                       close(cm_ptr->socket );
                        cm_ptr->socket = -1;
                }
                /* cr_thread will free */
                sp_ptr->cm_srvc_handle = NULL;
+               write(g_scm_pipe[1], "w", sizeof "w");
        }
        return DAT_SUCCESS;
 }
@@ -720,23 +918,22 @@ dapls_ib_accept_connection (
        DAPL_CR                 *cr_ptr;
        DAPL_EP                 *ep_ptr;
        
-       dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                     "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
-                     cr_handle, ep_handle, p_data, p_size  );
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
+                    cr_handle, ep_handle, p_data, p_size  );
 
-       cr_ptr  = (DAPL_CR *) cr_handle;
-       ep_ptr  = (DAPL_EP *) ep_handle;
+       cr_ptr = (DAPL_CR *)cr_handle;
+       ep_ptr = (DAPL_EP *)ep_handle;
        
        /* allocate and attach a QP if necessary */
-       if ( ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED ) {
+       if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
                DAT_RETURN status;
-               status = dapls_ib_qp_alloc( ep_ptr->header.owner_ia, 
-                                           ep_ptr, ep_ptr );
-               if ( status != DAT_SUCCESS )
+               status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia, 
+                                          ep_ptr, ep_ptr);
+               if (status != DAT_SUCCESS)
                        return status;
        }
-    
-       return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
+       return(dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data));
 }
 
 
@@ -759,19 +956,29 @@ dapls_ib_accept_connection (
 DAT_RETURN
 dapls_ib_reject_connection (
        IN  ib_cm_handle_t      ib_cm_handle,
-       IN  int                 reject_reason )
+       IN  int                 reject_reason)
 {
        ib_cm_srvc_handle_t     cm_ptr = ib_cm_handle;
+       struct iovec            iovec;
 
        dapl_dbg_log (DAPL_DBG_TYPE_EP,
                      "dapls_ib_reject_connection(cm_handle %p reason %x)\n",
-                     ib_cm_handle, reject_reason );
-
-       /* just close the socket and return */
-       if ( cm_ptr->socket > 0 ) {
-               close( cm_ptr->socket );
+                     ib_cm_handle, reject_reason);
+
+       /* write reject data to indicate reject */
+       if (cm_ptr->socket >= 0) {
+               cm_ptr->dst.rej = (uint16_t)reject_reason;
+               cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
+               iovec.iov_base = &cm_ptr->dst;
+               iovec.iov_len  = sizeof(ib_qp_cm_t);
+               writev(cm_ptr->socket, &iovec, 1);
+               close(cm_ptr->socket);
                cm_ptr->socket = -1;
        }
+
+       /* cr_thread will destroy CR */
+       cm_ptr->state = SCM_REJECTED;
+        write(g_scm_pipe[1], "w", sizeof "w");
        return DAT_SUCCESS;
 }
 
@@ -991,24 +1198,25 @@ dapls_ib_get_cm_event (
     return ib_cm_event;
 }
 
-/* async CR processing thread to avoid blocking applications */
+/* outbound/inbound CR processing thread to avoid blocking applications */
+#define SCM_MAX_CONN 8192
 void cr_thread(void *arg) 
 {
     struct dapl_hca    *hca_ptr = arg;
-    ib_cm_srvc_handle_t        cr, next_cr;
-    int                        max_fd;
-    fd_set             rfd,rfds;
-    struct timeval     to;
+    ib_cm_handle_t     cr, next_cr;
+    int                ret,idx;
+    char               rbuf[2];
+    struct pollfd      ufds[SCM_MAX_CONN];
      
     dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n",hca_ptr);
 
     dapl_os_lock( &hca_ptr->ib_trans.lock );
     hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
     while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
-       
-       FD_ZERO( &rfds ); 
-       max_fd = -1;
-       
+       idx=0;
+       ufds[idx].fd = g_scm_pipe[0]; /* wakeup and process work */
+        ufds[idx].events = POLLIN;
+
        if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
             next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
        else
@@ -1016,51 +1224,70 @@ void cr_thread(void *arg)
 
        while (next_cr) {
            cr = next_cr;
-           dapl_dbg_log (DAPL_DBG_TYPE_CM," thread: cm_ptr %p\n", cr );
-           if (cr->l_socket == -1 || 
+           if ((cr->socket == -1) || 
                hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
 
-               dapl_dbg_log(DAPL_DBG_TYPE_CM," thread: Freeing %p\n", cr);
+               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: Free %p\n", cr);
                next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
                                                (DAPL_LLIST_ENTRY*)&cr->entry );
                dapl_llist_remove_entry(&hca_ptr->ib_trans.list, 
                                        (DAPL_LLIST_ENTRY*)&cr->entry);
-               dapl_os_free( cr, sizeof(*cr) );
+               dapl_os_free(cr, sizeof(*cr));
                continue;
            }
-                 
-           FD_SET( cr->l_socket, &rfds ); /* add to select set */
-           if ( cr->l_socket > max_fd )
-               max_fd = cr->l_socket;
-
-           /* individual select poll to check for work */
-           FD_ZERO(&rfd);
-           FD_SET(cr->l_socket, &rfd);
-           dapl_os_unlock(&hca_ptr->ib_trans.lock);    
-           to.tv_sec  = 0;
-           to.tv_usec = 0;
-           if ( select(cr->l_socket + 1,&rfd, NULL, NULL, &to) < 0) {
-               dapl_dbg_log (DAPL_DBG_TYPE_CM,
-                         " thread: ERR %s on cr %p sk %d\n", 
-                         strerror(errno), cr, cr->l_socket);
-               close(cr->l_socket);
-               cr->l_socket = -1;
-           } else if ( FD_ISSET(cr->l_socket, &rfd) && 
-                       dapli_socket_accept(cr)) {
-               close(cr->l_socket);
-               cr->l_socket = -1;
+
+           if (idx==SCM_MAX_CONN-1) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",idx+1);
+               continue;
            }
-           dapl_os_lock( &hca_ptr->ib_trans.lock );
+               
+           /* Add to ufds for poll, check for immediate work */
+           ufds[++idx].fd = cr->socket; /* add listen or cr */
+           ufds[idx].events = POLLIN;
+
+           /* check socket for event, accept in or connect out */
+           dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n", 
+                               cr, cr->socket, ufds[idx].fd);
+           dapl_os_unlock(&hca_ptr->ib_trans.lock);
+           ret = poll(&ufds[idx],1,1);
+           dapl_dbg_log(DAPL_DBG_TYPE_CM," poll wakeup ret=%d cr->st=%d ev=%d fd=%d\n",
+                        ret,cr->state,ufds[idx].revents,ufds[idx].fd);
+
+           /* data on listen, qp exchange, and on disconnect request */
+           if ((ret == 1) && ufds[idx].revents == POLLIN) {
+               if (cr->socket > 0) {
+                       if (cr->state == SCM_LISTEN)
+                               dapli_socket_accept(cr);
+                       else if (cr->state == SCM_ACCEPTED)
+                               dapli_socket_accept_rtu(cr);
+                       else if (cr->state == SCM_CONN_PENDING)
+                               dapli_socket_connect_rtu(cr);
+                       else if (cr->state == SCM_CONNECTED)
+                               dapli_socket_disconnect(cr);
+               }
+           } else if (ret != 0) {
+               dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                            " cr_thread(cr=%p) st=%d poll ERR= %s\n",
+                            cr,cr->state,strerror(errno));
+               /* POLLUP or poll error case, issue event if connected */
+               if (cr->state == SCM_CONNECTED)
+                       dapli_socket_disconnect(cr);
+           } 
+           dapl_os_lock(&hca_ptr->ib_trans.lock);
            next_cr =  dapl_llist_next_entry(&hca_ptr->ib_trans.list,
-                                            (DAPL_LLIST_ENTRY*)&cr->entry );
+                                            (DAPL_LLIST_ENTRY*)&cr->entry);
        } 
-       dapl_os_unlock( &hca_ptr->ib_trans.lock );
-       to.tv_sec  = 0;
-       to.tv_usec = 100000; /* wakeup and check destroy */
-       select(max_fd + 1, &rfds, NULL, NULL, &to);
-       dapl_os_lock( &hca_ptr->ib_trans.lock );
+       dapl_os_unlock(&hca_ptr->ib_trans.lock);
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep, %d\n", idx+1);
+       poll(ufds,idx+1,-1); /* infinite, all sockets and pipe */
+       /* if pipe used to wakeup, consume */
+       if (ufds[0].revents == POLLIN)
+               read(g_scm_pipe[0], rbuf, 2);
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
+       dapl_os_lock(&hca_ptr->ib_trans.lock);
     } 
-    dapl_os_unlock( &hca_ptr->ib_trans.lock ); 
+    dapl_os_unlock(&hca_ptr->ib_trans.lock);   
     hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
     dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
 }
index 7ac70376a21a26d0426e33f0da855050b0492613..56b729e1732c04e180134467d8ac46d101729be9 100644 (file)
@@ -97,7 +97,7 @@ void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
         while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
                 struct timespec sleep, remain;
                 sleep.tv_sec = 0;
-                sleep.tv_nsec = 200000000; /* 200 ms */
+                sleep.tv_nsec = 2000000; /* 2 ms */
                 dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
                              " cq_thread_destroy: waiting for cq_thread\n");
                 nanosleep (&sleep, &remain);
@@ -422,12 +422,21 @@ DAT_RETURN dapls_ib_cq_free (
        IN  DAPL_IA             *ia_ptr,
        IN  DAPL_EVD            *evd_ptr)
 {
-       if ( evd_ptr->ib_cq_handle != IB_INVALID_HANDLE ) {
-               /* copy all entries on CQ to EVD before destroying */   
-               dapls_evd_copy_cq(evd_ptr); 
+       DAT_EVENT event;
+
+       if (evd_ptr->ib_cq_handle != IB_INVALID_HANDLE) {
+               /* pull off CQ and EVD entries and toss */      
+               while (dapls_ib_completion_poll == DAT_SUCCESS);
+               while (dapl_evd_dequeue(evd_ptr,&event) != DAT_QUEUE_EMPTY);
+#if 1 
+               ibv_destroy_cq(evd_ptr->ib_cq_handle); 
+               evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
+               return DAT_SUCCESS;
+#else
                if (ibv_destroy_cq(evd_ptr->ib_cq_handle)) 
                        return(dapl_convert_errno(errno,"destroy_cq"));
                evd_ptr->ib_cq_handle = IB_INVALID_HANDLE;
+#endif
        }
        return DAT_SUCCESS;
 }
@@ -600,7 +609,7 @@ dapls_ib_wait_object_wait (
                status = ETIMEDOUT;
        
        dapl_dbg_log (DAPL_DBG_TYPE_CM, 
-                     " cq_object_wait: RET evd %p ibv_cq %p ibv_ctx %p %s\n",
+                     " cq_object_wait: RET evd %p ibv_cq %p %s\n",
                      evd_ptr, ibv_cq,strerror(errno));
        
        return(dapl_convert_errno(status,"cq_wait_object_wait"));
index bea3e4d8630bb38fb445655a354b2de4c725547e..b15f34765b5d7a9ebf4c8f84752634bc7f75426b 100644 (file)
@@ -35,7 +35,7 @@
  *
  *   Description: 
  *
- *   The uDAPL openib provider - DTO operations and CQE macros 
+ *   The OpenIB SCM (socket CM) provider - DTO operations and CQE macros 
  *
  ****************************************************************************
  *                Source Control System Information
@@ -61,26 +61,25 @@ STATIC _INLINE_ int dapls_cqe_opcode(ib_work_completion_t *cqe_p);
  */
 STATIC _INLINE_ DAT_RETURN 
 dapls_ib_post_recv (
-       IN  DAPL_EP             *ep_ptr,
+       IN  DAPL_EP             *ep_ptr,
        IN  DAPL_COOKIE         *cookie,
-       IN  DAT_COUNT           segments,
+       IN  DAT_COUNT           segments,
        IN  DAT_LMR_TRIPLET     *local_iov )
 {
-       ib_data_segment_t       ds_array[DEFAULT_DS_ENTRIES];
-       ib_data_segment_t       *ds_array_p, *ds_array_start_p = NULL;
-       struct ibv_recv_wr      wr;
-       struct ibv_recv_wr      *bad_wr;
-       DAT_COUNT               i, total_len;
-       int                     ret;
+       ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
+       ib_data_segment_t *ds_array_p;
+       struct ibv_recv_wr wr;
+       struct ibv_recv_wr *bad_wr;
+       DAT_COUNT i, total_len;
        
-       dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                     " post_rcv: ep %p cookie %p segs %d l_iov %p\n",
-                     ep_ptr, cookie, segments, local_iov);
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " post_rcv: ep %p cookie %p segs %d l_iov %p\n",
+                    ep_ptr, cookie, segments, local_iov);
 
-       if ( segments <= DEFAULT_DS_ENTRIES 
+       if (segments <= DEFAULT_DS_ENTRIES
                ds_array_p = ds_array;
        else
-               ds_array_start_p = ds_array_p =
+               ds_array_p = 
                        dapl_os_alloc(segments * sizeof(ib_data_segment_t));
 
        if (NULL == ds_array_p)
@@ -93,18 +92,18 @@ dapls_ib_post_recv (
        wr.wr_id = (uint64_t)(uintptr_t)cookie;
        wr.sg_list = ds_array_p;
 
-       for (i = 0; i < segments; i++ ) {
-               if ( !local_iov[i].segment_length )
+       for (i = 0; i < segments; i++) {
+               if (!local_iov[i].segment_length)
                        continue;
 
-               ds_array_p->addr  = (uint64_t) local_iov[i].virtual_address;
+               ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
                ds_array_p->length = local_iov[i].segment_length;
-               ds_array_p->lkey  = local_iov[i].lmr_context;
+               ds_array_p->lkey = local_iov[i].lmr_context;
                
-               dapl_dbg_log (  DAPL_DBG_TYPE_EP, 
-                               " post_rcv: l_key 0x%x va %p len %d\n",
-                               ds_array_p->lkey, ds_array_p->addr, 
-                               ds_array_p->length );
+               dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                            " post_rcv: l_key 0x%x va %p len %d\n",
+                            ds_array_p->lkey, ds_array_p->addr, 
+                            ds_array_p->length );
 
                total_len += ds_array_p->length;
                wr.num_sge++;
@@ -114,18 +113,12 @@ dapls_ib_post_recv (
        if (cookie != NULL) 
                cookie->val.dto.size = total_len;
 
-       ret = ibv_post_recv(ep_ptr->qp_handle, &wr, &bad_wr);
+       if (ibv_post_recv(ep_ptr->qp_handle, &wr, &bad_wr))
+               return( dapl_convert_errno(errno,"ibv_recv") );
        
-       if (ds_array_start_p != NULL)
-               dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
-
-       if (ret)
-               return( dapl_convert_errno(EFAULT,"ibv_recv") );
-
        return DAT_SUCCESS;
 }
 
-
 /*
  * dapls_ib_post_send
  *
@@ -133,35 +126,36 @@ dapls_ib_post_recv (
  */
 STATIC _INLINE_ DAT_RETURN 
 dapls_ib_post_send (
-    IN  DAPL_EP                        *ep_ptr,
-    IN  ib_send_op_type_t       op_type,
-    IN  DAPL_COOKIE            *cookie,
-    IN  DAT_COUNT              segments,
-    IN  DAT_LMR_TRIPLET                *local_iov,
-    IN  const DAT_RMR_TRIPLET  *remote_iov,
-    IN  DAT_COMPLETION_FLAGS   completion_flags)
+       IN  DAPL_EP                     *ep_ptr,
+       IN  ib_send_op_type_t           op_type,
+       IN  DAPL_COOKIE                 *cookie,
+       IN  DAT_COUNT                   segments,
+       IN  DAT_LMR_TRIPLET             *local_iov,
+       IN  const DAT_RMR_TRIPLET       *remote_iov,
+       IN  DAT_COMPLETION_FLAGS        completion_flags)
 {
-       dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                     " post_snd: ep %p op %d ck %p sgs %d l_iov %p r_iov %p f %d\n",
-                     ep_ptr, op_type, cookie, segments, local_iov, 
-                     remote_iov, completion_flags);
-
-       ib_data_segment_t       ds_array[DEFAULT_DS_ENTRIES];
-       ib_data_segment_t       *ds_array_p, *ds_array_start_p = NULL;
-       struct ibv_send_wr      wr;
-       struct ibv_send_wr      *bad_wr;
-       ib_hca_transport_t      *ibt_ptr = &ep_ptr->header.owner_ia->hca_ptr->ib_trans;
-       DAT_COUNT               i, total_len;
-       int                     ret;
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " post_snd: ep %p op %d ck %p sgs",
+                    "%d l_iov %p r_iov %p f %d\n",
+                    ep_ptr, op_type, cookie, segments, local_iov, 
+                    remote_iov, completion_flags);
+
+       ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
+       ib_data_segment_t *ds_array_p;
+       struct ibv_send_wr wr;
+       struct ibv_send_wr *bad_wr;
+       ib_hca_transport_t *ibt_ptr = 
+               &ep_ptr->header.owner_ia->hca_ptr->ib_trans;
+       DAT_COUNT i, total_len;
        
-       dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                     " post_snd: ep %p cookie %p segs %d l_iov %p\n",
-                     ep_ptr, cookie, segments, local_iov);
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " post_snd: ep %p cookie %p segs %d l_iov %p\n",
+                    ep_ptr, cookie, segments, local_iov);
 
-       if( segments <= DEFAULT_DS_ENTRIES 
+       if(segments <= DEFAULT_DS_ENTRIES
                ds_array_p = ds_array;
        else
-               ds_array_start_p = ds_array_p =
+               ds_array_p = 
                        dapl_os_alloc(segments * sizeof(ib_data_segment_t));
 
        if (NULL == ds_array_p)
@@ -180,14 +174,14 @@ dapls_ib_post_send (
                if ( !local_iov[i].segment_length )
                        continue;
 
-               ds_array_p->addr  = (uint64_t) local_iov[i].virtual_address;
+               ds_array_p->addr = (uint64_t) local_iov[i].virtual_address;
                ds_array_p->length = local_iov[i].segment_length;
-               ds_array_p->lkey  = local_iov[i].lmr_context;
+               ds_array_p->lkey = local_iov[i].lmr_context;
                
-               dapl_dbg_log (  DAPL_DBG_TYPE_EP, 
-                               " post_snd: lkey 0x%x va %p len %d \n",
-                               ds_array_p->lkey, ds_array_p->addr, 
-                               ds_array_p->length );
+               dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                            " post_snd: lkey 0x%x va %p len %d\n",
+                            ds_array_p->lkey, ds_array_p->addr, 
+                            ds_array_p->length );
 
                total_len += ds_array_p->length;
                wr.num_sge++;
@@ -196,20 +190,21 @@ dapls_ib_post_send (
 
        if (cookie != NULL) 
                cookie->val.dto.size = total_len;
-       
+
        if ((op_type == OP_RDMA_WRITE) || (op_type == OP_RDMA_READ)) {
                wr.wr.rdma.remote_addr = remote_iov->target_address;
                wr.wr.rdma.rkey = remote_iov->rmr_context;
-               dapl_dbg_log (  DAPL_DBG_TYPE_EP, 
-                               " post_snd_rdma: rkey 0x%x va %#016Lx\n",
-                               wr.wr.rdma.rkey, wr.wr.rdma.remote_addr );
+               dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                            " post_snd_rdma: rkey 0x%x va %#016Lx\n",
+                            wr.wr.rdma.rkey, wr.wr.rdma.remote_addr);
        }
 
+
        /* inline data for send or write ops */
-       if ((total_len <= ibt_ptr->max_inline_send ) && 
+       if ((total_len <= ibt_ptr->max_inline_send) && 
           ((op_type == OP_SEND) || (op_type == OP_RDMA_WRITE))) 
                wr.send_flags |= IBV_SEND_INLINE;
-
+       
        /* set completion flags in work request */
        wr.send_flags |= (DAT_COMPLETION_SUPPRESS_FLAG & 
                                completion_flags) ? 0 : IBV_SEND_SIGNALED;
@@ -218,24 +213,19 @@ dapls_ib_post_send (
        wr.send_flags |= (DAT_COMPLETION_SOLICITED_WAIT_FLAG & 
                                completion_flags) ? IBV_SEND_SOLICITED : 0;
 
-       dapl_dbg_log (DAPL_DBG_TYPE_EP, 
-                     " post_snd: op 0x%x flags 0x%x sglist %p, %d\n", 
-                       wr.opcode, wr.send_flags, wr.sg_list, wr.num_sge);
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                    " post_snd: op 0x%x flags 0x%x sglist %p, %d\n", 
+                    wr.opcode, wr.send_flags, wr.sg_list, wr.num_sge);
 
-       ret = ibv_post_send(ep_ptr->qp_handle, &wr, &bad_wr);
+       if (ibv_post_send(ep_ptr->qp_handle, &wr, &bad_wr))
+               return( dapl_convert_errno(errno,"ibv_recv") );
        
-       if (ds_array_start_p != NULL)
-               dapl_os_free(ds_array_start_p, segments * sizeof(ib_data_segment_t));
-
-       if (ret)
-               return( dapl_convert_errno(EFAULT,"ibv_send") );
-
-       dapl_dbg_log (DAPL_DBG_TYPE_EP," post_snd: returned\n");
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," post_snd: returned\n");
        return DAT_SUCCESS;
 }
 
 STATIC _INLINE_ DAT_RETURN 
-dapls_ib_optional_prv_dat (
+dapls_ib_optional_prv_dat(
        IN  DAPL_CR             *cr_ptr,
        IN  const void          *event_data,
        OUT   DAPL_CR           **cr_pp)
@@ -243,34 +233,68 @@ dapls_ib_optional_prv_dat (
     return DAT_SUCCESS;
 }
 
+/* map Work Completions to DAPL WR operations */
 STATIC _INLINE_ int dapls_cqe_opcode(ib_work_completion_t *cqe_p)
 {
-    switch (cqe_p->opcode) {
+       switch (cqe_p->opcode) {
        case IBV_WC_SEND:
-           return (OP_SEND);
+               return (OP_SEND);
        case IBV_WC_RDMA_WRITE:
-           return (OP_RDMA_WRITE);
+               if (cqe_p->wc_flags & IBV_WC_WITH_IMM)
+                       return (OP_RDMA_WRITE_IMM);
+               else
+                       return (OP_RDMA_WRITE);
        case IBV_WC_RDMA_READ:
-           return (OP_RDMA_READ);
+               return (OP_RDMA_READ);
        case IBV_WC_COMP_SWAP:
-           return (OP_COMP_AND_SWAP);
+               return (OP_COMP_AND_SWAP);
        case IBV_WC_FETCH_ADD:
-           return (OP_FETCH_AND_ADD);
+               return (OP_FETCH_AND_ADD);
        case IBV_WC_BIND_MW:
-           return (OP_BIND_MW);
+               return (OP_BIND_MW);
        case IBV_WC_RECV:
-           return (OP_RECEIVE);
+               if (cqe_p->wc_flags & IBV_WC_WITH_IMM)
+                       return (OP_RECEIVE_IMM);
+               else
+                       return (OP_RECEIVE);
        case IBV_WC_RECV_RDMA_WITH_IMM:
-           return (OP_RECEIVE_IMM);
+               return (OP_RECEIVE_IMM);
        default:
-           return (OP_INVALID);
-    }
+               return (OP_INVALID);
+       }
+}
+
+#define DAPL_GET_CQE_OPTYPE(cqe_p) dapls_cqe_opcode(cqe_p)
+#define DAPL_GET_CQE_WRID(cqe_p) ((ib_work_completion_t*)cqe_p)->wr_id
+#define DAPL_GET_CQE_STATUS(cqe_p) ((ib_work_completion_t*)cqe_p)->status
+#define DAPL_GET_CQE_BYTESNUM(cqe_p) ((ib_work_completion_t*)cqe_p)->byte_len
+#define DAPL_GET_CQE_IMMED_DATA(cqe_p) ((ib_work_completion_t*)cqe_p)->imm_data
+#define DAPL_GET_CQE_VENDOR_ERR(cqe_p) ((ib_work_completion_t*)cqe_p)->vendor_err
+
+STATIC _INLINE_ char * dapls_dto_op_str(int op)
+{
+    static char *optable[] =
+    {
+        "OP_RDMA_WRITE",
+        "OP_RDMA_WRITE_IMM",
+        "OP_SEND",
+        "OP_SEND_IMM",
+        "OP_RDMA_READ",
+        "OP_COMP_AND_SWAP",
+        "OP_FETCH_AND_ADD",
+        "OP_RECEIVE",
+        "OP_RECEIVE_IMM",
+        "OP_BIND_MW",
+        0
+    };
+    return ((op < 0 || op > 9) ? "Invalid CQE OP?" : optable[op]);
 }
 
-#define DAPL_GET_CQE_OPTYPE(cqe_p)     dapls_cqe_opcode(cqe_p)
-#define DAPL_GET_CQE_WRID(cqe_p)       ((ib_work_completion_t*)cqe_p)->wr_id
-#define DAPL_GET_CQE_STATUS(cqe_p)     ((ib_work_completion_t*)cqe_p)->status
-#define DAPL_GET_CQE_BYTESNUM(cqe_p)   ((ib_work_completion_t*)cqe_p)->byte_len
-#define DAPL_GET_CQE_IMMED_DATA(cqe_p) ((ib_work_completion_t*)cqe_p)->imm_data
+static _INLINE_ char *
+dapls_cqe_op_str(IN ib_work_completion_t *cqe_ptr)
+{
+    return dapls_dto_op_str(DAPL_GET_CQE_OPTYPE(cqe_ptr));
+}
+#define DAPL_GET_CQE_OP_STR(cqe) dapls_cqe_op_str(cqe)
 
 #endif /*  _DAPL_IB_DTO_H_ */
index de36c0f201fe5ba73da985884f774d55037cecf8..6a5e4a214abc5393f9ec46c63ba1bd6f26e26ac7 100644 (file)
@@ -1,4 +1,6 @@
 /*
+ * Copyright (c) 2005-2007 Intel Corporation.  All rights reserved.
+ *
  * This Software is licensed under one of the following licenses:
  *
  * 1) under the terms of the "Common Public License 1.0" a copy of which is
 
 /**********************************************************************
  * 
- * MODULE: dapl_det_mem.c
+ * MODULE: dapl_ib_mem.c
  *
- * PURPOSE: Intel DET APIs: Memory windows, registration,
- *           and protection domain 
+ * PURPOSE: Memory windows, registration, and protection domain 
  *
- * $Id: $
+ * $Id:$
  *
  **********************************************************************/
 
@@ -61,8 +62,7 @@
  *
  */
 STATIC _INLINE_ int
-dapls_convert_privileges (
-    IN DAT_MEM_PRIV_FLAGS      privileges)
+dapls_convert_privileges(IN DAT_MEM_PRIV_FLAGS privileges)
 {
        int     access = 0;
 
@@ -101,16 +101,15 @@ dapls_convert_privileges (
  *
  */
 DAT_RETURN
-dapls_ib_pd_alloc (
-       IN  DAPL_IA     *ia_ptr,
-       IN  DAPL_PZ     *pz )
+dapls_ib_pd_alloc(IN DAPL_IA *ia_ptr, IN DAPL_PZ *pz)
 {
        /* get a protection domain */
        pz->pd_handle = ibv_alloc_pd(ia_ptr->hca_ptr->ib_hca_handle);
        if (!pz->pd_handle) 
                return(dapl_convert_errno(ENOMEM,"alloc_pd"));
 
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " pd_alloc: pd_handle=%p\n", 
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                    " pd_alloc: pd_handle=%p\n", 
                     pz->pd_handle );
 
        return DAT_SUCCESS;
@@ -134,13 +133,18 @@ dapls_ib_pd_alloc (
  *
  */
 DAT_RETURN
-dapls_ib_pd_free (
-       IN  DAPL_PZ     *pz )
+dapls_ib_pd_free(IN DAPL_PZ *pz )
 {
        if (pz->pd_handle != IB_INVALID_HANDLE) {
+#if 1
+               ibv_dealloc_pd(pz->pd_handle);
+               pz->pd_handle = IB_INVALID_HANDLE;      
+               return DAT_SUCCESS;
+#else
                if (ibv_dealloc_pd(pz->pd_handle))
                        return(dapl_convert_errno(errno,"dealloc_pd"));
                pz->pd_handle = IB_INVALID_HANDLE;      
+#endif
        }
        return DAT_SUCCESS;
 }
@@ -165,25 +169,24 @@ dapls_ib_pd_free (
  *
  */
 DAT_RETURN
-dapls_ib_mr_register (
-        IN  DAPL_IA                 *ia_ptr,
-        IN  DAPL_LMR                *lmr,
-        IN  DAT_PVOID                virt_addr,
-        IN  DAT_VLEN                length,
-        IN  DAT_MEM_PRIV_FLAGS      privileges)
+dapls_ib_mr_register(IN  DAPL_IA *ia_ptr,
+                    IN  DAPL_LMR *lmr,
+                    IN  DAT_PVOID virt_addr,
+                    IN  DAT_VLEN length,
+                    IN  DAT_MEM_PRIV_FLAGS privileges)
 {
        ib_pd_handle_t  ib_pd_handle;
 
        ib_pd_handle = ((DAPL_PZ *)lmr->param.pz_handle)->pd_handle;
        
-       dapl_dbg_log (  DAPL_DBG_TYPE_UTIL, 
-                       " mr_register: ia=%p, lmr=%p va=%p ln=%d pv=0x%x\n", 
-                       ia_ptr, lmr, virt_addr, length, privileges );
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                    " mr_register: ia=%p, lmr=%p va=%p ln=%d pv=0x%x\n", 
+                    ia_ptr, lmr, virt_addr, length, privileges );
 
        /* TODO: shared memory */
        if (lmr->param.mem_type == DAT_MEM_TYPE_SHARED_VIRTUAL) {
-               dapl_dbg_log( DAPL_DBG_TYPE_ERR,
-                    " mr_register_shared: NOT IMPLEMENTED\n");    
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                            " mr_register_shared: NOT IMPLEMENTED\n");    
                return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
        }
 
@@ -200,16 +203,16 @@ dapls_ib_mr_register (
        lmr->param.lmr_context = lmr->mr_handle->lkey; 
        lmr->param.rmr_context = lmr->mr_handle->rkey;
        lmr->param.registered_size = length;
-       lmr->param.registered_address = (DAT_VADDR)(uintptr_t) virt_addr;
+       lmr->param.registered_address = (DAT_VADDR)(uintptr_t)virt_addr;
 
-       dapl_dbg_log (  DAPL_DBG_TYPE_UTIL, 
-                       " mr_register: mr=%p h %x pd %p ctx %p ,lkey=0x%x, rkey=0x%x priv=%x\n", 
-                       lmr->mr_handle, lmr->mr_handle->handle
-                       lmr->mr_handle->pd,
-                       lmr->mr_handle->context,
-                       lmr->mr_handle->lkey, 
-                       lmr->mr_handle->rkey, 
-                       length, dapls_convert_privileges(privileges) );
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                    " mr_register: mr=%p addr=%p h %x pd %p ctx %p "
+                    "lkey=0x%x rkey=0x%x priv=%x\n"
+                    lmr->mr_handle, lmr->mr_handle->addr, 
+                    lmr->mr_handle->handle,    
+                    lmr->mr_handle->pd, lmr->mr_handle->context,
+                    lmr->mr_handle->lkey, lmr->mr_handle->rkey, 
+                    length, dapls_convert_privileges(privileges));
 
        return DAT_SUCCESS;
 }
@@ -231,8 +234,7 @@ dapls_ib_mr_register (
  *
  */
 DAT_RETURN
-dapls_ib_mr_deregister (
-       IN  DAPL_LMR    *lmr )
+dapls_ib_mr_deregister(IN DAPL_LMR *lmr)
 {
        if (lmr->mr_handle != IB_INVALID_HANDLE) {
                if (ibv_dereg_mr(lmr->mr_handle))
@@ -263,13 +265,14 @@ dapls_ib_mr_deregister (
  *
  */
 DAT_RETURN
-dapls_ib_mr_register_shared (
-       IN  DAPL_IA                 *ia_ptr,
-       IN  DAPL_LMR                *lmr,
-       IN  DAT_MEM_PRIV_FLAGS  privileges )
+dapls_ib_mr_register_shared(IN DAPL_IA *ia_ptr,
+                           IN DAPL_LMR *lmr,
+                           IN DAT_MEM_PRIV_FLAGS privileges)
 {
-    dapl_dbg_log(DAPL_DBG_TYPE_ERR," mr_register_shared: NOT IMPLEMENTED\n");
-    return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
+    dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                " mr_register_shared: NOT IMPLEMENTED\n");
+
+    return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
 /*
@@ -289,12 +292,13 @@ dapls_ib_mr_register_shared (
  *
  */
 DAT_RETURN
-dapls_ib_mw_alloc (
-       IN  DAPL_RMR    *rmr )
+dapls_ib_mw_alloc (IN DAPL_RMR *rmr)
 {
 
-       dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_alloc: NOT IMPLEMENTED\n");
-       return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                    " mw_alloc: NOT IMPLEMENTED\n");
+       
+       return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
 /*
@@ -314,11 +318,12 @@ dapls_ib_mw_alloc (
  *
  */
 DAT_RETURN
-dapls_ib_mw_free (
-       IN  DAPL_RMR    *rmr )
+dapls_ib_mw_free(IN DAPL_RMR *rmr)
 {      
-       dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_free: NOT IMPLEMENTED\n");
-       return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                    " mw_free: NOT IMPLEMENTED\n");
+
+       return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
 /*
@@ -339,17 +344,18 @@ dapls_ib_mw_free (
  *
  */
 DAT_RETURN
-dapls_ib_mw_bind (
-       IN  DAPL_RMR                    *rmr,
-       IN  DAPL_LMR                    *lmr,
-       IN  DAPL_EP                     *ep,
-       IN  DAPL_COOKIE                 *cookie,
-       IN  DAT_VADDR                   virtual_address,
-       IN  DAT_VLEN                    length,
-       IN  DAT_MEM_PRIV_FLAGS          mem_priv,
-       IN  DAT_BOOLEAN                 is_signaled)
+dapls_ib_mw_bind(IN DAPL_RMR *rmr,
+                IN DAPL_LMR *lmr,
+                IN DAPL_EP  *ep,
+                IN DAPL_COOKIE *cookie,
+                IN DAT_VADDR virtual_address,
+                IN DAT_VLEN length,
+                IN DAT_MEM_PRIV_FLAGS mem_priv,
+                IN DAT_BOOLEAN is_signaled)
 {
-       dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_bind: NOT IMPLEMENTED\n");
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                    " mw_bind: NOT IMPLEMENTED\n");
+
        return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
@@ -372,14 +378,15 @@ dapls_ib_mw_bind (
  *
  */
 DAT_RETURN
-dapls_ib_mw_unbind (
-       IN  DAPL_RMR    *rmr,
-       IN  DAPL_EP     *ep,
-       IN  DAPL_COOKIE *cookie,
-       IN  DAT_BOOLEAN is_signaled )
+dapls_ib_mw_unbind(IN DAPL_RMR *rmr,
+                  IN DAPL_EP  *ep,
+                  IN DAPL_COOKIE *cookie,
+                  IN DAT_BOOLEAN is_signaled )
 {
-       dapl_dbg_log(DAPL_DBG_TYPE_ERR," mw_unbind: NOT IMPLEMENTED\n");
-       return DAT_ERROR (DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                    " mw_unbind: NOT IMPLEMENTED\n");
+       
+       return DAT_ERROR(DAT_NOT_IMPLEMENTED, DAT_NO_SUBTYPE);  
 }
 
 /*
index 3a1e3c886861de1d89da1875a593fd98cc6278d7..1eba2bd334b7536566e05d399ff2a6830ee145a5 100644 (file)
@@ -110,15 +110,23 @@ dapls_ib_qp_alloc (
        /* Setup attributes and create qp */
        dapl_os_memzero((void*)&qp_create, sizeof(qp_create));
        qp_create.send_cq = req_cq;
-       qp_create.recv_cq = rcv_cq;
        qp_create.cap.max_send_wr = attr->max_request_dtos;
-       qp_create.cap.max_recv_wr = attr->max_recv_dtos;
        qp_create.cap.max_send_sge = attr->max_request_iov;
-       qp_create.cap.max_recv_sge = attr->max_recv_iov;
        qp_create.cap.max_inline_data = ia_ptr->hca_ptr->ib_trans.max_inline_send; 
        qp_create.qp_type = IBV_QPT_RC;
        qp_create.qp_context = (void*)ep_ptr;
 
+       /* ibv assumes rcv_cq is never NULL, set to req_cq */
+       if (rcv_cq == NULL) {
+               qp_create.recv_cq = req_cq;
+               qp_create.cap.max_recv_wr = 0;
+               qp_create.cap.max_recv_sge = 0;
+       } else {
+               qp_create.recv_cq = rcv_cq;
+               qp_create.cap.max_recv_wr = attr->max_recv_dtos;
+               qp_create.cap.max_recv_sge = attr->max_recv_iov;
+       }
+
        ep_ptr->qp_handle = ibv_create_qp( ib_pd_handle, &qp_create);
        if (!ep_ptr->qp_handle) 
                return(dapl_convert_errno(ENOMEM, "create_qp"));
@@ -298,9 +306,10 @@ dapls_modify_qp_state ( IN ib_qp_handle_t  qp_handle,
                        IN ib_qp_state_t        qp_state,
                        IN ib_qp_cm_t           *qp_cm )
 {
-       struct ibv_qp_attr      qp_attr;
+       struct ibv_qp_attr      qp_attr;
        enum ibv_qp_attr_mask   mask = IBV_QP_STATE;
-       DAPL_EP *ep_ptr = (DAPL_EP*)qp_handle->qp_context;
+       DAPL_EP                 *ep_ptr = (DAPL_EP*)qp_handle->qp_context;
+       DAPL_IA                 *ia_ptr = ep_ptr->header.owner_ia;
                        
        dapl_os_memzero((void*)&qp_attr, sizeof(qp_attr));
        qp_attr.qp_state = qp_state;
@@ -315,14 +324,16 @@ dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
                                IBV_QP_RQ_PSN             |
                                IBV_QP_MAX_DEST_RD_ATOMIC |
                                IBV_QP_MIN_RNR_TIMER;
+
                        qp_attr.qp_state                = IBV_QPS_RTR;
-                       qp_attr.path_mtu                = IBV_MTU_1024;
+                       qp_attr.path_mtu                = IBV_MTU_2048;
                        qp_attr.dest_qp_num             = qp_cm->qpn;
                        qp_attr.rq_psn                  = 1;
                        qp_attr.max_dest_rd_atomic      = 
                                ep_ptr->param.ep_attr.max_rdma_read_out;
-                       qp_attr.min_rnr_timer           = 12;
+                       qp_attr.min_rnr_timer           = ia_ptr->hca_ptr->ib_trans.rnr_timer;
                        qp_attr.ah_attr.is_global       = 0;
+                       qp_attr.ah_attr.grh.dgid        = qp_cm->gid;
                        qp_attr.ah_attr.dlid            = qp_cm->lid;
                        qp_attr.ah_attr.sl              = 0;
                        qp_attr.ah_attr.src_path_bits   = 0;
@@ -343,30 +354,25 @@ dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle,
                                IBV_QP_RNR_RETRY          |
                                IBV_QP_SQ_PSN             |
                                IBV_QP_MAX_QP_RD_ATOMIC;
+
                        qp_attr.qp_state        = IBV_QPS_RTS;
-                       qp_attr.timeout         = 14;
-                       qp_attr.retry_cnt       = 7;
-                       qp_attr.rnr_retry       = 7;
+                       qp_attr.timeout         = ia_ptr->hca_ptr->ib_trans.ack_timer;
+                       qp_attr.retry_cnt       = ia_ptr->hca_ptr->ib_trans.ack_retry;
+                       qp_attr.rnr_retry       = ia_ptr->hca_ptr->ib_trans.rnr_retry;
                        qp_attr.sq_psn          = 1;
                        qp_attr.max_rd_atomic   = 
                                ep_ptr->param.ep_attr.max_rdma_read_out;
 
-                       dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                             " modify_qp_rts: psn %x rd_atomic %d\n",
-                             qp_attr.sq_psn, qp_attr.max_rd_atomic );
+                       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                               " modify_qp_rts: psn %x rd_atomic %d ack %d "
+                               " retry %d rnr_retry %d\n",
+                               qp_attr.sq_psn, qp_attr.max_rd_atomic, 
+                               qp_attr.timeout, qp_attr.retry_cnt, 
+                               qp_attr.rnr_retry );
                        break;
                }
                case IBV_QPS_INIT: 
                {
-                       DAPL_IA *ia_ptr;
-                       DAPL_EP *ep_ptr; 
-                       /* need to find way back to port num */
-                       ep_ptr = (DAPL_EP*)qp_handle->qp_context;
-                       if (ep_ptr)
-                               ia_ptr = ep_ptr->header.owner_ia;
-                       else
-                               break;
-
                        mask |= IBV_QP_PKEY_INDEX       |
                                IBV_QP_PORT             |
                                IBV_QP_ACCESS_FLAGS;
@@ -377,7 +383,8 @@ dapls_modify_qp_state ( IN ib_qp_handle_t   qp_handle,
                                        IBV_ACCESS_LOCAL_WRITE |
                                        IBV_ACCESS_REMOTE_WRITE |
                                        IBV_ACCESS_REMOTE_READ |
-                                       IBV_ACCESS_REMOTE_ATOMIC;
+                                       IBV_ACCESS_REMOTE_ATOMIC |
+                                       IBV_ACCESS_MW_BIND;
                        
                        dapl_dbg_log (DAPL_DBG_TYPE_EP,
                                " modify_qp_init: pi %x port %x acc %x\n",
index 5e34b47e058b1caf805a30e5afc54f7046a96076..a9941f5ed5d6832b402b54bb5231531239e6f41f 100644 (file)
@@ -62,6 +62,7 @@ static const char rcsid[] = "$Id:  $";
 #include <fcntl.h>
 
 int g_dapl_loopback_connection = 0;
+int g_scm_pipe[2];
 
 /* just get IP address for hostname */
 DAT_RETURN getipaddr( char *addr, int addr_len)
@@ -70,14 +71,14 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
        struct hostent          *h_ptr;
        struct utsname          ourname;
 
-       if ( uname( &ourname ) < 0 
+       if (uname( &ourname ) < 0
                return DAT_INTERNAL_ERROR;
 
-       h_ptr = gethostbyname( ourname.nodename );
-       if ( h_ptr == NULL 
+       h_ptr = gethostbyname(ourname.nodename);
+       if (h_ptr == NULL
                return DAT_INTERNAL_ERROR;
 
-       if ( h_ptr->h_addrtype == AF_INET ) {
+       if (h_ptr->h_addrtype == AF_INET) {
                int i;
                struct in_addr  **alist =
                        (struct in_addr **)h_ptr->h_addr_list;
@@ -87,18 +88,17 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
                
                /* Walk the list of addresses for host */
                for (i=0; alist[i] != NULL; i++) {
-                      
-                       /* first non-loopback address */                        
-                      if ( *(uint32_t*)alist[i] != htonl(0x7f000001) ) {
-                               dapl_os_memcpy( &ipv4_addr->sin_addr,
-                                               h_ptr->h_addr_list[i],
-                                               4 );
+                      /* first non-loopback address */                 
+                      if (*(uint32_t*)alist[i] != htonl(0x7f000001)) {
+                               dapl_os_memcpy(&ipv4_addr->sin_addr,
+                                              h_ptr->h_addr_list[i],
+                                              4);
                                break;
                        }
                }
                /* if no acceptable address found */
                if (*(uint32_t*)&ipv4_addr->sin_addr == 0)
-                       return DAT_INVALID_ADDRESS;
+                       return DAT_INVALID_ADDRESS;
        } else 
                return DAT_INVALID_ADDRESS;
 
@@ -122,6 +122,10 @@ DAT_RETURN getipaddr( char *addr, int addr_len)
  */
 int32_t dapls_ib_init (void)
 {      
+       /* create pipe for waking up thread */
+       if (pipe(g_scm_pipe))
+               return 1;
+
        return 0;
 }
 
@@ -156,7 +160,7 @@ DAT_RETURN dapls_ib_open_hca (
        int             i;
        DAT_RETURN      dat_status = DAT_SUCCESS;
 
-       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
                      " open_hca: %s - %p\n", hca_name, hca_ptr );
 
        /* Get list of all IB devices, find match, open */
@@ -170,65 +174,83 @@ DAT_RETURN dapls_ib_open_hca (
 
        for (i = 0; dev_list[i]; ++i) {
                hca_ptr->ib_trans.ib_dev = dev_list[i];
-               if (!strcmp(ibv_get_device_name(hca_ptr->ib_trans.ib_dev),hca_name))
+               if (!strcmp(ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+                           hca_name))
                        goto found;
        }
 
-       dapl_dbg_log (DAPL_DBG_TYPE_ERR,
-                     " open_hca: IB device %s not found\n",
-                     hca_name);
+       dapl_log(DAPL_DBG_TYPE_ERR,
+                " open_hca: device %s not found\n",
+                hca_name);
        goto err;
 
 found:
-       dapl_dbg_log (DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s %016llx\n", 
-                       ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
-                       (unsigned long long)bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s %016llx\n", 
+                    ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+                    (unsigned long long)
+                    bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
 
        hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
        if (!hca_ptr->ib_hca_handle) {
-                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                               " open_hca: IB dev open failed for %s\n", 
-                               ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+                dapl_log(DAPL_DBG_TYPE_ERR, 
+                         " open_hca: dev open failed for %s, err=%s\n", 
+                         ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+                         strerror(errno));
                 goto err;
        }
 
-       /* set inline max with enviroment or default */
+       /* set RC tunables via enviroment or default */
        hca_ptr->ib_trans.max_inline_send = 
-               dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
+               dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_DEFAULT);
+       hca_ptr->ib_trans.ack_retry = 
+               dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
+       hca_ptr->ib_trans.ack_timer =
+               dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
+       hca_ptr->ib_trans.rnr_retry = 
+               dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
+       hca_ptr->ib_trans.rnr_timer = 
+               dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
 
        /* initialize cq_lock */
        dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
-       if (dat_status != DAT_SUCCESS)
-       {
-               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                       " open_hca: failed to init cq_lock\n");
+       if (dat_status != DAT_SUCCESS) {
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        " open_hca: failed to init cq_lock\n");
                goto bail;
        }
 
        /* EVD events without direct CQ channels, non-blocking */
        hca_ptr->ib_trans.ib_cq = 
                ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+       if (hca_ptr->ib_trans.ib_cq == NULL) {
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        " open_hca: ibv_create_comp_channel ERR %s\n",
+                        strerror(errno));
+               goto bail;
+       }
+
        opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
        if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd, 
                              F_SETFL, opts | O_NONBLOCK) < 0) {
-               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                             " open_hca: ERR with CQ FD\n" );
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        " open_hca: fcntl on ib_cq->fd %d ERR %d %s\n", 
+                        hca_ptr->ib_trans.ib_cq->fd, opts,
+                        strerror(errno));
                goto bail;
        }
 
        if (dapli_cq_thread_init(hca_ptr)) {
-                dapl_dbg_log (DAPL_DBG_TYPE_ERR,
-                              " open_hca: cq_thread_init failed for %s\n",
-                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+                dapl_log(DAPL_DBG_TYPE_ERR,
+                         " open_hca: cq_thread_init failed for %s\n",
+                         ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
                 goto bail;
         }
 
        /* initialize cr_list lock */
        dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
-       if (dat_status != DAT_SUCCESS)
-       {
-               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                               " open_hca: failed to init lock\n");
+       if (dat_status != DAT_SUCCESS) {
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        " open_hca: failed to init cr_list lock\n");
                goto bail;
        }
 
@@ -240,10 +262,9 @@ found:
        dat_status = dapl_os_thread_create(cr_thread, 
                                           (void*)hca_ptr, 
                                           &hca_ptr->ib_trans.thread );
-       if (dat_status != DAT_SUCCESS)
-       {
-               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                               " open_hca: failed to create thread\n");
+       if (dat_status != DAT_SUCCESS) {
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        " open_hca: failed to create thread\n");
                goto bail;
        }
        
@@ -251,7 +272,7 @@ found:
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
                struct timespec sleep, remain;
                sleep.tv_sec = 0;
-               sleep.tv_nsec = 20000000; /* 20 ms */
+               sleep.tv_nsec = 2000000; /* 2 ms */
                dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
                             " open_hca: waiting for cr_thread\n");
                nanosleep (&sleep, &remain);
@@ -259,16 +280,15 @@ found:
 
        /* get the IP address of the device */
        dat_status = getipaddr((char*)&hca_ptr->hca_address, 
-                               sizeof(DAT_SOCK_ADDR6) );
-       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
-               " open_hca: %s, port %d, %s  %d.%d.%d.%d\n", 
-               ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num,
-               ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ?  "AF_INET":"AF_INET6",
-               ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 0 & 0xff,
-               ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 8 & 0xff,
-               ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 16 & 0xff,
-               ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff );
-
+                               sizeof(DAT_SOCK_ADDR6));
+       
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                    " open_hca: devname %s, port %d, hostname_IP %s\n",  
+                    ibv_get_device_name(hca_ptr->ib_trans.ib_dev), 
+                    hca_ptr->port_num,
+                    inet_ntoa(((struct sockaddr_in *)
+                               &hca_ptr->hca_address)->sin_addr));
+               
        ibv_free_device_list(dev_list);
        return dat_status;
 
@@ -308,15 +328,15 @@ DAT_RETURN dapls_ib_close_hca (   IN   DAPL_HCA   *hca_ptr )
                        return(dapl_convert_errno(errno,"ib_close_device"));
                hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
        }
-
        dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
 
        /* destroy cr_thread and lock */
        hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
+       write(g_scm_pipe[1], "w", sizeof "w");
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
                struct timespec sleep, remain;
                sleep.tv_sec = 0;
-               sleep.tv_nsec = 20000000; /* 20 ms */
+               sleep.tv_nsec = 2000000; /* 2 ms */
                dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
                             " close_hca: waiting for cr_thread\n");
                nanosleep (&sleep, &remain);
@@ -378,14 +398,11 @@ DAT_RETURN dapls_ib_query_hca (
                ia_attr->vendor_name[DAT_NAME_MAX_LENGTH - 1] = '\0';
                ia_attr->ia_address_ptr = (DAT_IA_ADDRESS_PTR)&hca_ptr->hca_address;
 
-               dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
-                       " query_hca: %s %s  %d.%d.%d.%d\n", 
-                       ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
-                       ((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_family == AF_INET ?  "AF_INET":"AF_INET6",
-                       ((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 0 & 0xff,
-                       ((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 8 & 0xff,
-                       ((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 16 & 0xff,
-                       ((struct sockaddr_in *)ia_attr->ia_address_ptr)->sin_addr.s_addr >> 24 & 0xff );
+               dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
+                            " query_hca: %s %s \n", 
+                            ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+                            inet_ntoa(((struct sockaddr_in *)
+                                       &hca_ptr->hca_address)->sin_addr));
                
                ia_attr->hardware_version_major   = dev_attr.hw_ver;
                /* ia_attr->hardware_version_minor   = dev_attr.fw_ver; */
@@ -408,11 +425,14 @@ DAT_RETURN dapls_ib_query_hca (
                ia_attr->max_pzs                  = dev_attr.max_pd;
                ia_attr->max_mtu_size             = port_attr.max_msg_sz;
                ia_attr->max_rdma_size            = port_attr.max_msg_sz;
+               ia_attr->max_iov_segments_per_rdma_read = dev_attr.max_sge;
+               ia_attr->max_iov_segments_per_rdma_write = dev_attr.max_sge;
                ia_attr->num_transport_attr       = 0;
                ia_attr->transport_attr           = NULL;
                ia_attr->num_vendor_attr          = 0;
                ia_attr->vendor_attr              = NULL;
-
+               hca_ptr->ib_trans.ack_timer       = DAPL_MAX(dev_attr.local_ca_ack_delay,
+                                                            hca_ptr->ib_trans.ack_timer);
                dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
                        " query_hca: (%x.%x) ep %d ep_q %d evd %d evd_q %d\n", 
                        ia_attr->hardware_version_major,
@@ -420,11 +440,10 @@ DAT_RETURN dapls_ib_query_hca (
                        ia_attr->max_eps, ia_attr->max_dto_per_ep,
                        ia_attr->max_evds, ia_attr->max_evd_qlen );
                dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
-                       " query_hca: msg %llu rdma %llu iov %d lmr %d rmr %d\n", 
+                       " query_hca: msg %llu rdma %llu iov %d lmr %d rmr %d ack_time %d\n", 
                        ia_attr->max_mtu_size, ia_attr->max_rdma_size,
                        ia_attr->max_iov_segments_per_dto, ia_attr->max_lmrs, 
-                       ia_attr->max_rmrs );
-
+                       ia_attr->max_rmrs,hca_ptr->ib_trans.ack_timer );
        }
        
        if (ep_attr != NULL) {
@@ -443,7 +462,6 @@ DAT_RETURN dapls_ib_query_hca (
                        ep_attr->max_recv_dtos, ep_attr->max_recv_iov,
                        ep_attr->max_rdma_read_in, ep_attr->max_rdma_read_out);
        }
-
        return DAT_SUCCESS;
 }
 
index 0d928df93387ca429e037d47cdbf78f9a67f71a5..81a17529f59f7fd9508600211c80323255a4d730 100644 (file)
@@ -71,15 +71,18 @@ typedef ib_hca_handle_t             dapl_ibal_ca_t;
 
 /* CM mappings, user CM not complete use SOCKETS */
 
-/* destination info to exchange until real IB CM shows up */
+/* destination info to exchange, define wire protocol version */
+#define DSCM_VER 2
 typedef struct _ib_qp_cm
 { 
-       uint32_t                qpn;
+       uint16_t                ver;
+       uint16_t                rej;
        uint16_t                lid;
        uint16_t                port;
-       int                     p_size;
+       uint32_t                qpn;
+       uint32_t                p_size;
        DAT_SOCK_ADDR6          ia_address;
-
+        union ibv_gid          gid;
 } ib_qp_cm_t;
 
 /* 
@@ -94,20 +97,34 @@ struct ib_llist_entry
     struct dapl_llist_entry    *list_head;
 };
 
+typedef enum scm_state 
+{
+       SCM_INIT,
+       SCM_LISTEN,
+       SCM_CONN_PENDING,
+       SCM_ACCEPTING,
+       SCM_ACCEPTED,
+       SCM_REJECTED,
+       SCM_CONNECTED,
+       SCM_DISCONNECTED,
+       SCM_DESTROY
+} SCM_STATE;
+
 struct ib_cm_handle
 { 
        struct ib_llist_entry   entry;
+       DAPL_OS_LOCK            lock;
+       SCM_STATE               state;
        int                     socket;
-       int                     l_socket; 
-       struct dapl_hca         *hca_ptr;
-       DAT_HANDLE              cr;
+       struct dapl_hca         *hca;
        DAT_HANDLE              sp;     
+       struct dapl_ep          *ep;    
        ib_qp_cm_t              dst;
        unsigned char           p_data[256];
 };
 
 typedef struct ib_cm_handle    *ib_cm_handle_t;
-typedef ib_cm_handle_t         ib_cm_srvc_handle_t;
+typedef ib_cm_handle_t ib_cm_srvc_handle_t;
 
 DAT_RETURN getipaddr(char *addr, int addr_len);
 
@@ -163,6 +180,12 @@ typedef struct ibv_comp_channel *ib_wait_obj_handle_t;
 /* inline send rdma threshold */
 #define        INLINE_SEND_DEFAULT     128
 
+/* RC timer - retry count defaults */
+#define SCM_ACK_TIMER 15       /* 5 bits, 4.096us*2^ack_timer. 15 == 134ms */
+#define SCM_ACK_RETRY 7                /* 3 bits, 7 * 134ms = 940ms */
+#define SCM_RNR_TIMER 28       /* 5 bits, 28 == 163ms, 31 == 491ms */
+#define SCM_RNR_RETRY 7                /* 3 bits, 7 == infinite */
+
 /* CM private data areas */
 #define        IB_MAX_REQ_PDATA_SIZE   92
 #define        IB_MAX_REP_PDATA_SIZE   196
@@ -268,7 +291,10 @@ typedef struct _ib_hca_transport
        ib_async_cq_handler_t   async_cq_error;
        ib_async_dto_handler_t  async_cq;
        ib_async_qp_handler_t   async_qp_error;
-
+       uint8_t                 ack_timer;
+       uint8_t                 ack_retry;
+       uint8_t                 rnr_timer;
+       uint8_t                 rnr_retry;
 } ib_hca_transport_t;
 
 /* provider specfic fields for shared memory support */