]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
scm: fixes and optimizations for connection scaling
author: Arlin Davis <arlin.r.davis@intel.com>
Fri, 26 Jun 2009 21:45:34 +0000 (14:45 -0700)
committer: Arlin Davis <arlin.r.davis@intel.com>
Fri, 26 Jun 2009 21:45:34 +0000 (14:45 -0700)
Prioritize accepts on listen ports via FD_READ:
process the accepts ahead of other work to avoid
socket half-connection (SYN_RECV) stalls.

Fix dapl_poll to return DAPL_FD_ERROR on
all event error types.

Add a new state, SCM_RELEASED, for a CR whose socket has
been released but which is not yet destroyed. This enables
scm to release the socket resources immediately after
exchanging all QP information. Also, add the new state to
the dapl_cm_state_str call.

Only add the CR reference to the EP if it is
RC type. UD has multiple CRs per EP, so when
disconnect_clean was called on a UD EP after a
timeout, it destroyed the wrong CR.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
dapl/openib_scm/dapl_ib_cm.c
dapl/openib_scm/dapl_ib_util.h

index 27defb6169ef351321aafb76e049d72a9a80ef2c..90d6d278dd69aec8b475f48384b02ee144bc2662 100644 (file)
@@ -213,12 +213,14 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
        fds.events = event;
        fds.revents = 0;
        ret = poll(&fds, 1, 0);
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_poll: ret=%d, events=0x%x\n",
-                    ret, fds.revents);
-       if (ret <= 0)
-               return ret;
-
-       return fds.revents;
+       dapl_log(DAPL_DBG_TYPE_CM, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",
+                s, ret, fds.revents);
+       if (ret == 0)
+               return 0;
+       else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) 
+               return DAPL_FD_ERROR;
+       else 
+               return fds.revents;
 }
 
 static int dapl_select(struct dapl_fd_set *set)
@@ -271,8 +273,10 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
 
        dapl_os_lock(&cm_ptr->lock);
        cm_ptr->state = SCM_DESTROY;
-       if ((cm_ptr->ep) && (cm_ptr->ep->cm_handle == cm_ptr))
+       if ((cm_ptr->ep) && (cm_ptr->ep->cm_handle == cm_ptr)) {
                cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
+               cm_ptr->ep = NULL;
+       }
 
        /* close socket if still active */
        if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
@@ -369,11 +373,14 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
 
        if (err) {
                dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_PENDING: socket ERR %s -> %s\n",
-                        strerror(err), inet_ntoa(((struct sockaddr_in *)
-                                                  ep_ptr->param.
-                                                  remote_ia_address_ptr)->
-                                                 sin_addr));
+                        " CONN_PENDING: %s ERR %s -> %s %d\n",
+                        err == -1 ? "POLL" : "SOCKOPT",
+                        err == -1 ? strerror(errno) : strerror(err), 
+                        inet_ntoa(((struct sockaddr_in *)
+                                  ep_ptr->param.
+                                  remote_ia_address_ptr)->sin_addr), 
+                        ntohs(((struct sockaddr_in *)
+                               &cm_ptr->dst.ia_address)->sin_port));
                goto bail;
        }
        dapl_dbg_log(DAPL_DBG_TYPE_EP,
@@ -486,6 +493,9 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
        cm_ptr->hca = ia_ptr->hca_ptr;
        cm_ptr->ep = ep_ptr;
        cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address;
+       ((struct sockaddr_in *)
+               &cm_ptr->dst.ia_address)->sin_port = ntohs(r_qual);
+
        if (p_size) {
                cm_ptr->dst.p_size = htonl(p_size);
                dapl_os_memcpy(cm_ptr->p_data, p_data, p_size);
@@ -642,7 +652,6 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
                goto bail;
        }
        /* init cm_handle and post the event with private data */
-       ep_ptr->cm_handle = cm_ptr;
        cm_ptr->state = SCM_CONNECTED;
        event = IB_CME_CONNECTED;
        dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
@@ -677,11 +686,15 @@ ud_bail:
                /* done with socket, don't destroy cm_ptr, need pdata */
                closesocket(cm_ptr->socket);
                cm_ptr->socket = DAPL_INVALID_SOCKET;
+               cm_ptr->state = SCM_RELEASED;
        } else
 #endif
+       {
+               ep_ptr->cm_handle = cm_ptr; /* only RC, multi CR's on UD */
                dapl_evd_connection_callback(cm_ptr,
                                             IB_CME_CONNECTED,
                                             cm_ptr->p_data, ep_ptr);
+       }
        return;
 
 bail:
@@ -769,34 +782,36 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        int len;
 
        dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket_accept\n");
+       
+       /* 
+        * Accept all CR's on this port to avoid half-connection (SYN_RCV)
+        * stalls with many to one connection storms
+        */
+       do {
+               /* Allocate accept CM and initialize */
+               if ((acm_ptr = dapli_cm_create()) == NULL)
+                       return;
+
+               acm_ptr->sp = cm_ptr->sp;
+               acm_ptr->hca = cm_ptr->hca;
+
+               len = sizeof(acm_ptr->dst.ia_address);
+               acm_ptr->socket = accept(cm_ptr->socket,
+                                       (struct sockaddr *)
+                                       &acm_ptr->dst.ia_address,
+                                       (socklen_t *) & len);
+               if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                               " accept: ERR %s on FD %d l_cr %p\n",
+                               strerror(errno), cm_ptr->socket, cm_ptr);
+                       dapli_cm_destroy(acm_ptr);
+                       return;
+               }
 
-       /* Allocate accept CM and initialize */
-       if ((acm_ptr = dapli_cm_create()) == NULL)
-               return;
-
-       acm_ptr->sp = cm_ptr->sp;
-       acm_ptr->hca = cm_ptr->hca;
-
-       len = sizeof(acm_ptr->dst.ia_address);
-       acm_ptr->socket = accept(cm_ptr->socket,
-                                (struct sockaddr *)&acm_ptr->dst.ia_address,
-                                (socklen_t *) & len);
-       if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " accept: ERR %s on FD %d l_cr %p\n",
-                        strerror(errno), cm_ptr->socket, cm_ptr);
-               goto bail;
-       }
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " socket accepted, queue new cm %p\n", acm_ptr);
-
-       acm_ptr->state = SCM_ACCEPTING;
-       dapli_cm_queue(acm_ptr);
-       return;
-      bail:
-       /* close socket, free cm structure, active will see socket close as reject */
-       dapli_cm_destroy(acm_ptr);
+               acm_ptr->state = SCM_ACCEPTING;
+               dapli_cm_queue(acm_ptr);
+       
+       } while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
 }
 
 /*
@@ -964,6 +979,9 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
        local.lid = ia_ptr->hca_ptr->ib_trans.lid;
        local.gid = ia_ptr->hca_ptr->ib_trans.gid;
        local.ia_address = ia_ptr->hca_ptr->hca_address;
+       ((struct sockaddr_in *)&local.ia_address)->sin_port = 
+               ntohs(cm_ptr->sp->conn_qual);
+
        local.p_size = htonl(p_size);
        iov[0].iov_base = (void *)&local;
        iov[0].iov_len = sizeof(ib_qp_cm_t);
@@ -1059,6 +1077,7 @@ void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
                 /* done with socket, don't destroy cm_ptr, need pdata */
                 closesocket(cm_ptr->socket);
                 cm_ptr->socket = DAPL_INVALID_SOCKET;
+               cm_ptr->state = SCM_RELEASED;
        } else
 #endif
                dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp);
@@ -1146,7 +1165,8 @@ dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
  *
  * Clean up outstanding connection data. This routine is invoked
  * after the final disconnect callback has occurred. Only on the
- * ACTIVE side of a connection.
+ * ACTIVE side of a connection. It is also called if dat_ep_connect
+ * times out using the consumer supplied timeout value.
  *
  * Input:
  *     ep_ptr          DAPL_EP
@@ -1164,6 +1184,14 @@ dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
                          IN DAT_BOOLEAN active,
                          IN const ib_cm_events_t ib_cm_event)
 {
+       /* NOTE: SCM will only initialize cm_handle with RC type
+        * 
+        * For UD there can many in-flight CR's so you 
+        * cannot cleanup timed out CR's with EP reference 
+        * alone since they share the same EP. The common
+        * code that handles connection timeout logic needs 
+        * updated for UD support.
+        */
        if (ep_ptr->cm_handle)
                dapli_cm_destroy(ep_ptr->cm_handle);
 
@@ -1633,7 +1661,7 @@ void cr_thread(void *arg)
                                     " poll ret=0x%x cr->state=%d socket=%d\n",
                                     ret, cr->state, cr->socket);
 
-                       /* data on listen, qp exchange, and on disconnect request */
+                       /* data on listen, qp exchange, and on disc req */
                        if (ret == DAPL_FD_READ) {
                                if (cr->socket != DAPL_INVALID_SOCKET) {
                                        switch (cr->state) {
@@ -1656,40 +1684,29 @@ void cr_thread(void *arg)
                                                break;
                                        }
                                }
-                               /* connect socket is writable, check status */
-                       } else if (ret == DAPL_FD_WRITE || ret == DAPL_FD_ERROR) {
-                               if (cr->state == SCM_CONN_PENDING) {
-                                       opt = 0;
-                                       opt_len = sizeof(opt);
-                                       ret = getsockopt(cr->socket, SOL_SOCKET,
-                                                        SO_ERROR, (char *)&opt,
-                                                        &opt_len);
-                                       if (!ret)
-                                               dapli_socket_connected(cr, opt);
-                                       else
-                                               dapli_socket_connected(cr,
-                                                                      errno);
-                               } else {
-                                       dapl_log(DAPL_DBG_TYPE_CM,
-                                                " CM poll ERR, wrong state(%d) -> %s SKIP\n",
-                                                cr->state,
-                                                inet_ntoa(((struct sockaddr_in
-                                                            *)&cr->dst.
-                                                           ia_address)->
-                                                          sin_addr));
-                               }
-                       } else if (ret != 0) {
-                               dapl_log(DAPL_DBG_TYPE_CM,
-                                        " CM poll warning %s, ret=%d st=%d -> %s\n",
-                                        strerror(errno), ret, cr->state,
-                                        inet_ntoa(((struct sockaddr_in *)
-                                                   &cr->dst.ia_address)->
-                                                  sin_addr));
-
-                               /* POLLUP, NVAL, or poll error. - DISC */
+                       /* connect socket is writable, check status */
+                       } else if (ret == DAPL_FD_WRITE ||
+                                  (cr->state == SCM_CONN_PENDING && 
+                                   ret == DAPL_FD_ERROR)) {
+                               opt = 0;
+                               opt_len = sizeof(opt);
+                               ret = getsockopt(cr->socket, SOL_SOCKET,
+                                                SO_ERROR, (char *)&opt,
+                                                &opt_len);
+                               if (!ret)
+                                       dapli_socket_connected(cr, opt);
+                               else
+                                       dapli_socket_connected(cr, errno);
+                        
+                       /* POLLUP, ERR, NVAL, or poll error - DISC */
+                       } else if (ret < 0 || ret == DAPL_FD_ERROR) {
+                               dapl_log(DAPL_DBG_TYPE_WARN,
+                                    " poll=%d cr->st=%s sk=%d ep %p, %d\n",
+                                    ret, dapl_cm_state_str(cr->state), 
+                                    cr->socket, cr->ep,
+                                    cr->ep ? cr->ep->param.ep_state:0);
                                dapli_socket_disconnect(cr);
                        }
-
                        dapl_os_lock(&hca_ptr->ib_trans.lock);
                }
 
@@ -1748,12 +1765,19 @@ void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
                                 &ia_ptr->hca_ptr->ib_trans.list,
                                (DAPL_LLIST_ENTRY*)&cr->entry);
 
-               printf( "  CONN[%d]: sp %p ep %p sock %d %s %s -> %s\n",
+               printf( "  CONN[%d]: sp %p ep %p sock %d %s %s %s %s %d\n",
                        i, cr->sp, cr->ep, cr->socket,
                        cr->dst.qp_type == IBV_QPT_RC ? "RC" : "UD",
                        dapl_cm_state_str(cr->state),
+                       cr->sp ? "<-" : "->",
+                       cr->state == SCM_LISTEN ? 
+                       inet_ntoa(((struct sockaddr_in *)
+                               &ia_ptr->hca_ptr->hca_address)->sin_addr) :
                        inet_ntoa(((struct sockaddr_in *)
-                                 &cr->dst.ia_address)->sin_addr));
+                               &cr->dst.ia_address)->sin_addr),
+                       cr->sp ? (int)cr->sp->conn_qual : 
+                       ntohs(((struct sockaddr_in *)
+                               &cr->dst.ia_address)->sin_port));
                i++;
        }
        printf("\n");
index 294ef3d994603f7eccac09a7403a60285633d8c5..a668af70a2d7750765c7d4624f5e1c00d480cc0a 100644 (file)
@@ -99,6 +99,7 @@ typedef enum scm_state
        SCM_ACCEPTED,
        SCM_REJECTED,
        SCM_CONNECTED,
+       SCM_RELEASED,
        SCM_DISCONNECTED,
        SCM_DESTROY
 } SCM_STATE;
@@ -382,10 +383,11 @@ STATIC _INLINE_ char * dapl_cm_state_str(IN int st)
                "SCM_ACCEPTED",
                "SCM_REJECTED",
                "SCM_CONNECTED",
+               "SCM_RELEASED",
                "SCM_DISCONNECTED",
                "SCM_DESTROY"
         };
-        return ((st < 0 || st > 10) ? "Invalid CM state?" : cm_state[st]);
+        return ((st < 0 || st > 11) ? "Invalid CM state?" : cm_state[st]);
 }
 
 /*