From 86520b86ffb45d3caf6e5bd94271f99deef0a5f9 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Fri, 16 Aug 2013 15:15:12 -0700 Subject: [PATCH] rsockets: Handle race between rshutdown and rpoll Multi-threaded applications which call rpoll and rshutdown simultaneously can hang. Ceph developers reported an issue with the rsocket implementation. Ceph calls rpoll in one thread, and while that thread is blocked in rpoll, a second thread may cann rshutdown on the socket. In normal sockets, this results in the poll call unblocking (since a call to read on the socket will no longer block). however, rsockets does not free the thread blocked on the rpoll call. To fix this, we add some additional state checking to protect against threads calling rpoll and rshutdown simultaneously. We also have the rshutdown call transition the QP into an error state. This causes all posted receives to complete as flushed, which results in unblocking the thread in rpoll (to process the flushed receives). Signed-off-by: Sean Hefty --- src/cma.c | 23 +++++++++++++---------- src/cma.h | 1 + src/rsocket.c | 35 +++++++++++++++++++++++++---------- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/cma.c b/src/cma.c index 374844c1..4f41879a 100644 --- a/src/cma.c +++ b/src/cma.c @@ -1543,22 +1543,25 @@ int rdma_notify(struct rdma_cm_id *id, enum ibv_event_type event) return 0; } -int rdma_disconnect(struct rdma_cm_id *id) +int ucma_shutdown(struct rdma_cm_id *id) { - struct ucma_abi_disconnect cmd; - struct cma_id_private *id_priv; - int ret; - switch (id->verbs->device->transport_type) { case IBV_TRANSPORT_IB: - ret = ucma_modify_qp_err(id); - break; + return ucma_modify_qp_err(id); case IBV_TRANSPORT_IWARP: - ret = ucma_modify_qp_sqd(id); - break; + return ucma_modify_qp_sqd(id); default: - ret = ERR(EINVAL); + return ERR(EINVAL); } +} + +int rdma_disconnect(struct rdma_cm_id *id) +{ + struct ucma_abi_disconnect cmd; + struct cma_id_private *id_priv; + int ret; + + ret = ucma_shutdown(id); if (ret) return ret; diff --git a/src/cma.h b/src/cma.h index e944a9a4..4c991b42 100644 --- a/src/cma.h +++ b/src/cma.h @@ -150,6 +150,7 @@ void ucma_set_sid(enum rdma_port_space ps, struct sockaddr *addr, struct sockaddr_ib *sib); int ucma_max_qpsize(struct rdma_cm_id *id); int ucma_complete(struct rdma_cm_id *id); +int ucma_shutdown(struct rdma_cm_id *id); static inline int ERR(int err) { diff --git a/src/rsocket.c b/src/rsocket.c index d544dd09..c7a491ba 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -1822,7 +1822,12 @@ static int rs_poll_cq(struct rsocket *rs) rs->state = rs_disconnected; return 0; } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { - rs->state &= ~rs_readable; + if (rs->state & rs_writable) { + rs->state &= ~rs_readable; + } else { + rs->state = rs_disconnected; + return 0; + } } break; case RS_OP_WRITE: @@ -2948,10 +2953,12 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) rs = idm_lookup(&idm, fds[i].fd); if (rs) { + fastlock_acquire(&rs->cq_wait_lock); if (rs->type == SOCK_STREAM) rs_get_cq_event(rs); else ds_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); } else { fds[i].revents = rfds[i].revents; @@ -3098,7 +3105,8 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds, /* * For graceful disconnect, notify the remote side that we're - * disconnecting and wait until all outstanding sends complete. + * disconnecting and wait until all outstanding sends complete, provided + * that the remote side has not sent a disconnect message. */ int rshutdown(int socket, int how) { @@ -3106,11 +3114,6 @@ int rshutdown(int socket, int how) int ctrl, ret = 0; rs = idm_at(&idm, socket); - if (how == SHUT_RD) { - rs->state &= ~rs_readable; - return 0; - } - if (rs->fd_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); @@ -3118,15 +3121,20 @@ int rshutdown(int socket, int how) if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; rs->state &= ~(rs_readable | rs_writable); - } else { + } else if (how == SHUT_WR) { rs->state &= ~rs_writable; ctrl = (rs->state & rs_readable) ? RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; + } else { + rs->state &= ~rs_readable; + if (rs->state & rs_writable) + goto out; + ctrl = RS_CTRL_DISCONNECT; } if (!rs->ctrl_avail) { ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl); if (ret) - return ret; + goto out; } if ((rs->state & rs_connected) && rs->ctrl_avail) { @@ -3138,10 +3146,17 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) rs_process_cq(rs, 0, rs_conn_all_sends_done); +out: if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) rs_set_nonblocking(rs, rs->fd_flags); - return 0; + if (rs->state & rs_disconnected) { + /* Generate event by flushing receives to unblock rpoll */ + ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + ucma_shutdown(rs->cm_id); + } + + return ret; } static void ds_shutdown(struct rsocket *rs) -- 2.41.0