]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Handle race between rshutdown and rpoll
authorSean Hefty <sean.hefty@intel.com>
Fri, 16 Aug 2013 22:15:12 +0000 (15:15 -0700)
committerSean Hefty <sean.hefty@intel.com>
Wed, 30 Oct 2013 15:46:34 +0000 (08:46 -0700)
Multi-threaded applications which call rpoll and rshutdown
simultaneously can hang.  Ceph developers reported an issue
with the rsocket implementation.  Ceph calls rpoll in
one thread, and while that thread is blocked in rpoll,
a second thread may cann rshutdown on the socket.  In
normal sockets, this results in the poll call unblocking
(since a call to read on the socket will no longer block).
however, rsockets does not free the thread blocked on the
rpoll call.

To fix this, we add some additional state checking to
protect against threads calling rpoll and rshutdown
simultaneously.  We also have the rshutdown call
transition the QP into an error state.  This causes all
posted receives to complete as flushed, which results
in unblocking the thread in rpoll (to process the flushed
receives).

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/cma.c
src/cma.h
src/rsocket.c

index 374844c16449b7e7973419ee22f0e10c321c00ac..4f41879a4df507e1562921447f7695eca18fd39d 100644 (file)
--- a/src/cma.c
+++ b/src/cma.c
@@ -1543,22 +1543,25 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event)
        return 0;
 }
 
-int rdma_disconnect(struct rdma_cm_id *id)
+int ucma_shutdown(struct rdma_cm_id *id)
 {
-       struct ucma_abi_disconnect cmd;
-       struct cma_id_private *id_priv;
-       int ret;
-
        switch (id->verbs->device->transport_type) {
        case IBV_TRANSPORT_IB:
-               ret = ucma_modify_qp_err(id);
-               break;
+               return ucma_modify_qp_err(id);
        case IBV_TRANSPORT_IWARP:
-               ret = ucma_modify_qp_sqd(id);
-               break;
+               return ucma_modify_qp_sqd(id);
        default:
-               ret = ERR(EINVAL);
+               return ERR(EINVAL);
        }
+}
+
+int rdma_disconnect(struct rdma_cm_id *id)
+{
+       struct ucma_abi_disconnect cmd;
+       struct cma_id_private *id_priv;
+       int ret;
+
+       ret = ucma_shutdown(id);
        if (ret)
                return ret;
 
index e944a9a4c5e2ae1cb323da05bfafc1118e9b68e9..4c991b426f48f8e33466cb9609114ff5066f9316 100644 (file)
--- a/src/cma.h
+++ b/src/cma.h
@@ -150,6 +150,7 @@ void ucma_set_sid(enum rdma_port_space ps, struct sockaddr *addr,
                  struct sockaddr_ib *sib);
 int ucma_max_qpsize(struct rdma_cm_id *id);
 int ucma_complete(struct rdma_cm_id *id);
+int ucma_shutdown(struct rdma_cm_id *id);
 
 static inline int ERR(int err)
 {
index d544dd097cda228de114173c8fe569dc1881f057..c7a491ba0a4fe06c52c00eebc8184453d95849c4 100644 (file)
@@ -1822,7 +1822,12 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
-                                       rs->state &= ~rs_readable;
+                                       if (rs->state & rs_writable) {
+                                               rs->state &= ~rs_readable;
+                                       } else {
+                                               rs->state = rs_disconnected;
+                                               return 0;
+                                       }
                                }
                                break;
                        case RS_OP_WRITE:
@@ -2948,10 +2953,12 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
 
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
+                       fastlock_acquire(&rs->cq_wait_lock);
                        if (rs->type == SOCK_STREAM)
                                rs_get_cq_event(rs);
                        else
                                ds_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
@@ -3098,7 +3105,8 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds,
 
 /*
  * For graceful disconnect, notify the remote side that we're
- * disconnecting and wait until all outstanding sends complete.
+ * disconnecting and wait until all outstanding sends complete, provided
+ * that the remote side has not sent a disconnect message.
  */
 int rshutdown(int socket, int how)
 {
@@ -3106,11 +3114,6 @@ int rshutdown(int socket, int how)
        int ctrl, ret = 0;
 
        rs = idm_at(&idm, socket);
-       if (how == SHUT_RD) {
-               rs->state &= ~rs_readable;
-               return 0;
-       }
-
        if (rs->fd_flags & O_NONBLOCK)
                rs_set_nonblocking(rs, 0);
 
@@ -3118,15 +3121,20 @@ int rshutdown(int socket, int how)
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
                        rs->state &= ~(rs_readable | rs_writable);
-               } else {
+               } else if (how == SHUT_WR) {
                        rs->state &= ~rs_writable;
                        ctrl = (rs->state & rs_readable) ?
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+               } else {
+                       rs->state &= ~rs_readable;
+                       if (rs->state & rs_writable)
+                               goto out;
+                       ctrl = RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
                        ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
                        if (ret)
-                               return ret;
+                               goto out;
                }
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
@@ -3138,10 +3146,17 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected)
                rs_process_cq(rs, 0, rs_conn_all_sends_done);
 
+out:
        if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
                rs_set_nonblocking(rs, rs->fd_flags);
 
-       return 0;
+       if (rs->state & rs_disconnected) {
+               /* Generate event by flushing receives to unblock rpoll */
+               ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+               ucma_shutdown(rs->cm_id);
+       }
+
+       return ret;
 }
 
 static void ds_shutdown(struct rsocket *rs)