+++ /dev/null
-Bottom: 2d7d36989522b9e010497256793038fb2cb5aa9e
-Top: b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-07-26 15:35:32 -0700
-
-rsockets: Use wr_id to determine completion type
-
-If a work request has completed in error, the completion type
-field is undefined. Use the wr_id to determine if the failed
-completion was a send or receive.
-
-This fixes an issue where MPI can hang during finalize. With
-both sides of a connection shutting down simultaneously, one
-side may complete quicker and delete its QP before the other
-side receives an acknowledgement to their disconnect message.
-Eventually, the disconnect message will time out, but because
-the completion type field is undefined, it may be processed
-as a failed receive, rather than a failed send. The end
-result is that the second side hangs waiting for the send to
-complete.
-
-This problem showed up more easily after commit
-2e5b0fc95964f74ea59dd725e849027faa0cd526, but existed beforehand.
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index ed125b3..1e391b3 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -132,6 +132,8 @@ union rs_wr_id {
- };
- };
-
-+#define RS_RECV_WR_ID (~((uint64_t) 0))
-+
- /*
- * rsocket states are ordered as passive, connecting, connected, disconnected.
- */
-@@ -418,6 +420,19 @@ err1:
- return -1;
- }
-
-+static inline int
-+rs_post_recv(struct rsocket *rs)
-+{
-+ struct ibv_recv_wr wr, *bad;
-+
-+ wr.wr_id = RS_RECV_WR_ID;
-+ wr.next = NULL;
-+ wr.sg_list = NULL;
-+ wr.num_sge = 0;
-+
-+ return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
-+}
-+
- static int rs_create_ep(struct rsocket *rs)
- {
- struct ibv_qp_init_attr qp_attr;
-@@ -449,7 +464,7 @@ static int rs_create_ep(struct rsocket *rs)
- return ret;
-
- for (i = 0; i < rs->rq_size; i++) {
-- ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0);
-+ ret = rs_post_recv(rs);
- if (ret)
- return ret;
- }
-@@ -881,7 +896,7 @@ static int rs_poll_cq(struct rsocket *rs)
- int ret, rcnt = 0;
-
- while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
-- if (wc.opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
-+ if (wc.wr_id == RS_RECV_WR_ID) {
- if (wc.status != IBV_WC_SUCCESS)
- continue;
- rcnt++;
-@@ -923,7 +938,7 @@ static int rs_poll_cq(struct rsocket *rs)
-
- if (rs->state & rs_connected) {
- while (!ret && rcnt--)
-- ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0);
-+ ret = rs_post_recv(rs);
-
- if (ret) {
- rs->state = rs_error;
+++ /dev/null
-Bottom: b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
-Top: d24dd0fae6823a86b3a40de76ff29ff00831ae09
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-07-27 10:46:42 -0700
-
-rsocket: Improve disconnect time under normal conditions
-
-When both sides of a connection attempt to close at the same
-time, one of the two sides can easily get an error when sending
-a disconnect message. This results in that side hanging
-during close until the send times out. (The time out is caused
-by the remote side destroying its QP.)
-
-We can reduce the chance of this occurring by immediately
-assuming that the disconnect has been successful once we've
-received the remote side's disconnect message, or we've
-polled a send completion for the local disconnect message.
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 1e391b3..b9105a1 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
-
- enum {
- RS_OP_DATA,
-- RS_OP_DATA_MORE,
-- RS_OP_DRA,
-- RS_OP_DRA_MORE,
-+ RS_OP_RSVD_DATA_MORE,
-+ RS_OP_RSVD_DRA,
-+ RS_OP_RSVD_DRA_MORE,
- RS_OP_SGL,
-- RS_OP_RSVD1,
-- RS_OP_DRA_SGL,
-+ RS_OP_RSVD,
-+ RS_OP_RSVD_DRA_SGL,
- RS_OP_CTRL
- };
- #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
-@@ -124,14 +124,6 @@ struct rs_conn_data {
- struct rs_sge data_buf;
- };
-
--union rs_wr_id {
-- uint64_t wr_id;
-- struct {
-- uint32_t reserved; /* sqe_count; */
-- uint32_t length;
-- };
--};
--
- #define RS_RECV_WR_ID (~((uint64_t) 0))
-
- /*
-@@ -264,17 +256,6 @@ out:
- pthread_mutex_unlock(&mut);
- }
-
--/*
-- * We currently generate a completion per send. sqe_count = 1
-- */
--static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
--{
-- union rs_wr_id wrid;
-- /* wrid.reserved = sqe_count; */
-- wrid.length = length;
-- return wrid;
--}
--
- static int rs_insert(struct rsocket *rs)
- {
- pthread_mutex_lock(&mut);
-@@ -772,21 +753,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- return rs_do_connect(rs);
- }
-
--static void rs_shutdown_state(struct rsocket *rs, int state)
--{
-- rs->state &= ~state;
-- if (rs->state == rs_connected)
-- rs->state = rs_disconnected;
--}
--
--static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
-+static int rs_post_write(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags,
- uint64_t addr, uint32_t rkey)
- {
- struct ibv_send_wr wr, *bad;
-
-- wr.wr_id = wr_id;
-+ wr.wr_id = (uint64_t) imm_data;
- wr.next = NULL;
- wr.sg_list = sgl;
- wr.num_sge = nsge;
-@@ -803,29 +777,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
- * Update target SGE before sending data. Otherwise the remote side may
- * update the entry before we do.
- */
--static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
-+static int rs_write_data(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
-- uint32_t imm_data, int flags)
-+ uint32_t length, int flags)
- {
- uint64_t addr;
- uint32_t rkey;
-
- rs->sseq_no++;
- rs->sqe_avail--;
-- rs->sbuf_bytes_avail -= wr_id.length;
-+ rs->sbuf_bytes_avail -= length;
-
- addr = rs->target_sgl[rs->target_sge].addr;
- rkey = rs->target_sgl[rs->target_sge].key;
-
-- rs->target_sgl[rs->target_sge].addr += wr_id.length;
-- rs->target_sgl[rs->target_sge].length -= wr_id.length;
-+ rs->target_sgl[rs->target_sge].addr += length;
-+ rs->target_sgl[rs->target_sge].length -= length;
-
- if (!rs->target_sgl[rs->target_sge].length) {
- if (++rs->target_sge == RS_SGL_SIZE)
- rs->target_sge = 0;
- }
-
-- return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
-+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-+ flags, addr, rkey);
- }
-
- static uint32_t rs_sbuf_left(struct rsocket *rs)
-@@ -856,7 +831,7 @@ static void rs_send_credits(struct rsocket *rs)
- ibsge.lkey = 0;
- ibsge.length = sizeof(sge);
-
-- rs_post_write(rs, 0, &ibsge, 1,
-+ rs_post_write(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
-@@ -870,7 +845,7 @@ static void rs_send_credits(struct rsocket *rs)
- if (++rs->remote_sge == rs->remote_sgl.length)
- rs->remote_sge = 0;
- } else {
-- rs_post_write(rs, 0, NULL, 0,
-+ rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
- }
- }
-@@ -891,7 +866,6 @@ static void rs_update_credits(struct rsocket *rs)
- static int rs_poll_cq(struct rsocket *rs)
- {
- struct ibv_wc wc;
-- union rs_wr_id *wr_id;
- uint32_t imm_data;
- int ret, rcnt = 0;
-
-@@ -911,7 +885,7 @@ static int rs_poll_cq(struct rsocket *rs)
- rs->state = rs_disconnected;
- return 0;
- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-- rs_shutdown_state(rs, rs_connect_rd);
-+ rs->state &= ~rs_connect_rd;
- }
- break;
- default:
-@@ -922,12 +896,19 @@ static int rs_poll_cq(struct rsocket *rs)
- break;
- }
- } else {
-- if (wc.wr_id) {
-- wr_id = (union rs_wr_id *) &wc.wr_id;
-- rs->sqe_avail++; /* += wr_id->sqe_count; */
-- rs->sbuf_bytes_avail += wr_id->length;
-- } else {
-+ switch (rs_msg_op((uint32_t) wc.wr_id)) {
-+ case RS_OP_SGL:
- rs->ctrl_avail++;
-+ break;
-+ case RS_OP_CTRL:
-+ rs->ctrl_avail++;
-+ if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
-+ rs->state = rs_disconnected;
-+ break;
-+ default:
-+ rs->sqe_avail++;
-+ rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
-+ break;
- }
- if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
- rs->state = rs_error;
-@@ -1075,9 +1056,9 @@ static int rs_conn_can_send(struct rsocket *rs)
- return rs_can_send(rs) || !(rs->state & rs_connect_wr);
- }
-
--static int rs_can_send_ctrl(struct rsocket *rs)
-+static int rs_conn_can_send_ctrl(struct rsocket *rs)
- {
-- return rs->ctrl_avail;
-+ return rs->ctrl_avail || !(rs->state & rs_connected);
- }
-
- static int rs_have_rdata(struct rsocket *rs)
-@@ -1090,9 +1071,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
- return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
- }
-
--static int rs_all_sends_done(struct rsocket *rs)
-+static int rs_conn_all_sends_done(struct rsocket *rs)
- {
-- return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
-+ return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
-+ !(rs->state & rs_connected);
- }
-
- static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
-@@ -1281,15 +1263,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- sge.addr = (uintptr_t) buf;
- sge.length = xfer_size;
- sge.lkey = 0;
-- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-- &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
-- IBV_SEND_INLINE);
-+ ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
- } else if (xfer_size <= rs_sbuf_left(rs)) {
- memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
- rs->ssgl[0].length = xfer_size;
-- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-- rs->ssgl, 1,
-- rs_msg_set(RS_OP_DATA, xfer_size), 0);
-+ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
- if (xfer_size < rs_sbuf_left(rs))
- rs->ssgl[0].addr += xfer_size;
- else
-@@ -1300,9 +1278,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- rs->ssgl[0].length);
- rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
- memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-- rs->ssgl, 2,
-- rs_msg_set(RS_OP_DATA, xfer_size), 0);
-+ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
- }
- if (ret)
-@@ -1395,9 +1371,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
- &cur_iov, &offset, xfer_size);
- rs->ssgl[0].length = xfer_size;
-- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-- rs->ssgl, 1,
-- rs_msg_set(RS_OP_DATA, xfer_size),
-+ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
- xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
- if (xfer_size < rs_sbuf_left(rs))
- rs->ssgl[0].addr += xfer_size;
-@@ -1409,9 +1383,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- &offset, rs->ssgl[0].length);
- rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
- rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
-- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-- rs->ssgl, 2,
-- rs_msg_set(RS_OP_DATA, xfer_size),
-+ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
- xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
- }
-@@ -1724,7 +1696,7 @@ int rshutdown(int socket, int how)
-
- rs = idm_at(&idm, socket);
- if (how == SHUT_RD) {
-- rs_shutdown_state(rs, rs_connect_rd);
-+ rs->state &= ~rs_connect_rd;
- return 0;
- }
-
-@@ -1734,26 +1706,27 @@ int rshutdown(int socket, int how)
- if (rs->state & rs_connected) {
- if (how == SHUT_RDWR) {
- ctrl = RS_CTRL_DISCONNECT;
-- rs->state = rs_disconnected;
-+ rs->state &= ~(rs_connect_rd | rs_connect_wr);
- } else {
-- rs_shutdown_state(rs, rs_connect_wr);
-- ctrl = (rs->state & rs_connected) ?
-+ rs->state &= ~rs_connect_wr;
-+ ctrl = (rs->state & rs_connect_rd) ?
- RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
- }
-- if (!rs_can_send_ctrl(rs)) {
-- ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
-+ if (!rs->ctrl_avail) {
-+ ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
- if (ret)
- return ret;
- }
-
-- rs->ctrl_avail--;
-- ret = rs_post_write(rs, 0, NULL, 0,
-- rs_msg_set(RS_OP_CTRL, ctrl),
-- 0, 0, 0);
-+ if ((rs->state & rs_connected) && rs->ctrl_avail) {
-+ rs->ctrl_avail--;
-+ ret = rs_post_write(rs, NULL, 0,
-+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-+ }
- }
-
-- if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
-- rs_process_cq(rs, 0, rs_all_sends_done);
-+ if (rs->state & rs_connected)
-+ rs_process_cq(rs, 0, rs_conn_all_sends_done);
-
- if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
- rs_set_nonblocking(rs, 1);