]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Wed, 24 Oct 2012 00:07:36 +0000 (17:07 -0700)
committerSean Hefty <sean.hefty@intel.com>
Wed, 24 Oct 2012 00:07:36 +0000 (17:07 -0700)
meta
patches/refresh-temp [deleted file]
patches/rs-iomap

diff --git a/meta b/meta
index e0ccf3f4bdd1ab5d883ee5d507c49e9cf47fc87c..6f177a61284cde69f60a63a25726d3f616037979 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: 2180aec29b4e74afa502cf00e29d76719a5a4d6a
-Head: c375e043233dd6d8b876b4aabdc1ee4e97462ccf
+Previous: 74fc273df4e25167ef18c7cf822c9460a7dddd00
+Head: 10fb6d99f8ab828acc07d38c3eefaed0d3591bfa
 Applied:
-  rs-iomap: c25bf6431d76aca86ee4bf93d71a319fe3096a83
-  refresh-temp: c375e043233dd6d8b876b4aabdc1ee4e97462ccf
+  rs-iomap: 10fb6d99f8ab828acc07d38c3eefaed0d3591bfa
 Unapplied:
   resv-rs-len: 7b6ff5c4894f54b221d877adcd709795dffb2fe9
   rs-target-sgl: 7a07c80f2242e80c076dcf3ec6bb4c94626b284f
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index fc151ca..0000000
+++ /dev/null
@@ -1,206 +0,0 @@
-Bottom: ea70904deb6e6424cbdeacc9a46e20ee1b29e5c0
-Top:    fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-10-23 17:07:33 -0700
-
-Refresh of rs-iomap
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index ed708d4..22e474d 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -55,6 +55,7 @@
- #define RS_OLAP_START_SIZE 2048
- #define RS_MAX_TRANSFER 65536
-+#define RS_SNDLOWAT 64
- #define RS_QP_MAX_SIZE 0xFFFE
- #define RS_QP_CTRL_SIZE 4
- #define RS_CONN_RETRIES 6
-@@ -79,7 +80,10 @@ static uint32_t polling_time = 10;
-  * for data transfers:
-  * bits [28:0]: bytes transfered
-  * for control messages:
-+ * SGL, CTRL
-  * bits [28-0]: receive credits granted
-+ * IOMAP_SGL
-+ * bits [28-16]: reserved, bits [15-0]: index
-  */
- enum {
-@@ -89,7 +93,7 @@ enum {
-       RS_OP_RSVD_DRA_MORE,
-       RS_OP_SGL,
-       RS_OP_RSVD,
--      RS_OP_RSVD_DRA_SGL,
-+      RS_OP_IOMAP_SGL,
-       RS_OP_CTRL
- };
- #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
-@@ -285,9 +289,8 @@ void rs_configure(void)
-       if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
-               fscanf(f, "%u", &def_wmem);
-               fclose(f);
--
--              if (def_wmem < 1)
--                      def_wmem = 1;
-+              if (def_wmem < RS_SNDLOWAT)
-+                      def_wmem = RS_SNDLOWAT << 1;
-       }
-       if ((f = fopen(RS_CONN_DIR "/iomap_size", "r"))) {
-@@ -917,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
-                            flags, addr, rkey);
- }
-+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
-+                        struct ibv_sge *sgl, int nsge, int flags)
-+{
-+      uint64_t addr;
-+
-+      rs->sseq_no++;
-+      rs->sqe_avail--;
-+      rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-+
-+      addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-+      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-+                           flags, addr, rs->remote_iomap.key);
-+}
-+
- static uint32_t rs_sbuf_left(struct rsocket *rs)
- {
-       return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
-@@ -1160,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
-  */
- static int rs_can_send(struct rsocket *rs)
- {
--      return rs->sqe_avail && rs->sbuf_bytes_avail &&
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-              (rs->sseq_no != rs->sseq_comp) &&
-              (rs->target_sgl[rs->target_sge].length != 0);
- }
-@@ -1330,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
-       return rrecvv(socket, iov, iovcnt, 0);
- }
-+static int rs_send_iomaps(struct rsocket *rs, int flags)
-+{
-+      struct rs_iomap_mr *iomr;
-+      struct ibv_sge sge;
-+      struct rs_iomap iom;
-+      int ret;
-+
-+      fastlock_acquire(&rs->iomap_lock);
-+      while (!dlist_empty(&rs->iomap_queue)) {
-+              if (!rs_can_send(rs)) {
-+                      ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-+                                        rs_conn_can_send);
-+                      if (ret)
-+                              break;
-+                      if (!(rs->state & rs_connect_wr)) {
-+                              ret = ERR(ECONNRESET);
-+                              break;
-+                      }
-+              }
-+
-+              iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
-+              if (!(rs->opts & RS_OPT_SWAP_SGL)) {
-+                      iom.offset = iomr->offset;
-+                      iom.sge.addr = (uintptr_t) iomr->mr->addr;
-+                      iom.sge.length = iomr->mr->length;
-+                      iom.sge.key = iomr->mr->rkey;
-+              } else {
-+                      iom.offset = bswap_64(iomr->offset);
-+                      iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
-+                      iom.sge.length = bswap_32(iomr->mr->length);
-+                      iom.sge.key = bswap_32(iomr->mr->rkey);
-+              }
-+
-+              if (rs->sq_inline >= sizeof iom) {
-+                      sge.addr = (uintptr_t) &iom;
-+                      sge.length = sizeof iom;
-+                      sge.lkey = 0;
-+                      ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
-+              } else if (rs_sbuf_left(rs) >= sizeof iom) {
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
-+                      rs->ssgl[0].length = sizeof iom;
-+                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
-+                      if (rs_sbuf_left(rs) > sizeof iom)
-+                              rs->ssgl[0].addr += sizeof iom;
-+                      else
-+                              rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+              } else {
-+                      rs->ssgl[0].length = rs_sbuf_left(rs);
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
-+                              rs->ssgl[0].length);
-+                      rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
-+                      memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
-+                             rs->ssgl[1].length);
-+                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
-+                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+              }
-+              dlist_remove(&iomr->entry);
-+              dlist_insert_tail(&iomr->entry, &rs->iomap_list);
-+              if (ret)
-+                      break;
-+      }
-+
-+      rs->iomap_pending = dlist_empty(&rs->iomap_queue);
-+      fastlock_release(&rs->iomap_lock);
-+      return ret;
-+}
-+
- /*
-  * We overlap sending the data, by posting a small work request immediately,
-  * then increasing the size of the send on each iteration.
-@@ -1353,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-       }
-       fastlock_acquire(&rs->slock);
-+      if (rs->iomap_pending) {
-+              ret = rs_send_iomaps(rs, flags);
-+              if (ret)
-+                      goto out;
-+      }
-       for (left = len; left; left -= xfer_size, buf += xfer_size) {
-               if (!rs_can_send(rs)) {
-                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1403,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-               if (ret)
-                       break;
-       }
-+out:
-       fastlock_release(&rs->slock);
-       return (ret && left == len) ? ret : len - left;
-@@ -1461,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-               len += iov[i].iov_len;
-       fastlock_acquire(&rs->slock);
-+      if (rs->iomap_pending) {
-+              ret = rs_send_iomaps(rs, flags);
-+              if (ret)
-+                      goto out;
-+      }
-       for (left = len; left; left -= xfer_size) {
-               if (!rs_can_send(rs)) {
-                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1509,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-               if (ret)
-                       break;
-       }
-+out:
-       fastlock_release(&rs->slock);
-       return (ret && left == len) ? ret : len - left;
-@@ -1928,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
-               case SO_SNDBUF:
-                       if (!rs->sbuf)
-                               rs->sbuf_size = (*(uint32_t *) optval) << 1;
-+                      if (rs->sbuf_size < RS_SNDLOWAT)
-+                              rs->sbuf_size = RS_SNDLOWAT << 1;
-                       ret = 0;
-                       break;
-               case SO_LINGER:
index 7dc0c1c37b3a90e5744dbd923b12ea0085556e80..e2caf7a3b475c44bf666eda88aee22367932543b 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: daf53db464152f40dc8d6f2c99844510b03f8567
-Top:    ea70904deb6e6424cbdeacc9a46e20ee1b29e5c0
+Top:    fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-10-21 14:16:03 -0700
 
@@ -91,10 +91,18 @@ index 26e7f98..0c5f388 100644
 +
 +#endif /* INDEXER_H */
 diff --git a/src/rsocket.c b/src/rsocket.c
-index cc5effe..ed708d4 100644
+index cc5effe..22e474d 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
-@@ -62,6 +62,7 @@
+@@ -55,6 +55,7 @@
+ #define RS_OLAP_START_SIZE 2048
+ #define RS_MAX_TRANSFER 65536
++#define RS_SNDLOWAT 64
+ #define RS_QP_MAX_SIZE 0xFFFE
+ #define RS_QP_CTRL_SIZE 4
+ #define RS_CONN_RETRIES 6
+@@ -62,6 +63,7 @@
  static struct index_map idm;
  static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
  
@@ -102,16 +110,30 @@ index cc5effe..ed708d4 100644
  static uint16_t def_inline = 64;
  static uint16_t def_sqsize = 384;
  static uint16_t def_rqsize = 384;
-@@ -76,7 +77,7 @@ static uint32_t polling_time = 10;
+@@ -76,9 +78,12 @@ static uint32_t polling_time = 10;
   * bit 29: more data, 0 - end of transfer, 1 - more data available
   *
   * for data transfers:
 - * bits [28:0]: bytes transfered, 0 = 1 GB
 + * bits [28:0]: bytes transfered
   * for control messages:
++ * SGL, CTRL
   * bits [28-0]: receive credits granted
++ * IOMAP_SGL
++ * bits [28-16]: reserved, bits [15-0]: index
   */
-@@ -111,15 +112,30 @@ struct rs_sge {
+ enum {
+@@ -88,7 +93,7 @@ enum {
+       RS_OP_RSVD_DRA_MORE,
+       RS_OP_SGL,
+       RS_OP_RSVD,
+-      RS_OP_RSVD_DRA_SGL,
++      RS_OP_IOMAP_SGL,
+       RS_OP_CTRL
+ };
+ #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
+@@ -111,15 +116,30 @@ struct rs_sge {
        uint32_t length;
  };
  
@@ -146,7 +168,7 @@ index cc5effe..ed708d4 100644
        struct rs_sge     target_sgl;
        struct rs_sge     data_buf;
  };
-@@ -155,6 +171,7 @@ struct rsocket {
+@@ -155,6 +175,7 @@ struct rsocket {
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
@@ -154,7 +176,7 @@ index cc5effe..ed708d4 100644
  
        int               opts;
        long              fd_flags;
-@@ -186,10 +203,19 @@ struct rsocket {
+@@ -186,10 +207,19 @@ struct rsocket {
  
        int               remote_sge;
        struct rs_sge     remote_sgl;
@@ -175,7 +197,7 @@ index cc5effe..ed708d4 100644
  
        uint32_t          rbuf_size;
        struct ibv_mr    *rmr;
-@@ -201,6 +227,18 @@ struct rsocket {
+@@ -201,6 +231,18 @@ struct rsocket {
        uint8_t           *sbuf;
  };
  
@@ -194,23 +216,27 @@ index cc5effe..ed708d4 100644
  void rs_configure(void)
  {
        FILE *f;
-@@ -251,6 +289,15 @@ void rs_configure(void)
-               if (def_wmem < 1)
-                       def_wmem = 1;
-       }
+@@ -247,9 +289,17 @@ void rs_configure(void)
+       if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
+               fscanf(f, "%u", &def_wmem);
+               fclose(f);
++              if (def_wmem < RS_SNDLOWAT)
++                      def_wmem = RS_SNDLOWAT << 1;
++      }
 +
 +      if ((f = fopen(RS_CONN_DIR "/iomap_size", "r"))) {
 +              fscanf(f, "%hu", &def_iomap_size);
 +              fclose(f);
-+
+-              if (def_wmem < 1)
+-                      def_wmem = 1;
 +              /* round to supported values */
 +              def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
 +              def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
-+      }
+       }
        init = 1;
  out:
-       pthread_mutex_unlock(&mut);
-@@ -287,6 +334,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = inherited_rs->sq_size;
                rs->rq_size = inherited_rs->rq_size;
                rs->ctrl_avail = inherited_rs->ctrl_avail;
@@ -218,7 +244,7 @@ index cc5effe..ed708d4 100644
        } else {
                rs->sbuf_size = def_wmem;
                rs->rbuf_size = def_mem;
-@@ -294,11 +342,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = def_sqsize;
                rs->rq_size = def_rqsize;
                rs->ctrl_avail = RS_QP_CTRL_SIZE;
@@ -234,7 +260,7 @@ index cc5effe..ed708d4 100644
        return rs;
  }
  
-@@ -336,6 +388,8 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
  
  static int rs_init_bufs(struct rsocket *rs)
  {
@@ -243,7 +269,7 @@ index cc5effe..ed708d4 100644
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
        if (!rs->rmsg)
                return -1;
-@@ -348,11 +402,21 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
        if (!rs->smr)
                return -1;
  
@@ -267,7 +293,7 @@ index cc5effe..ed708d4 100644
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
                return -1;
-@@ -452,6 +516,42 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs)
        return 0;
  }
  
@@ -310,7 +336,7 @@ index cc5effe..ed708d4 100644
  static void rs_free(struct rsocket *rs)
  {
        if (rs->index >= 0)
-@@ -472,15 +572,20 @@ static void rs_free(struct rsocket *rs)
+@@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs)
                free(rs->rbuf);
        }
  
@@ -333,7 +359,7 @@ index cc5effe..ed708d4 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -492,9 +597,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
+@@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
                             struct rs_conn_data *conn)
  {
        conn->version = 1;
@@ -347,7 +373,7 @@ index cc5effe..ed708d4 100644
  
        conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
        conn->target_sgl.length = htonl(RS_SGL_SIZE);
-@@ -518,6 +625,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
            (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
                rs->opts = RS_OPT_SWAP_SGL;
  
@@ -361,7 +387,160 @@ index cc5effe..ed708d4 100644
        rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
        rs->target_sgl[0].length = ntohl(conn->data_buf.length);
        rs->target_sgl[0].key = ntohl(conn->data_buf.key);
-@@ -2020,3 +2134,117 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
+@@ -803,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
+                            flags, addr, rkey);
+ }
++static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
++                        struct ibv_sge *sgl, int nsge, int flags)
++{
++      uint64_t addr;
++
++      rs->sseq_no++;
++      rs->sqe_avail--;
++      rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
++
++      addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
++      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
++                           flags, addr, rs->remote_iomap.key);
++}
++
+ static uint32_t rs_sbuf_left(struct rsocket *rs)
+ {
+       return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
+@@ -1046,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
+  */
+ static int rs_can_send(struct rsocket *rs)
+ {
+-      return rs->sqe_avail && rs->sbuf_bytes_avail &&
++      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+              (rs->sseq_no != rs->sseq_comp) &&
+              (rs->target_sgl[rs->target_sge].length != 0);
+ }
+@@ -1216,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
+       return rrecvv(socket, iov, iovcnt, 0);
+ }
++static int rs_send_iomaps(struct rsocket *rs, int flags)
++{
++      struct rs_iomap_mr *iomr;
++      struct ibv_sge sge;
++      struct rs_iomap iom;
++      int ret;
++
++      fastlock_acquire(&rs->iomap_lock);
++      while (!dlist_empty(&rs->iomap_queue)) {
++              if (!rs_can_send(rs)) {
++                      ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
++                                        rs_conn_can_send);
++                      if (ret)
++                              break;
++                      if (!(rs->state & rs_connect_wr)) {
++                              ret = ERR(ECONNRESET);
++                              break;
++                      }
++              }
++
++              iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
++              if (!(rs->opts & RS_OPT_SWAP_SGL)) {
++                      iom.offset = iomr->offset;
++                      iom.sge.addr = (uintptr_t) iomr->mr->addr;
++                      iom.sge.length = iomr->mr->length;
++                      iom.sge.key = iomr->mr->rkey;
++              } else {
++                      iom.offset = bswap_64(iomr->offset);
++                      iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
++                      iom.sge.length = bswap_32(iomr->mr->length);
++                      iom.sge.key = bswap_32(iomr->mr->rkey);
++              }
++
++              if (rs->sq_inline >= sizeof iom) {
++                      sge.addr = (uintptr_t) &iom;
++                      sge.length = sizeof iom;
++                      sge.lkey = 0;
++                      ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
++              } else if (rs_sbuf_left(rs) >= sizeof iom) {
++                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
++                      rs->ssgl[0].length = sizeof iom;
++                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
++                      if (rs_sbuf_left(rs) > sizeof iom)
++                              rs->ssgl[0].addr += sizeof iom;
++                      else
++                              rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
++              } else {
++                      rs->ssgl[0].length = rs_sbuf_left(rs);
++                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
++                              rs->ssgl[0].length);
++                      rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
++                      memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
++                             rs->ssgl[1].length);
++                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
++                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
++              }
++              dlist_remove(&iomr->entry);
++              dlist_insert_tail(&iomr->entry, &rs->iomap_list);
++              if (ret)
++                      break;
++      }
++
++      rs->iomap_pending = dlist_empty(&rs->iomap_queue);
++      fastlock_release(&rs->iomap_lock);
++      return ret;
++}
++
+ /*
+  * We overlap sending the data, by posting a small work request immediately,
+  * then increasing the size of the send on each iteration.
+@@ -1239,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+       }
+       fastlock_acquire(&rs->slock);
++      if (rs->iomap_pending) {
++              ret = rs_send_iomaps(rs, flags);
++              if (ret)
++                      goto out;
++      }
+       for (left = len; left; left -= xfer_size, buf += xfer_size) {
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1289,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+               if (ret)
+                       break;
+       }
++out:
+       fastlock_release(&rs->slock);
+       return (ret && left == len) ? ret : len - left;
+@@ -1347,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+               len += iov[i].iov_len;
+       fastlock_acquire(&rs->slock);
++      if (rs->iomap_pending) {
++              ret = rs_send_iomaps(rs, flags);
++              if (ret)
++                      goto out;
++      }
+       for (left = len; left; left -= xfer_size) {
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1395,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+               if (ret)
+                       break;
+       }
++out:
+       fastlock_release(&rs->slock);
+       return (ret && left == len) ? ret : len - left;
+@@ -1814,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
+               case SO_SNDBUF:
+                       if (!rs->sbuf)
+                               rs->sbuf_size = (*(uint32_t *) optval) << 1;
++                      if (rs->sbuf_size < RS_SNDLOWAT)
++                              rs->sbuf_size = RS_SNDLOWAT << 1;
+                       ret = 0;
+                       break;
+               case SO_LINGER:
+@@ -2020,3 +2232,117 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
        va_end(args);
        return ret;
  }