]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Sat, 28 Jul 2012 00:35:36 +0000 (17:35 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 28 Jul 2012 00:35:36 +0000 (17:35 -0700)
meta
patches/fast-disc
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index d3ce27af625b80cbd30a82945301bdc78f8d8b0e..6a37b5ed22593ef8b3f30967b4f19af6f06a7df5 100644 (file)
--- a/meta
+++ b/meta
@@ -1,6 +1,6 @@
 Version: 1
-Previous: dc27f3442f132a5e1d2cfbf174ee8e023533de63
-Head: 726f0d0e29b67a1fe3450571a9c03f0b6557aa06
+Previous: beb307c2ec61d711e699e38893a888d9e5fe55e3
+Head: 97feb5bdfd26f7b309cdc8d59fab475206b9356f
 Applied:
   cma-rm-pd: 2ffda7f2991395570b9e776ff5ae256ca9684771
   transpose: 3e52eb22f44eafaefa95c4674bc5665a94e15694
@@ -9,8 +9,7 @@ Applied:
   init-getname: 7d988863b218d1b66e3739ec4b6f51acc72b2334
   rs-ftp: 28e0744eb89227fbeded485fbad64010b9edf0f6
   check-id: 59cddb8c2cd143ffe205716eef276b9c502e1eb5
-  fast-disc: 8ba3be5f12ffff6818c394d0b0589cb713f68841
-  refresh-temp: 726f0d0e29b67a1fe3450571a9c03f0b6557aa06
+  fast-disc: 97feb5bdfd26f7b309cdc8d59fab475206b9356f
 Unapplied:
   dbg: 0c269855776d3001e37da8c8afe283c20e1d6cd6
   waitall-buggy: c49c6b56c55385774065f5aa2704078e6ae0ceb8
index aa968a215d4843bbad2f9039b4f237db0c13cf69..d7a17f8303a56d38206e497d8e30bc0a2157d774 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
-Top:    b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
+Top:    fffced6494cfa5fc5f9da5527d93fb7fe504bd86
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-07-27 10:46:42 -0700
 
@@ -20,4 +20,306 @@ Signed-off-by: Sean Hefty <sean.hefty@intel.com>
 
 ---
 
-
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 1e391b3..908b00c 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
+ enum {
+       RS_OP_DATA,
+-      RS_OP_DATA_MORE,
+-      RS_OP_DRA,
+-      RS_OP_DRA_MORE,
++      RS_OP_RSVD_DATA_MORE,
++      RS_OP_RSVD_DRA,
++      RS_OP_RSVD_DRA_MORE,
+       RS_OP_SGL,
+-      RS_OP_RSVD1,
+-      RS_OP_DRA_SGL,
++      RS_OP_RSVD,
++      RS_OP_RSVD_DRA_SGL,
+       RS_OP_CTRL
+ };
+ #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
+@@ -124,12 +124,11 @@ struct rs_conn_data {
+       struct rs_sge     data_buf;
+ };
+-union rs_wr_id {
+-      uint64_t          wr_id;
+-      struct {
+-              uint32_t  reserved; /* sqe_count; */
+-              uint32_t  length;
+-      };
++enum {
++      RS_WR_RECV,
++      RS_WR_DATA,
++      RS_WR_CTRL,
++      RS_WR_DISCONNECT
+ };
+ #define RS_RECV_WR_ID (~((uint64_t) 0))
+@@ -264,17 +263,6 @@ out:
+       pthread_mutex_unlock(&mut);
+ }
+-/*
+- * We currently generate a completion per send.  sqe_count = 1
+- */
+-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
+-{
+-      union rs_wr_id wrid;
+-      /* wrid.reserved = sqe_count; */
+-      wrid.length = length;
+-      return wrid;
+-}
+-
+ static int rs_insert(struct rsocket *rs)
+ {
+       pthread_mutex_lock(&mut);
+@@ -772,21 +760,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+       return rs_do_connect(rs);
+ }
+-static void rs_shutdown_state(struct rsocket *rs, int state)
+-{
+-      rs->state &= ~state;
+-      if (rs->state == rs_connected)
+-              rs->state = rs_disconnected;
+-}
+-
+-static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
++static int rs_post_write(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint32_t imm_data, int flags,
+                        uint64_t addr, uint32_t rkey)
+ {
+       struct ibv_send_wr wr, *bad;
+-      wr.wr_id = wr_id;
++      wr.wr_id = (uint64_t) imm_data;
+       wr.next = NULL;
+       wr.sg_list = sgl;
+       wr.num_sge = nsge;
+@@ -803,29 +784,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
+  * Update target SGE before sending data.  Otherwise the remote side may
+  * update the entry before we do.
+  */
+-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
++static int rs_write_data(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+-                       uint32_t imm_data, int flags)
++                       uint32_t length, int flags)
+ {
+       uint64_t addr;
+       uint32_t rkey;
+       rs->sseq_no++;
+       rs->sqe_avail--;
+-      rs->sbuf_bytes_avail -= wr_id.length;
++      rs->sbuf_bytes_avail -= length;
+       addr = rs->target_sgl[rs->target_sge].addr;
+       rkey = rs->target_sgl[rs->target_sge].key;
+-      rs->target_sgl[rs->target_sge].addr += wr_id.length;
+-      rs->target_sgl[rs->target_sge].length -= wr_id.length;
++      rs->target_sgl[rs->target_sge].addr += length;
++      rs->target_sgl[rs->target_sge].length -= length;
+       if (!rs->target_sgl[rs->target_sge].length) {
+               if (++rs->target_sge == RS_SGL_SIZE)
+                       rs->target_sge = 0;
+       }
+-      return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
++      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
++                           flags, addr, rkey);
+ }
+ static uint32_t rs_sbuf_left(struct rsocket *rs)
+@@ -856,7 +838,7 @@ static void rs_send_credits(struct rsocket *rs)
+               ibsge.lkey = 0;
+               ibsge.length = sizeof(sge);
+-              rs_post_write(rs, 0, &ibsge, 1,
++              rs_post_write(rs, &ibsge, 1,
+                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                             IBV_SEND_INLINE,
+                             rs->remote_sgl.addr +
+@@ -870,7 +852,7 @@ static void rs_send_credits(struct rsocket *rs)
+               if (++rs->remote_sge == rs->remote_sgl.length)
+                       rs->remote_sge = 0;
+       } else {
+-              rs_post_write(rs, 0, NULL, 0,
++              rs_post_write(rs, NULL, 0,
+                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+       }
+ }
+@@ -891,7 +873,6 @@ static void rs_update_credits(struct rsocket *rs)
+ static int rs_poll_cq(struct rsocket *rs)
+ {
+       struct ibv_wc wc;
+-      union rs_wr_id *wr_id;
+       uint32_t imm_data;
+       int ret, rcnt = 0;
+@@ -911,7 +892,7 @@ static int rs_poll_cq(struct rsocket *rs)
+                                       rs->state = rs_disconnected;
+                                       return 0;
+                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+-                                      rs_shutdown_state(rs, rs_connect_rd);
++                                      rs->state &= ~rs_connect_rd;
+                               }
+                               break;
+                       default:
+@@ -922,12 +903,19 @@ static int rs_poll_cq(struct rsocket *rs)
+                               break;
+                       }
+               } else {
+-                      if (wc.wr_id) {
+-                              wr_id = (union rs_wr_id *) &wc.wr_id;
+-                              rs->sqe_avail++; /* += wr_id->sqe_count; */
+-                              rs->sbuf_bytes_avail += wr_id->length;
+-                      } else {
++                      switch  (rs_msg_op((uint32_t) wc.wr_id)) {
++                      case RS_OP_SGL:
+                               rs->ctrl_avail++;
++                              break;
++                      case RS_OP_CTRL:
++                              rs->ctrl_avail++;
++                              if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
++                                      rs->state = rs_disconnected;
++                              break;
++                      default:
++                              rs->sqe_avail++;
++                              rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
++                              break;
+                       }
+                       if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
+                               rs->state = rs_error;
+@@ -1075,9 +1063,9 @@ static int rs_conn_can_send(struct rsocket *rs)
+       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+ }
+-static int rs_can_send_ctrl(struct rsocket *rs)
++static int rs_conn_can_send_ctrl(struct rsocket *rs)
+ {
+-      return rs->ctrl_avail;
++      return rs->ctrl_avail || !(rs->state & rs_connected);
+ }
+ static int rs_have_rdata(struct rsocket *rs)
+@@ -1090,9 +1078,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
+       return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
+ }
+-static int rs_all_sends_done(struct rsocket *rs)
++static int rs_conn_all_sends_done(struct rsocket *rs)
+ {
+-      return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
++      return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
++             !(rs->state & rs_connected);
+ }
+ static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
+@@ -1281,15 +1270,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+                       sge.addr = (uintptr_t) buf;
+                       sge.length = xfer_size;
+                       sge.lkey = 0;
+-                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+-                                          &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
+-                                          IBV_SEND_INLINE);
++                      ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
+               } else if (xfer_size <= rs_sbuf_left(rs)) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+                       rs->ssgl[0].length = xfer_size;
+-                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+-                                          rs->ssgl, 1,
+-                                          rs_msg_set(RS_OP_DATA, xfer_size), 0);
++                      ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
+                       if (xfer_size < rs_sbuf_left(rs))
+                               rs->ssgl[0].addr += xfer_size;
+                       else
+@@ -1300,9 +1285,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+-                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+-                                          rs->ssgl, 2,
+-                                          rs_msg_set(RS_OP_DATA, xfer_size), 0);
++                      ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               if (ret)
+@@ -1395,9 +1378,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+                       rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
+                                   &cur_iov, &offset, xfer_size);
+                       rs->ssgl[0].length = xfer_size;
+-                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+-                                          rs->ssgl, 1,
+-                                          rs_msg_set(RS_OP_DATA, xfer_size),
++                      ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
+                                           xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
+                       if (xfer_size < rs_sbuf_left(rs))
+                               rs->ssgl[0].addr += xfer_size;
+@@ -1409,9 +1390,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+                                   &offset, rs->ssgl[0].length);
+                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+                       rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
+-                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+-                                          rs->ssgl, 2,
+-                                          rs_msg_set(RS_OP_DATA, xfer_size),
++                      ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
+                                           xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+@@ -1724,7 +1703,7 @@ int rshutdown(int socket, int how)
+       rs = idm_at(&idm, socket);
+       if (how == SHUT_RD) {
+-              rs_shutdown_state(rs, rs_connect_rd);
++              rs->state &= ~rs_connect_rd;
+               return 0;
+       }
+@@ -1734,26 +1713,27 @@ int rshutdown(int socket, int how)
+       if (rs->state & rs_connected) {
+               if (how == SHUT_RDWR) {
+                       ctrl = RS_CTRL_DISCONNECT;
+-                      rs->state = rs_disconnected;
++                      rs->state &= ~(rs_connect_rd | rs_connect_wr);
+               } else {
+-                      rs_shutdown_state(rs, rs_connect_wr);
+-                      ctrl = (rs->state & rs_connected) ?
++                      rs->state &= ~rs_connect_wr;
++                      ctrl = (rs->state & rs_connect_rd) ?
+                               RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+               }
+-              if (!rs_can_send_ctrl(rs)) {
+-                      ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
++              if (!rs->ctrl_avail) {
++                      ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
+                       if (ret)
+                               return ret;
+               }
+-              rs->ctrl_avail--;
+-              ret = rs_post_write(rs, 0, NULL, 0,
+-                                  rs_msg_set(RS_OP_CTRL, ctrl),
+-                                  0, 0, 0);
++              if ((rs->state & rs_connected) && rs->ctrl_avail) {
++                      rs->ctrl_avail--;
++                      ret = rs_post_write(rs, NULL, 0,
++                                          rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
++              }
+       }
+-      if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
+-              rs_process_cq(rs, 0, rs_all_sends_done);
++      if (rs->state & rs_connected)
++              rs_process_cq(rs, 0, rs_conn_all_sends_done);
+       if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
+               rs_set_nonblocking(rs, 1);
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 93a536d..0000000
+++ /dev/null
@@ -1,312 +0,0 @@
-Bottom: b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
-Top:    fffced6494cfa5fc5f9da5527d93fb7fe504bd86
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-07-27 17:35:36 -0700
-
-Refresh of fast-disc
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 1e391b3..908b00c 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
- enum {
-       RS_OP_DATA,
--      RS_OP_DATA_MORE,
--      RS_OP_DRA,
--      RS_OP_DRA_MORE,
-+      RS_OP_RSVD_DATA_MORE,
-+      RS_OP_RSVD_DRA,
-+      RS_OP_RSVD_DRA_MORE,
-       RS_OP_SGL,
--      RS_OP_RSVD1,
--      RS_OP_DRA_SGL,
-+      RS_OP_RSVD,
-+      RS_OP_RSVD_DRA_SGL,
-       RS_OP_CTRL
- };
- #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
-@@ -124,12 +124,11 @@ struct rs_conn_data {
-       struct rs_sge     data_buf;
- };
--union rs_wr_id {
--      uint64_t          wr_id;
--      struct {
--              uint32_t  reserved; /* sqe_count; */
--              uint32_t  length;
--      };
-+enum {
-+      RS_WR_RECV,
-+      RS_WR_DATA,
-+      RS_WR_CTRL,
-+      RS_WR_DISCONNECT
- };
- #define RS_RECV_WR_ID (~((uint64_t) 0))
-@@ -264,17 +263,6 @@ out:
-       pthread_mutex_unlock(&mut);
- }
--/*
-- * We currently generate a completion per send.  sqe_count = 1
-- */
--static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
--{
--      union rs_wr_id wrid;
--      /* wrid.reserved = sqe_count; */
--      wrid.length = length;
--      return wrid;
--}
--
- static int rs_insert(struct rsocket *rs)
- {
-       pthread_mutex_lock(&mut);
-@@ -772,21 +760,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-       return rs_do_connect(rs);
- }
--static void rs_shutdown_state(struct rsocket *rs, int state)
--{
--      rs->state &= ~state;
--      if (rs->state == rs_connected)
--              rs->state = rs_disconnected;
--}
--
--static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
-+static int rs_post_write(struct rsocket *rs,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags,
-                        uint64_t addr, uint32_t rkey)
- {
-       struct ibv_send_wr wr, *bad;
--      wr.wr_id = wr_id;
-+      wr.wr_id = (uint64_t) imm_data;
-       wr.next = NULL;
-       wr.sg_list = sgl;
-       wr.num_sge = nsge;
-@@ -803,29 +784,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
-  * Update target SGE before sending data.  Otherwise the remote side may
-  * update the entry before we do.
-  */
--static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
-+static int rs_write_data(struct rsocket *rs,
-                        struct ibv_sge *sgl, int nsge,
--                       uint32_t imm_data, int flags)
-+                       uint32_t length, int flags)
- {
-       uint64_t addr;
-       uint32_t rkey;
-       rs->sseq_no++;
-       rs->sqe_avail--;
--      rs->sbuf_bytes_avail -= wr_id.length;
-+      rs->sbuf_bytes_avail -= length;
-       addr = rs->target_sgl[rs->target_sge].addr;
-       rkey = rs->target_sgl[rs->target_sge].key;
--      rs->target_sgl[rs->target_sge].addr += wr_id.length;
--      rs->target_sgl[rs->target_sge].length -= wr_id.length;
-+      rs->target_sgl[rs->target_sge].addr += length;
-+      rs->target_sgl[rs->target_sge].length -= length;
-       if (!rs->target_sgl[rs->target_sge].length) {
-               if (++rs->target_sge == RS_SGL_SIZE)
-                       rs->target_sge = 0;
-       }
--      return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
-+      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-+                           flags, addr, rkey);
- }
- static uint32_t rs_sbuf_left(struct rsocket *rs)
-@@ -856,7 +838,7 @@ static void rs_send_credits(struct rsocket *rs)
-               ibsge.lkey = 0;
-               ibsge.length = sizeof(sge);
--              rs_post_write(rs, 0, &ibsge, 1,
-+              rs_post_write(rs, &ibsge, 1,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                             IBV_SEND_INLINE,
-                             rs->remote_sgl.addr +
-@@ -870,7 +852,7 @@ static void rs_send_credits(struct rsocket *rs)
-               if (++rs->remote_sge == rs->remote_sgl.length)
-                       rs->remote_sge = 0;
-       } else {
--              rs_post_write(rs, 0, NULL, 0,
-+              rs_post_write(rs, NULL, 0,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
-       }
- }
-@@ -891,7 +873,6 @@ static void rs_update_credits(struct rsocket *rs)
- static int rs_poll_cq(struct rsocket *rs)
- {
-       struct ibv_wc wc;
--      union rs_wr_id *wr_id;
-       uint32_t imm_data;
-       int ret, rcnt = 0;
-@@ -911,7 +892,7 @@ static int rs_poll_cq(struct rsocket *rs)
-                                       rs->state = rs_disconnected;
-                                       return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
--                                      rs_shutdown_state(rs, rs_connect_rd);
-+                                      rs->state &= ~rs_connect_rd;
-                               }
-                               break;
-                       default:
-@@ -922,12 +903,19 @@ static int rs_poll_cq(struct rsocket *rs)
-                               break;
-                       }
-               } else {
--                      if (wc.wr_id) {
--                              wr_id = (union rs_wr_id *) &wc.wr_id;
--                              rs->sqe_avail++; /* += wr_id->sqe_count; */
--                              rs->sbuf_bytes_avail += wr_id->length;
--                      } else {
-+                      switch  (rs_msg_op((uint32_t) wc.wr_id)) {
-+                      case RS_OP_SGL:
-                               rs->ctrl_avail++;
-+                              break;
-+                      case RS_OP_CTRL:
-+                              rs->ctrl_avail++;
-+                              if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
-+                                      rs->state = rs_disconnected;
-+                              break;
-+                      default:
-+                              rs->sqe_avail++;
-+                              rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
-+                              break;
-                       }
-                       if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
-                               rs->state = rs_error;
-@@ -1075,9 +1063,9 @@ static int rs_conn_can_send(struct rsocket *rs)
-       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
- }
--static int rs_can_send_ctrl(struct rsocket *rs)
-+static int rs_conn_can_send_ctrl(struct rsocket *rs)
- {
--      return rs->ctrl_avail;
-+      return rs->ctrl_avail || !(rs->state & rs_connected);
- }
- static int rs_have_rdata(struct rsocket *rs)
-@@ -1090,9 +1078,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
-       return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
- }
--static int rs_all_sends_done(struct rsocket *rs)
-+static int rs_conn_all_sends_done(struct rsocket *rs)
- {
--      return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
-+      return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
-+             !(rs->state & rs_connected);
- }
- static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
-@@ -1281,15 +1270,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-                       sge.addr = (uintptr_t) buf;
-                       sge.length = xfer_size;
-                       sge.lkey = 0;
--                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
--                                          &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
--                                          IBV_SEND_INLINE);
-+                      ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
-               } else if (xfer_size <= rs_sbuf_left(rs)) {
-                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
-                       rs->ssgl[0].length = xfer_size;
--                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
--                                          rs->ssgl, 1,
--                                          rs_msg_set(RS_OP_DATA, xfer_size), 0);
-+                      ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
-                       if (xfer_size < rs_sbuf_left(rs))
-                               rs->ssgl[0].addr += xfer_size;
-                       else
-@@ -1300,9 +1285,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-                               rs->ssgl[0].length);
-                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
-                       memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
--                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
--                                          rs->ssgl, 2,
--                                          rs_msg_set(RS_OP_DATA, xfer_size), 0);
-+                      ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
-                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-               }
-               if (ret)
-@@ -1395,9 +1378,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-                       rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
-                                   &cur_iov, &offset, xfer_size);
-                       rs->ssgl[0].length = xfer_size;
--                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
--                                          rs->ssgl, 1,
--                                          rs_msg_set(RS_OP_DATA, xfer_size),
-+                      ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
-                                           xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
-                       if (xfer_size < rs_sbuf_left(rs))
-                               rs->ssgl[0].addr += xfer_size;
-@@ -1409,9 +1390,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-                                   &offset, rs->ssgl[0].length);
-                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
-                       rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
--                      ret = rs_write_data(rs, rs_wrid(1, xfer_size),
--                                          rs->ssgl, 2,
--                                          rs_msg_set(RS_OP_DATA, xfer_size),
-+                      ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
-                                           xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
-                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-               }
-@@ -1724,7 +1703,7 @@ int rshutdown(int socket, int how)
-       rs = idm_at(&idm, socket);
-       if (how == SHUT_RD) {
--              rs_shutdown_state(rs, rs_connect_rd);
-+              rs->state &= ~rs_connect_rd;
-               return 0;
-       }
-@@ -1734,26 +1713,27 @@ int rshutdown(int socket, int how)
-       if (rs->state & rs_connected) {
-               if (how == SHUT_RDWR) {
-                       ctrl = RS_CTRL_DISCONNECT;
--                      rs->state = rs_disconnected;
-+                      rs->state &= ~(rs_connect_rd | rs_connect_wr);
-               } else {
--                      rs_shutdown_state(rs, rs_connect_wr);
--                      ctrl = (rs->state & rs_connected) ?
-+                      rs->state &= ~rs_connect_wr;
-+                      ctrl = (rs->state & rs_connect_rd) ?
-                               RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
-               }
--              if (!rs_can_send_ctrl(rs)) {
--                      ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
-+              if (!rs->ctrl_avail) {
-+                      ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
-                       if (ret)
-                               return ret;
-               }
--              rs->ctrl_avail--;
--              ret = rs_post_write(rs, 0, NULL, 0,
--                                  rs_msg_set(RS_OP_CTRL, ctrl),
--                                  0, 0, 0);
-+              if ((rs->state & rs_connected) && rs->ctrl_avail) {
-+                      rs->ctrl_avail--;
-+                      ret = rs_post_write(rs, NULL, 0,
-+                                          rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-+              }
-       }
--      if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
--              rs_process_cq(rs, 0, rs_all_sends_done);
-+      if (rs->state & rs_connected)
-+              rs_process_cq(rs, 0, rs_conn_all_sends_done);
-       if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
-               rs_set_nonblocking(rs, 1);