git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Improve disconnect time under normal conditions
author Sean Hefty <sean.hefty@intel.com>
Fri, 27 Jul 2012 17:46:42 +0000 (10:46 -0700)
committer Sean Hefty <sean.hefty@intel.com>
Mon, 30 Jul 2012 18:33:02 +0000 (11:33 -0700)
When both sides of a connection attempt to close at the same
time, one of the two sides can easily get an error when sending
a disconnect message.  This results in that side hanging
during close until the send times out.  (The timeout is caused
by the remote side destroying its QP.)

We can reduce the chance of this occurring by immediately
assuming that the disconnect has been successful once we've
received the remote side's disconnect message.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index 1e391b30aad1255e659e6b7f4a0e0b250b412f81..b9105a17b7d4d4c3e0614fd947461eeca5e5a12a 100644 (file)
@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
 
 enum {
        RS_OP_DATA,
-       RS_OP_DATA_MORE,
-       RS_OP_DRA,
-       RS_OP_DRA_MORE,
+       RS_OP_RSVD_DATA_MORE,
+       RS_OP_RSVD_DRA,
+       RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
-       RS_OP_RSVD1,
-       RS_OP_DRA_SGL,
+       RS_OP_RSVD,
+       RS_OP_RSVD_DRA_SGL,
        RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -124,14 +124,6 @@ struct rs_conn_data {
        struct rs_sge     data_buf;
 };
 
-union rs_wr_id {
-       uint64_t          wr_id;
-       struct {
-               uint32_t  reserved; /* sqe_count; */
-               uint32_t  length;
-       };
-};
-
 #define RS_RECV_WR_ID (~((uint64_t) 0))
 
 /*
@@ -264,17 +256,6 @@ out:
        pthread_mutex_unlock(&mut);
 }
 
-/*
- * We currently generate a completion per send.  sqe_count = 1
- */
-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
-{
-       union rs_wr_id wrid;
-       /* wrid.reserved = sqe_count; */
-       wrid.length = length;
-       return wrid;
-}
-
 static int rs_insert(struct rsocket *rs)
 {
        pthread_mutex_lock(&mut);
@@ -772,21 +753,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return rs_do_connect(rs);
 }
 
-static void rs_shutdown_state(struct rsocket *rs, int state)
-{
-       rs->state &= ~state;
-       if (rs->state == rs_connected)
-               rs->state = rs_disconnected;
-}
-
-static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
+static int rs_post_write(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
                         uint64_t addr, uint32_t rkey)
 {
        struct ibv_send_wr wr, *bad;
 
-       wr.wr_id = wr_id;
+       wr.wr_id = (uint64_t) imm_data;
        wr.next = NULL;
        wr.sg_list = sgl;
        wr.num_sge = nsge;
@@ -803,29 +777,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
  */
-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
+static int rs_write_data(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags)
+                        uint32_t length, int flags)
 {
        uint64_t addr;
        uint32_t rkey;
 
        rs->sseq_no++;
        rs->sqe_avail--;
-       rs->sbuf_bytes_avail -= wr_id.length;
+       rs->sbuf_bytes_avail -= length;
 
        addr = rs->target_sgl[rs->target_sge].addr;
        rkey = rs->target_sgl[rs->target_sge].key;
 
-       rs->target_sgl[rs->target_sge].addr += wr_id.length;
-       rs->target_sgl[rs->target_sge].length -= wr_id.length;
+       rs->target_sgl[rs->target_sge].addr += length;
+       rs->target_sgl[rs->target_sge].length -= length;
 
        if (!rs->target_sgl[rs->target_sge].length) {
                if (++rs->target_sge == RS_SGL_SIZE)
                        rs->target_sge = 0;
        }
 
-       return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
+       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+                            flags, addr, rkey);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -856,7 +831,7 @@ static void rs_send_credits(struct rsocket *rs)
                ibsge.lkey = 0;
                ibsge.length = sizeof(sge);
 
-               rs_post_write(rs, 0, &ibsge, 1,
+               rs_post_write(rs, &ibsge, 1,
                              rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
                              IBV_SEND_INLINE,
                              rs->remote_sgl.addr +
@@ -870,7 +845,7 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write(rs, 0, NULL, 0,
+               rs_post_write(rs, NULL, 0,
                              rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
        }
 }
@@ -891,7 +866,6 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       union rs_wr_id *wr_id;
        uint32_t imm_data;
        int ret, rcnt = 0;
 
@@ -911,7 +885,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-                                       rs_shutdown_state(rs, rs_connect_rd);
+                                       rs->state &= ~rs_connect_rd;
                                }
                                break;
                        default:
@@ -922,12 +896,19 @@ static int rs_poll_cq(struct rsocket *rs)
                                break;
                        }
                } else {
-                       if (wc.wr_id) {
-                               wr_id = (union rs_wr_id *) &wc.wr_id;
-                               rs->sqe_avail++; /* += wr_id->sqe_count; */
-                               rs->sbuf_bytes_avail += wr_id->length;
-                       } else {
+                       switch  (rs_msg_op((uint32_t) wc.wr_id)) {
+                       case RS_OP_SGL:
                                rs->ctrl_avail++;
+                               break;
+                       case RS_OP_CTRL:
+                               rs->ctrl_avail++;
+                               if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
+                                       rs->state = rs_disconnected;
+                               break;
+                       default:
+                               rs->sqe_avail++;
+                               rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
+                               break;
                        }
                        if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
                                rs->state = rs_error;
@@ -1075,9 +1056,9 @@ static int rs_conn_can_send(struct rsocket *rs)
        return rs_can_send(rs) || !(rs->state & rs_connect_wr);
 }
 
-static int rs_can_send_ctrl(struct rsocket *rs)
+static int rs_conn_can_send_ctrl(struct rsocket *rs)
 {
-       return rs->ctrl_avail;
+       return rs->ctrl_avail || !(rs->state & rs_connected);
 }
 
 static int rs_have_rdata(struct rsocket *rs)
@@ -1090,9 +1071,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
        return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
 }
 
-static int rs_all_sends_done(struct rsocket *rs)
+static int rs_conn_all_sends_done(struct rsocket *rs)
 {
-       return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
+       return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
+              !(rs->state & rs_connected);
 }
 
 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
@@ -1281,15 +1263,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                        sge.addr = (uintptr_t) buf;
                        sge.length = xfer_size;
                        sge.lkey = 0;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
-                                           IBV_SEND_INLINE);
+                       ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
                } else if (xfer_size <= rs_sbuf_left(rs)) {
                        memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
                        rs->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 1,
-                                           rs_msg_set(RS_OP_DATA, xfer_size), 0);
+                       ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
                        if (xfer_size < rs_sbuf_left(rs))
                                rs->ssgl[0].addr += xfer_size;
                        else
@@ -1300,9 +1278,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                rs->ssgl[0].length);
                        rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
                        memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 2,
-                                           rs_msg_set(RS_OP_DATA, xfer_size), 0);
+                       ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
                        rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
                }
                if (ret)
@@ -1395,9 +1371,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                        rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
                                    &cur_iov, &offset, xfer_size);
                        rs->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 1,
-                                           rs_msg_set(RS_OP_DATA, xfer_size),
+                       ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
                                            xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
                        if (xfer_size < rs_sbuf_left(rs))
                                rs->ssgl[0].addr += xfer_size;
@@ -1409,9 +1383,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                    &offset, rs->ssgl[0].length);
                        rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
                        rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 2,
-                                           rs_msg_set(RS_OP_DATA, xfer_size),
+                       ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
                                            xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
                        rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
                }
@@ -1724,7 +1696,7 @@ int rshutdown(int socket, int how)
 
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
-               rs_shutdown_state(rs, rs_connect_rd);
+               rs->state &= ~rs_connect_rd;
                return 0;
        }
 
@@ -1734,26 +1706,27 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
-                       rs->state = rs_disconnected;
+                       rs->state &= ~(rs_connect_rd | rs_connect_wr);
                } else {
-                       rs_shutdown_state(rs, rs_connect_wr);
-                       ctrl = (rs->state & rs_connected) ?
+                       rs->state &= ~rs_connect_wr;
+                       ctrl = (rs->state & rs_connect_rd) ?
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
-               if (!rs_can_send_ctrl(rs)) {
-                       ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
+               if (!rs->ctrl_avail) {
+                       ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
                        if (ret)
                                return ret;
                }
 
-               rs->ctrl_avail--;
-               ret = rs_post_write(rs, 0, NULL, 0,
-                                   rs_msg_set(RS_OP_CTRL, ctrl),
-                                   0, 0, 0);
+               if ((rs->state & rs_connected) && rs->ctrl_avail) {
+                       rs->ctrl_avail--;
+                       ret = rs_post_write(rs, NULL, 0,
+                                           rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+               }
        }
 
-       if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
-               rs_process_cq(rs, 0, rs_all_sends_done);
+       if (rs->state & rs_connected)
+               rs_process_cq(rs, 0, rs_conn_all_sends_done);
 
        if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
                rs_set_nonblocking(rs, 1);