]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
Update the dapl.git tree with the latest SVN version of the
authorStan Smith <stan.smith@intel.com>
Fri, 30 Jan 2009 17:52:33 +0000 (09:52 -0800)
committerArlin Davis <arlin.r.davis@intel.com>
Fri, 30 Jan 2009 17:52:33 +0000 (09:52 -0800)
ibal-scm provider.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
dapl/ibal-scm/dapl_ibal-scm_cm.c
dapl/ibal-scm/dapl_ibal-scm_util.c

index df83008d78014100a398ee678a8e81d8e2bd7d1b..6a050b81e45be4055ed440700995ad6368455f17 100644 (file)
 #include <ws2tcpip.h>
 #include <io.h>
 
+extern int g_scm_pipe[2];
+
+extern DAT_RETURN
+dapls_ib_query_gid( IN  DAPL_HCA       *hca_ptr,
+                   IN  GID             *gid );
+
+
+static struct ib_cm_handle * dapli_cm_create(void)
+{ 
+       struct ib_cm_handle *cm_ptr;
+
+       /* Allocate CM, init lock, and initialize */
+       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) 
+               return NULL;
+
+        if (dapl_os_lock_init(&cm_ptr->lock)) 
+               goto bail;
+
+       (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
+       cm_ptr->dst.ver = htons(DSCM_VER);
+       cm_ptr->socket = -1;
+       cm_ptr->l_socket = -1;
+       return cm_ptr;
+bail:
+       dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+       return NULL;
+}
+
+
+/* mark for destroy, remove all references, schedule cleanup */
+
+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
+{
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep);
+       
+       /* cleanup, never made it to work queue */
+       if (cm_ptr->state == SCM_INIT) {
+               if (cm_ptr->socket >= 0)  
+                       closesocket(cm_ptr->socket);
+               if (cm_ptr->l_socket >= 0)  
+                       closesocket(cm_ptr->l_socket);
+               dapl_os_free(cm_ptr, sizeof(*cm_ptr));
+               return;
+       }
+
+       dapl_os_lock(&cm_ptr->lock);
+       cm_ptr->state = SCM_DESTROY;
+       if (cm_ptr->ep) 
+               cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+
+       /* close socket if still active */
+       if (cm_ptr->socket >= 0) {
+               closesocket(cm_ptr->socket);
+               cm_ptr->socket = -1;
+       }
+       if (cm_ptr->l_socket >= 0) {
+               closesocket(cm_ptr->l_socket);
+               cm_ptr->l_socket = -1;
+       }
+       dapl_os_unlock(&cm_ptr->lock);
+
+       /* wakeup work thread */
+       _write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
+/* queue socket for processing CM work */
+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
+{
+       /* add to work queue for cr thread processing */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
+       dapl_os_lock( &cm_ptr->hca->ib_trans.lock );
+       dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list, 
+                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
+       dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
+
+        /* wakeup CM work thread */
+        _write(g_scm_pipe[1], "w", sizeof "w");
+}
+
+
 
 static uint16_t
 dapli_get_lid(IN DAPL_HCA *hca, IN int port)
@@ -122,6 +204,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port)
 }
 
 
+/*
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
+ */
+static DAT_RETURN 
+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
+{
+       DAPL_EP *ep_ptr = cm_ptr->ep;
+       DAT_UINT32 disc_data = htonl(0xdead);
+
+       if (ep_ptr == NULL)
+               return DAT_SUCCESS;
+       
+       dapl_os_lock(&cm_ptr->lock);
+       if ((cm_ptr->state == SCM_INIT) ||
+           (cm_ptr->state == SCM_DISCONNECTED)) {
+               dapl_os_unlock(&cm_ptr->lock);
+               return DAT_SUCCESS;
+       } else {
+               /* send disc date, close socket, schedule destroy */
+               if (cm_ptr->socket >= 0) { 
+                       send(cm_ptr->socket, (const char *)&disc_data,
+                               sizeof(disc_data), 0);
+                       closesocket(cm_ptr->socket);
+                       cm_ptr->socket = -1;
+               }
+               cm_ptr->state = SCM_DISCONNECTED;
+               _write(g_scm_pipe[1], "w", sizeof "w");
+       }
+       dapl_os_unlock(&cm_ptr->lock);
+
+
+       if (ep_ptr->cr_ptr) {
+               dapls_cr_callback(cm_ptr,
+                                 IB_CME_DISCONNECTED,
+                                 NULL,
+                                 ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr);
+       } else {
+               dapl_evd_connection_callback(ep_ptr->cm_handle,
+                                            IB_CME_DISCONNECTED,
+                                            NULL,
+                                            ep_ptr);
+       }       
+
+       /* remove reference from endpoint */
+       ep_ptr->cm_handle = NULL;
+       
+       /* schedule destroy */
+
+
+       return DAT_SUCCESS;
+}
+
+
+
+/*
+ * PASSIVE: consumer accept, send local QP information, private data, 
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread. 
+ */
+static DAT_RETURN 
+dapli_socket_accept_usr( DAPL_EP       *ep_ptr,
+                        DAPL_CR        *cr_ptr,
+                        DAT_COUNT      p_size,
+                        DAT_PVOID      p_data )
+{
+       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
+       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+       WSABUF          iovec[2];
+       int             len, rc;
+       short           rtu_data = 0;
+       ib_api_status_t ibs;
+       ib_qp_attr_t    qpa;
+       dapl_ibal_port_t *p_port;
+       dapl_ibal_ca_t  *p_ca;
+
+       dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port 0x%x\n",
+                       __FUNCTION__,p_size,cm_ptr->socket,
+                       ia_ptr->hca_ptr->port_num);
+
+       if (p_size >  IB_MAX_REP_PDATA_SIZE) 
+               return DAT_LENGTH_ERROR;
+
+       /* must have a accepted socket */
+       if ( cm_ptr->socket < 0 ) {
+               dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                    "%s() Not accepted socket? remote port=0x%x lid=0x%x"
+                    " qpn=0x%x psize=%d\n",
+                    cm_ptr->dst.port, cm_ptr->dst.lid,
+                    ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size); 
+               return DAT_INTERNAL_ERROR;
+       }
+       
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                    " accept_usr: remote port=0x%x lid=0x%x"
+                    " qpn=0x%x psize=%d\n",
+                    cm_ptr->dst.port, cm_ptr->dst.lid,
+                    ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size); 
+
+       /* modify QP to RTR and then to RTS with remote info already read */
+
+       p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
+       p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
+       if (!p_port)
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                       "%s() dapli_ibal_get_port() failed @ line #%d\n",
+                       __FUNCTION__,__LINE__);
+               goto bail;
+       }
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                       "%s() DST: qpn 0x%x port 0x%x lid %x psize %d\n",
+                       __FUNCTION__,
+                       cl_ntoh32(cm_ptr->dst.qpn),
+                       cm_ptr->dst.port,
+                       cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size);
+
+       /* modify QP to RTR and then to RTS with remote info */
+
+       ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, 
+                                           cm_ptr->dst.qpn,
+                                           cm_ptr->dst.lid,
+                                           p_port );
+       if (ibs != IB_SUCCESS)
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                               "%s() QP --> RTR failed @ line #%d\n",
+                               __FUNCTION__,__LINE__);
+               goto bail;
+       }
+
+       if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                               "%s() QP --> RTS failed @ line #%d\n",
+                               __FUNCTION__,__LINE__);
+               goto bail;
+       }
+
+       ep_ptr->qp_state = IB_QP_STATE_RTS;
+       
+       /* save remote address information */
+       dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+                       &cm_ptr->dst.ia_address, 
+                       sizeof(ep_ptr->remote_ia_address));
+
+       /* determine QP & port numbers */
+       ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
+       if (ibs != IB_SUCCESS)
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            " ib_query_qp() ERR %s\n", ib_get_err_str(ibs)); 
+               goto bail;
+       }
+
+       /* Send our QP info, IA address, and private data */
+       cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
+       cm_ptr->dst.port = ia_ptr->hca_ptr->port_num;
+       cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num);
+       /* set gid in network order */
+       ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+       if ( ibs != IB_SUCCESS )
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                       "%s() dapls_ib_query_gid() returns '%s'\n",
+                       __FUNCTION__,ib_get_err_str(ibs)); 
+               goto bail;
+       }
+
+       cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+       cm_ptr->dst.p_size = p_size;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+               "%s()\n  Tx QP info: qpn %x port 0x%x lid 0x%x p_sz %d IP %s\n",
+               __FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn), cm_ptr->dst.port,
+               cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size,
+               dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
+
+       /* network byte-ordering - QPN & LID already are */
+       cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size);
+       cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port);
+
+       iovec[0].buf = (char*)&cm_ptr->dst;
+       iovec[0].len  = sizeof(ib_qp_cm_t);
+       if (p_size) {
+               iovec[1].buf = p_data;
+               iovec[1].len  = p_size;
+       }
+       rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
+       if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            " accept_usr: ERR %d, wcnt=%d\n",
+                            WSAGetLastError(), len); 
+               goto bail;
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " accept_usr: local port=0x%x lid=0x%x"
+                    " qpn=0x%x psize=%d\n",
+                    ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), 
+                    ntohl(cm_ptr->dst.qpn), ntohl(cm_ptr->dst.p_size)); 
+       
+       /* save state and reference to EP, queue for RTU data */
+       cm_ptr->ep = ep_ptr;
+       cm_ptr->hca = ia_ptr->hca_ptr;
+       cm_ptr->state = SCM_ACCEPTED;
+
+       /* restore remote address information for query */
+       dapl_os_memcpy( &cm_ptr->dst.ia_address, 
+                       &ep_ptr->remote_ia_address,
+                       sizeof(cm_ptr->dst.ia_address));
+
+       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); 
+       dapli_cm_queue(cm_ptr);
+
+       return DAT_SUCCESS;
+
+bail:
+       dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR !QP_RTR_RTS \n"); 
+       dapli_cm_destroy(cm_ptr);
+       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
+
+       return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * PASSIVE: read RTU from active peer, post CONN event
+ */
+void 
+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+       int             len;
+       short           rtu_data = 0;
+
+       /* complete handshake after final QP state change */
+       len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
+       if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            " accept_rtu: ERR %d, rcnt=%d rdata=%x\n",
+                            WSAGetLastError(), len, ntohs(rtu_data) ); 
+               goto bail;
+       }
+
+       /* save state and reference to EP, queue for disc event */
+       cm_ptr->state = SCM_CONNECTED;
+
+       /* final data exchange if remote QP state is good to go */
+       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
+       dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
+       return;
+bail:
+       dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */
+       dapli_cm_destroy(cm_ptr);
+       dapls_cr_callback(cm_ptr, IB_CME_DESTINATION_REJECT, NULL, cm_ptr->sp);
+}
+
+
 /*
  * ACTIVE: Create socket, connect, and exchange QP information 
  */
@@ -143,21 +482,16 @@ dapli_socket_connect (    DAPL_EP                 *ep_ptr,
        dapl_ibal_port_t *p_port;
        dapl_ibal_ca_t  *p_ca;
        
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d\n", r_qual);
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d psize %d\n",
+                    r_qual, p_size);
                        
-       /*
-        *  Allocate CM and initialize
-        */
-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) {
+       cm_ptr = dapli_cm_create();
+       if (cm_ptr == NULL)
                return DAT_INSUFFICIENT_RESOURCES;
-       }
-
-       (void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) );
-       cm_ptr->socket = -1;
 
        /* create, connect, sockopt, and exchange QP information */
        if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+               dapli_cm_destroy(cm_ptr);
                return DAT_INSUFFICIENT_RESOURCES;
        }
 
@@ -166,7 +500,7 @@ dapli_socket_connect (      DAPL_EP                 *ep_ptr,
        if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) == SOCKET_ERROR) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d on r_qual %d\n",
                             WSAGetLastError(), (unsigned int)r_qual);
-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+               dapli_cm_destroy(cm_ptr);
                return DAT_INVALID_ADDRESS;
        }
 
@@ -175,6 +509,8 @@ dapli_socket_connect (      DAPL_EP                 *ep_ptr,
                          (const char*)&opt,
                          sizeof(opt) );
        
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n");
+
        /* determine QP & port numbers */
        ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
        if (ibs != IB_SUCCESS)
@@ -187,7 +523,6 @@ dapli_socket_connect (      DAPL_EP                 *ep_ptr,
        /* Send QP info, IA address and private data */
        cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */
        cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num);
-
        cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr, 
                                         ia_ptr->hca_ptr->port_num );
        if (cm_ptr->dst.lid == 0)
@@ -197,6 +532,17 @@ dapli_socket_connect (     DAPL_EP                 *ep_ptr,
                                __FUNCTION__, __LINE__); 
                goto bail;
        }
+
+       /* set gid in network order */
+       ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid );
+       if ( ibs != IB_SUCCESS )
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                       "%s() dapls_ib_query_gid() returns '%s'\n",
+                       __FUNCTION__,ib_get_err_str(ibs)); 
+               goto bail;
+       }
+
        cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
        cm_ptr->dst.p_size = cl_hton32(p_size);
 
@@ -213,6 +559,8 @@ dapli_socket_connect (      DAPL_EP                 *ep_ptr,
                iovec[1].buf = p_data;
                iovec[1].len  = p_size;
        }
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, write QP and private data\n"); 
        rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, NULL);
        if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
@@ -225,17 +573,65 @@ dapli_socket_connect (    DAPL_EP                 *ep_ptr,
                     cm_ptr->dst.port, cm_ptr->dst.lid, 
                     cm_ptr->dst.qpn, cm_ptr->dst.p_size ); 
 
+       /* queue up to work thread to avoid blocking consumer */
+       cm_ptr->state = SCM_CONN_PENDING;
+       cm_ptr->hca = ia_ptr->hca_ptr;
+       cm_ptr->ep = ep_ptr;
+       dapli_cm_queue(cm_ptr);
+       return DAT_SUCCESS;
+bail:
+       /* close socket, free cm structure */
+       dapli_cm_destroy(cm_ptr);
+       return DAT_INTERNAL_ERROR;
+}
+
+
+/*
+ * ACTIVE: exchange QP information, called from CR thread
+ */
+void 
+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+       DAPL_EP         *ep_ptr = cm_ptr->ep;
+       DAPL_IA         *ia_ptr = cm_ptr->ep->header.owner_ia;
+       int             len, rc;
+       DWORD           ioflags;
+       WSABUF          iovec[1];
+       short           rtu_data = htons(0x0E0F);
+       ib_cm_events_t  event = IB_CME_DESTINATION_REJECT;
+       ib_api_status_t ibs;
+       dapl_ibal_port_t *p_port;
+       dapl_ibal_ca_t  *p_ca;
+
        /* read DST information into cm_ptr, overwrite SRC info */
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n"); 
+
+       iovec[0].buf = (char*)&cm_ptr->dst;
+       iovec[0].len  = sizeof(ib_qp_cm_t);
        ioflags = len = 0;
        rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0);
-       if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read: ERR %d rcnt=%d\n",
-                            WSAGetLastError(), len); 
+       if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ||
+                                       ntohs(cm_ptr->dst.ver) != DSCM_VER )
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                            "connect_rtu read: ERR %d rcnt=%d ver=%d\n",
+                            WSAGetLastError(), len, cm_ptr->dst.ver); 
+               goto bail;
+       }
+
+       /* check for consumer reject */
+       if (cm_ptr->dst.rej) {
+               dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                            " connect_rtu read: PEER REJ reason=0x%x\n",
+                            ntohs(cm_ptr->dst.rej)); 
+               event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
                goto bail;
        }
 
-       /* revert back to host byte ordering */
+       /* convert peer response values to host order */
        cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port);
+       cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid);
+       cm_ptr->dst.qpn = cm_ptr->dst.qpn;
        cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size);
 
        dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn %x port %d "
@@ -245,15 +641,27 @@ dapli_socket_connect (    DAPL_EP                 *ep_ptr,
                        cm_ptr->dst.p_size,
                        dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL));
 
+       /* save remote address information */
+       dapl_os_memcpy( &ep_ptr->remote_ia_address, 
+                       &cm_ptr->dst.ia_address, 
+                       sizeof(ep_ptr->remote_ia_address));
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
+                    " connect_rtu: DST %s port=0x%x lid=0x%x, qpn=0x%x, psize=%d\n",
+                    inet_ntoa(((struct sockaddr_in *)&cm_ptr->dst.ia_address)->sin_addr),
+                    cm_ptr->dst.port, cm_ptr->dst.lid, 
+                    cm_ptr->dst.qpn, cm_ptr->dst.p_size); 
+
        /* validate private data size before reading */
-       if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) {
+       if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " connect read: psize (%d) wrong\n",
+                            " connect_rtu read: psize (%d) wrong\n",
                             cm_ptr->dst.p_size ); 
                goto bail;
        }
 
        /* read private data into cm_handle if any present */
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n"); 
        if ( cm_ptr->dst.p_size ) {
                iovec[0].buf = cm_ptr->p_data;
                iovec[0].len  = cm_ptr->dst.p_size;
@@ -300,32 +708,29 @@ dapli_socket_connect (    DAPL_EP                 *ep_ptr,
                 
        ep_ptr->qp_state = IB_QP_STATE_RTS;
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n"); 
+
        /* complete handshake after final QP state change */
        send(cm_ptr->socket, (const char *)&rtu_data, sizeof(rtu_data), 0);
 
        /* init cm_handle and post the event with private data */
        ep_ptr->cm_handle = cm_ptr;
+       cm_ptr->state = SCM_CONNECTED;
        dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" ); 
 
        dapl_evd_connection_callback(   ep_ptr->cm_handle, 
                                        IB_CME_CONNECTED, 
                                        cm_ptr->p_data, 
                                        ep_ptr );       
-       return DAT_SUCCESS;
-
+       return;
 bail:
        /* close socket, free cm structure and post error event */
-       if ( cm_ptr->socket >= 0 ) 
-               closesocket(cm_ptr->socket);
-
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
-       dapl_evd_connection_callback(   ep_ptr->cm_handle, 
-                                       IB_CME_LOCAL_FAILURE, 
+       dapli_cm_destroy(cm_ptr);
+       dapls_ib_reinit_ep(ep_ptr); /* reset QP state */
+       dapl_evd_connection_callback(   NULL /*ep_ptr->cm_handle*/, 
+                                       event, 
                                        NULL, 
                                        ep_ptr );
-       return DAT_INTERNAL_ERROR;
 }
 
 
@@ -347,14 +752,12 @@ dapli_socket_listen (     DAPL_IA         *ia_ptr,
                        ia_ptr, serviceID, sp_ptr);
 
        /* Allocate CM and initialize */
-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) 
+       cm_ptr = dapli_cm_create();
+       if (cm_ptr == NULL)
                return DAT_INSUFFICIENT_RESOURCES;
 
-       (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) );
-       
-       cm_ptr->socket = cm_ptr->l_socket = -1;
        cm_ptr->sp = sp_ptr;
-       cm_ptr->hca_ptr = ia_ptr->hca_ptr;
+       cm_ptr->hca = ia_ptr->hca_ptr;
        
        /* bind, listen, set sockopt, accept, exchange data */
        if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -406,12 +809,9 @@ dapli_socket_listen (      DAPL_IA         *ia_ptr,
        /* set cm_handle for this service point, save listen socket */
        sp_ptr->cm_srvc_handle = cm_ptr;
 
-       /* add to SP->CR thread list */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry);
-       dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock );
-       dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list, 
-                           (DAPL_LLIST_ENTRY*)&cm_ptr->entry, (void*)cm_ptr);
-       dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock);
+       /* queue up listen socket to process inbound CR's */
+       cm_ptr->state = SCM_LISTEN;
+       dapli_cm_queue(cm_ptr);
 
        dapl_dbg_log( DAPL_DBG_TYPE_CM,
                        " listen: qual 0x%x cr %p s_fd %d\n",
@@ -421,10 +821,7 @@ dapli_socket_listen (      DAPL_IA         *ia_ptr,
 bail:
        dapl_dbg_log( DAPL_DBG_TYPE_CM,
                        " listen: ERROR on conn_qual 0x%x\n",serviceID); 
-       if ( cm_ptr->l_socket >= 0 )
-               closesocket( cm_ptr->l_socket );
-
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
+       dapli_cm_destroy(cm_ptr);
        return dat_status;
 }
 
@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        int             len;
        DAT_RETURN      dat_status = DAT_SUCCESS;
                
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n");
+
        /* Allocate accept CM and initialize */
        if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) 
                return DAT_INSUFFICIENT_RESOURCES;
@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) );
        
        acm_ptr->socket = -1;
+       acm_ptr->l_socket = -1;
        acm_ptr->sp = cm_ptr->sp;
-       acm_ptr->hca_ptr = cm_ptr->hca_ptr;
+       acm_ptr->hca = cm_ptr->hca;
 
        len = sizeof(acm_ptr->dst.ia_address);
        acm_ptr->socket = accept(cm_ptr->l_socket, 
@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
                goto bail;
        }
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
+
        /* read in DST QP info, IA address. check for private data */
        len = recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0);
-       if ( len != sizeof(ib_qp_cm_t) ) {
+       if ( len != sizeof(ib_qp_cm_t) || ntohs(acm_ptr->dst.ver) != DSCM_VER )
+       {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                       " accept read: ERR %d, rcnt=%d\n",
-                       WSAGetLastError(), len); 
+                       " accept read: ERR %d, rcnt=%d ver=%d\n",
+                       WSAGetLastError(), len, acm_ptr->dst.ver); 
                dat_status = DAT_INTERNAL_ERROR;
                goto bail;
 
        }
-       /* revert back to host byte ordering */
+       /* convert accepted values to host byte ordering */
        acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port);
+       acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid);
+       acm_ptr->dst.qpn = acm_ptr->dst.qpn;
        acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size);
 
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST sizeof(ib_cm_t) %d qpn %x "
-               "port %d lid 0x%x psize %d IP %s\n",
-               sizeof(ib_qp_cm_t),
-               cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port,
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x "
+               "lid 0x%x qpn 0x%x psize %d\n",
+               dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL),
+               acm_ptr->dst.port,
                cl_ntoh16(acm_ptr->dst.lid),
-               acm_ptr->dst.p_size,
-               dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL));
+               cl_ntoh32(acm_ptr->dst.qpn),
+               acm_ptr->dst.p_size);
 
        /* validate private data size before reading */
        if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) {
@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
                goto bail;
        }
 
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read private data\n");
+
        /* read private data into cm_handle if any present */
        if ( acm_ptr->dst.p_size ) {
                len = recv( acm_ptr->socket, 
@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
                p_data = acm_ptr->p_data;
        }
        
+       acm_ptr->state = SCM_ACCEPTING;
+
        /* trigger CR event and return SUCCESS */
        dapls_cr_callback(  acm_ptr,
                            IB_CME_CONNECTION_REQUEST_PENDING,
@@ -521,153 +930,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
                            acm_ptr->sp );
 
        return DAT_SUCCESS;
-
-bail:
-       if ( acm_ptr->socket >= 0 )
-               closesocket( acm_ptr->socket );
-
-       dapl_os_free( acm_ptr, sizeof( *acm_ptr ) );
-       return DAT_INTERNAL_ERROR;
-}
-
-
-static DAT_RETURN 
-dapli_socket_accept_final( DAPL_EP             *ep_ptr,
-                          DAPL_CR              *cr_ptr,
-                          DAT_COUNT            p_size,
-                          DAT_PVOID            p_data )
-{
-       DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
-       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
-       ib_qp_cm_t      qp_cm;
-       WSABUF          iovec[2];
-       int             len, rc;
-       short           rtu_data = 0;
-       ib_api_status_t ibs;
-       ib_qp_attr_t    qpa;
-       dapl_ibal_port_t *p_port;
-       dapl_ibal_ca_t  *p_ca;
-
-       dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d port %d\n",
-                       __FUNCTION__,p_size,cm_ptr->socket,
-                       ia_ptr->hca_ptr->port_num);
-
-       if (p_size >  IB_MAX_REP_PDATA_SIZE) 
-               return DAT_LENGTH_ERROR;
-
-       /* must have a accepted socket */
-       if ( cm_ptr->socket < 0 )
-               return DAT_INTERNAL_ERROR;
-       
-       /* modify QP to RTR and then to RTS with remote info already read */
-
-       p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle;
-       p_port = dapli_ibal_get_port (p_ca, (uint8_t)ia_ptr->hca_ptr->port_num);
-       if (!p_port)
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                       "%s() dapli_ibal_get_port() failed @ line #%d\n",
-                       __FUNCTION__,__LINE__);
-               goto bail;
-       }
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port %d lid %x\n",
-                       __FUNCTION__,
-                       cl_ntoh32(cm_ptr->dst.qpn),
-                       cm_ptr->dst.port,
-                       cl_ntoh16(cm_ptr->dst.lid));
-
-       /* modify QP to RTR and then to RTS with remote info */
-
-       ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, 
-                                           cm_ptr->dst.qpn,
-                                           cm_ptr->dst.lid,
-                                           p_port );
-       if (ibs != IB_SUCCESS)
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                               "%s() QP --> RTR failed @ line #%d\n",
-                               __FUNCTION__,__LINE__);
-               goto bail;
-       }
-
-       if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) )
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                               "%s() QP --> RTS failed @ line #%d\n",
-                               __FUNCTION__,__LINE__);
-               goto bail;
-       }
-
-       ep_ptr->qp_state = IB_QP_STATE_RTS;
-       
-       /* determine QP & port numbers */
-       ibs = ib_query_qp(ep_ptr->qp_handle, &qpa);
-       if (ibs != IB_SUCCESS)
-       {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " ib_query_qp() ERR %s\n", ib_get_err_str(ibs)); 
-               goto bail;
-       }
-
-       /* Send QP info, IA address, and private data */
-       qp_cm.qpn = qpa.num; /* ib_net32_t */
-       qp_cm.port = ia_ptr->hca_ptr->port_num;
-       qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr, ia_ptr->hca_ptr->port_num );
-       qp_cm.ia_address = ia_ptr->hca_ptr->hca_address;
-       qp_cm.p_size = p_size;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-               "%s()\n  Tx QP info: qpn %x port %d lid 0x%x p_sz %d IP %s\n",
-               __FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port,
-               cl_ntoh16(qp_cm.lid), qp_cm.p_size,
-               dapli_get_ip_addr_str(&qp_cm.ia_address,NULL));
-
-       /* network byte-ordering - QPN & LID already are */
-       qp_cm.p_size = cl_hton32(qp_cm.p_size);
-       qp_cm.port = cl_hton16(qp_cm.port);
-
-       iovec[0].buf = (char*)&qp_cm;
-       iovec[0].len  = sizeof(ib_qp_cm_t);
-       if (p_size) {
-               iovec[1].buf = p_data;
-               iovec[1].len  = p_size;
-       }
-       rc = WSASend( cm_ptr->socket, iovec, (p_size ? 2:1), &len, 0, 0, 0 );
-       if (rc || len != (p_size + sizeof(ib_qp_cm_t))) {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " accept_final: ERR %d, wcnt=%d\n",
-                            WSAGetLastError(), len); 
-               goto bail;
-       }
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, 
-                    " accept_final: SRC qpn %x port %d lid 0x%x psize %d\n",
-                    qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size ); 
-       
-       /* complete handshake after final QP state change */
-       len = recv(cm_ptr->socket, (char*)&rtu_data, sizeof(rtu_data), 0);
-       if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            " accept_final: ERR %d, rcnt=%d rdata=%x\n",
-                            WSAGetLastError(), len, ntohs(rtu_data) ); 
-               goto bail;
-       }
-
-       /* final data exchange if remote QP state is good to go */
-       dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); 
-
-       dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp );
-
-       return DAT_SUCCESS;
-
 bail:
-       dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR !QP_RTR_RTS \n"); 
-       if ( cm_ptr->socket >= 0 )
-               closesocket( cm_ptr->socket );
-
-       dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
-       dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */
-
+       dapli_cm_destroy(acm_ptr);
        return DAT_INTERNAL_ERROR;
 }
 
@@ -747,11 +1011,7 @@ dapls_ib_disconnect (
        dapl_dbg_log (DAPL_DBG_TYPE_EP,
                        "dapls_ib_disconnect(ep_handle %p ....)\n", ep_ptr);
 
-       if ( cm_ptr->socket >= 0 ) {
-               closesocket( cm_ptr->socket );
-               cm_ptr->socket = -1;
-       }
-       
+#if 0 // XXX
        /* disconnect QP ala transition to RESET state */
        ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle);
 
@@ -776,15 +1036,18 @@ dapls_ib_disconnect (
                                                NULL,
                                                ep_ptr );
                ep_ptr->cm_handle = NULL;
-               dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
        }       
-
+#endif
        /* modify QP state --> INIT */
        dapls_ib_reinit_ep(ep_ptr);
 
+       if (cm_ptr == NULL)
        return DAT_SUCCESS;
+       else
+               return dapli_socket_disconnect(cm_ptr);
 }
 
+
 /*
  * dapls_ib_disconnect_clean
  *
@@ -874,14 +1137,20 @@ dapls_ib_remove_conn_listener (
        if ( cm_ptr != NULL ) {
                if ( cm_ptr->l_socket >= 0 ) {
                        closesocket( cm_ptr->l_socket );
+                       cm_ptr->l_socket = -1;
+               }
+               if ( cm_ptr->socket >= 0 ) {
+                       closesocket( cm_ptr->socket );
                        cm_ptr->socket = -1;
                }
                /* cr_thread will free */
                sp_ptr->cm_srvc_handle = NULL;
+               _write(g_scm_pipe[1], "w", sizeof "w");
        }
        return DAT_SUCCESS;
 }
 
+
 /*
  * dapls_ib_accept_connection
  *
@@ -928,7 +1197,7 @@ dapls_ib_accept_connection (
                        return status;
        }
     
-       return ( dapli_socket_accept_final(ep_ptr, cr_ptr, p_size, p_data) );
+       return ( dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data) );
 }
 
 
@@ -948,27 +1217,39 @@ dapls_ib_accept_connection (
  *     DAT_INTERNAL_ERROR
  *
  */
+
 DAT_RETURN
 dapls_ib_reject_connection (
-       IN  dp_ib_cm_handle_t   ib_cm_handle,
+       IN  dp_ib_cm_handle_t   cm_ptr,
        IN  int                 reject_reason,
-       IN  DAT_COUNT           private_data_size,
-       IN  const DAT_PVOID     private_data)
+       IN  DAT_COUNT           psize,
+       IN  const DAT_PVOID     pdata)
 {
-       ib_cm_srvc_handle_t     cm_ptr = ib_cm_handle;
+       WSABUF  iovec[1];
+       int     len;
 
        dapl_dbg_log (DAPL_DBG_TYPE_EP,
-                     "dapls_ib_reject_connection(cm_handle %p reason %x)\n",
-                     ib_cm_handle, reject_reason );
-
-       /* just close the socket and return */
-       if ( cm_ptr->socket > 0 ) {
-               closesocket( cm_ptr->socket );
+                     " reject(cm %p reason %x pdata %p psize %d)\n",
+                     cm_ptr, reject_reason, pdata, psize );
+
+       /* write reject data to indicate reject */
+       if (cm_ptr->socket >= 0) {
+               cm_ptr->dst.rej = (uint16_t)reject_reason;
+               cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej);
+               iovec[0].buf = (char*)&cm_ptr->dst;
+               iovec[0].len  = sizeof(ib_qp_cm_t);
+               (void) WSASend (cm_ptr->socket, iovec, 1, &len, 0, 0, NULL);
+               closesocket(cm_ptr->socket);
                cm_ptr->socket = -1;
        }
+
+       /* cr_thread will destroy CR */
+       cm_ptr->state = SCM_REJECTED;
+        _write(g_scm_pipe[1], "w", sizeof "w");
        return DAT_SUCCESS;
 }
 
+
 /*
  * dapls_ib_cm_remote_addr
  *
@@ -1157,7 +1438,7 @@ dapls_ib_get_dat_event (
 
 
 /*
- * dapls_ib_get_dat_event
+ * dapls_ib_get_cm_event
  *
  * Return a DAT connection event given a provider CM event.
  * 
@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event (
 }
 #endif /* NOT_USED */
 
-/* async CR processing thread to avoid blocking applications */
+/* outbound/inbound CR processing thread to avoid blocking applications */
+
+#define SCM_MAX_CONN (8 * sizeof(fd_set))
+
 void cr_thread(void *arg) 
 {
     struct dapl_hca    *hca_ptr = arg;
     ib_cm_srvc_handle_t        cr, next_cr;
     int                        max_fd, rc;
+    char               rbuf[2];
     fd_set             rfd, rfds;
     struct timeval     to;
      
@@ -1202,10 +1487,12 @@ void cr_thread(void *arg)
 
     dapl_os_lock( &hca_ptr->ib_trans.lock );
     hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
     while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
        
        FD_ZERO( &rfds ); 
-       max_fd = -1;
+       FD_SET(g_scm_pipe[0], &rfds);
+       max_fd = g_scm_pipe[0];
        
        if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list))
             next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
@@ -1230,32 +1517,46 @@ void cr_thread(void *arg)
                continue;
            }
                  
-           FD_SET( cr->l_socket, &rfds ); /* add to select set */
-           if ( cr->l_socket > max_fd )
+           if (cr->socket > SCM_MAX_CONN-1) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            "SCM ERR: cr->socket(%d) exceeded FD_SETSIZE %d\n",
+                               cr->socket,SCM_MAX_CONN-1);
+               continue;
+           }
+           FD_SET( cr->socket, &rfds ); /* add to select SET */
+           if ( cr->socket > max_fd )
                max_fd = cr->l_socket;
 
            /* individual select poll to check for work */
            FD_ZERO(&rfd);
-           FD_SET(cr->l_socket, &rfd);
+           FD_SET(cr->socket, &rfd);
            dapl_os_unlock(&hca_ptr->ib_trans.lock);    
 
            to.tv_sec  = 0;
            to.tv_usec = 0; /* wakeup and check destroy */
 
            /* block waiting for Rx data */
-           if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
+           if (select(cr->socket+1,&rfd,NULL,NULL,&to) == SOCKET_ERROR) {
                rc = WSAGetLastError();
                if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ )
                {
                    dapl_dbg_log (DAPL_DBG_TYPE_ERR/*CM*/,
                                " thread: select(sock %d) ERR %d on cr %p\n",
-                               cr->l_socket, rc, cr);
+                               cr->socket, rc, cr);
+               }
+               closesocket(cr->socket);
+               cr->socket = -1;
+           } else if (FD_ISSET(cr->socket,&rfd)) {
+               if (cr->socket > 0) {
+                       if (cr->state == SCM_LISTEN) 
+                               dapli_socket_accept(cr);
+                       else if (cr->state == SCM_ACCEPTED)
+                               dapli_socket_accept_rtu(cr);
+                       else if (cr->state == SCM_CONN_PENDING)
+                               dapli_socket_connect_rtu(cr);
+                       else if (cr->state == SCM_CONNECTED)
+                               dapli_socket_disconnect(cr);
                }
-               closesocket(cr->l_socket);
-               cr->l_socket = -1;
-           } else if (FD_ISSET(cr->l_socket,&rfd) && dapli_socket_accept(cr)) {
-               closesocket(cr->l_socket);
-               cr->l_socket = -1;
            }
            dapl_os_lock( &hca_ptr->ib_trans.lock );
            next_cr =  dapl_llist_next_entry((DAPL_LLIST_HEAD*)
@@ -1263,9 +1564,19 @@ void cr_thread(void *arg)
                                             (DAPL_LLIST_ENTRY*)&cr->entry );
        }
        dapl_os_unlock( &hca_ptr->ib_trans.lock );
+
        to.tv_sec  = 0;
        to.tv_usec = 100000; /* wakeup and check destroy */
+
        (void) select(max_fd+1, &rfds, NULL, NULL, &to);
+
+       /* if pipe data consume - used to wake this thread up */
+       if (FD_ISSET(g_scm_pipe[0],&rfds)) {
+               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread() read pipe data\n");
+printf(" cr_thread() read pipe data\n");
+               _read(g_scm_pipe[0], rbuf, 2);
+printf(" cr_thread() Finished read pipe data\n");
+       }
        dapl_os_lock( &hca_ptr->ib_trans.lock );
     }
     dapl_os_unlock( &hca_ptr->ib_trans.lock ); 
index 8e5f8ac05718d23630e69e5fd602127687f1851b..06bc704ecf74d6b94d13dc96b3390fe4cf108731 100644 (file)
@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id:  $";
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_ibal_util.h"
+#include "dapl_ibal_name_service.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id:  $";
 #include <winsock2.h>
 #include <ws2tcpip.h>
 #include <io.h>
+#include <fcntl.h>
 
+extern void cr_thread(void *arg);
 
 int g_dapl_loopback_connection = 0;
+int g_scm_pipe[2];
 
 #ifdef NOT_USED
 
@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN DAPL_HCA  *hca_ptr )
 
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s(): %p\n",__FUNCTION__,hca_ptr );
 
-       /* set inline max with enviroment or default */
+       /* set RC tunables via enviroment or default */
        hca_ptr->ib_trans.max_inline_send = 
-               dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
+               dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_DEFAULT);
+#if 0
+       hca_ptr->ib_trans.ack_retry = 
+               dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY);
+       hca_ptr->ib_trans.ack_timer =
+               dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER);
+       hca_ptr->ib_trans.rnr_retry = 
+               dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY);
+       hca_ptr->ib_trans.rnr_timer = 
+               dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER);
+       hca_ptr->ib_trans.global =
+               dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL);
+       hca_ptr->ib_trans.hop_limit =
+               dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT);
+       hca_ptr->ib_trans.tclass =
+               dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS);
+#endif
 
        /* initialize cr_list lock */
        dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock);
        if (dat_status != DAT_SUCCESS)
        {
                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
-                               " open_hca: failed to init lock\n");
+                       "%s() failed to init cr_list lock\n", __FUNCTION__);
                return DAT_INTERNAL_ERROR;
        }
 
+#if 0
+       /* initialize cq_lock */
+       dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+       if (dat_status != DAT_SUCCESS) {
+               dapl_log(DAPL_DBG_TYPE_ERR, 
+                        "%s() failed to init cq_lock\n", __FUNCTION__);
+               return DAT_INTERNAL_ERROR;
+       }
+#endif
+
        /* initialize CM list for listens on this HCA */
        dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list);
 
+       /* create pipe communication endpoints */
+       if (_pipe(g_scm_pipe, 256, O_TEXT)) {
+               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+                       "%s() failed to create thread\n", __FUNCTION__);
+               return DAT_INTERNAL_ERROR;
+       }
+
        /* create thread to process inbound connect request */
        hca_ptr->ib_trans.cr_state = IB_THREAD_INIT;
        dat_status = dapl_os_thread_create(cr_thread, 
@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN DAPL_HCA  *hca_ptr )
 
        /* destroy cr_thread and lock */
        hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL;
+
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
                dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
                             " close_hca: waiting for cr_thread\n");