+++ /dev/null
-Bottom: fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
-Top: 99d8cf70184e482d774241ae32277d367ffabf7f
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-10-23 22:16:29 -0700
-
-Refresh of rs-iomap
-
----
-
-diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h
-index 21477e4..8634517 100644
---- a/include/rdma/rsocket.h
-+++ b/include/rdma/rsocket.h
-@@ -76,7 +76,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
- enum {
- RDMA_SQSIZE,
- RDMA_RQSIZE,
-- RDMA_INLINE
-+ RDMA_INLINE,
-+ RDMA_IOMAPSIZE
- };
-
- int rsetsockopt(int socket, int level, int optname,
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 22e474d..d620d04 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -89,7 +89,7 @@ static uint32_t polling_time = 10;
- enum {
- RS_OP_DATA,
- RS_OP_RSVD_DATA_MORE,
-- RS_OP_RSVD_DRA,
-+ RS_OP_WRITE,
- RS_OP_RSVD_DRA_MORE,
- RS_OP_SGL,
- RS_OP_RSVD,
-@@ -298,8 +298,8 @@ void rs_configure(void)
- fclose(f);
-
- /* round to supported values */
-- def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
-- def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
-+ def_iomap_size = (uint8_t) rs_value_to_scale(
-+ (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8)
- }
- init = 1;
- out:
-@@ -546,12 +546,12 @@ static void rs_free_iomappings(struct rsocket *rs)
- while (!dlist_empty(&rs->iomap_list)) {
- iomr = container_of(rs->iomap_list.next,
- struct rs_iomap_mr, entry);
-- riounmap(iomr->mr->addr, iomr->mr->length);
-+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
- }
- while (!dlist_empty(&rs->iomap_queue)) {
- iomr = container_of(rs->iomap_queue.next,
- struct rs_iomap_mr, entry);
-- riounmap(iomr->mr->addr, iomr->mr->length);
-+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
- }
- }
-
-@@ -870,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- return rs_do_connect(rs);
- }
-
--static int rs_post_write(struct rsocket *rs,
-+static int rs_post_write_msg(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags,
- uint64_t addr, uint32_t rkey)
-@@ -890,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
- return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-
-+static int rs_post_write(struct rsocket *rs,
-+ struct ibv_sge *sgl, int nsge,
-+ uint64_t wr_id, int flags,
-+ uint64_t addr, uint32_t rkey)
-+{
-+ struct ibv_send_wr wr, *bad;
-+
-+ wr.wr_id = wr_id;
-+ wr.next = NULL;
-+ wr.sg_list = sgl;
-+ wr.num_sge = nsge;
-+ wr.opcode = IBV_WR_RDMA_WRITE;
-+ wr.send_flags = flags;
-+ wr.wr.rdma.remote_addr = addr;
-+ wr.wr.rdma.rkey = rkey;
-+
-+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
-+}
-+
- /*
- * Update target SGE before sending data. Otherwise the remote side may
- * update the entry before we do.
-@@ -916,8 +935,22 @@ static int rs_write_data(struct rsocket *rs,
- rs->target_sge = 0;
- }
-
-- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-- flags, addr, rkey);
-+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-+ flags, addr, rkey);
-+}
-+
-+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-+ struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
-+{
-+ uint64_t addr;
-+
-+ rs->sseq_no++;
-+ rs->sqe_avail--;
-+ rs->sbuf_bytes_avail -= length;
-+
-+ addr = iom->sge.addr + offset - iom->offset;
-+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
-+ flags, addr, iom->sge.key);
- }
-
- static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
-@@ -930,8 +963,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
- rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-
- addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-- flags, addr, rs->remote_iomap.key);
-+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-+ flags, addr, rs->remote_iomap.key);
- }
-
- static uint32_t rs_sbuf_left(struct rsocket *rs)
-@@ -962,12 +995,12 @@ static void rs_send_credits(struct rsocket *rs)
- ibsge.lkey = 0;
- ibsge.length = sizeof(sge);
-
-- rs_post_write(rs, &ibsge, 1,
-- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-- IBV_SEND_INLINE,
-- rs->remote_sgl.addr +
-- rs->remote_sge * sizeof(struct rs_sge),
-- rs->remote_sgl.key);
-+ rs_post_write_msg(rs, &ibsge, 1,
-+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-+ IBV_SEND_INLINE,
-+ rs->remote_sgl.addr +
-+ rs->remote_sge * sizeof(struct rs_sge),
-+ rs->remote_sgl.key);
-
- rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
- rs->rbuf_free_offset += rs->rbuf_size >> 1;
-@@ -976,8 +1009,9 @@ static void rs_send_credits(struct rsocket *rs)
- if (++rs->remote_sge == rs->remote_sgl.length)
- rs->remote_sge = 0;
- } else {
-- rs_post_write(rs, NULL, 0,
-- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
-+ rs_post_write_msg(rs, NULL, 0,
-+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-+ 0, 0, 0);
- }
- }
-
-@@ -1011,6 +1045,9 @@ static int rs_poll_cq(struct rsocket *rs)
- case RS_OP_SGL:
- rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
- break;
-+ case RS_OP_IOMAP_SGL:
-+ /* The iomap was updated, that's nice to know. */
-+ break;
- case RS_OP_CTRL:
- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
- rs->state = rs_disconnected;
-@@ -1019,6 +1056,9 @@ static int rs_poll_cq(struct rsocket *rs)
- rs->state &= ~rs_connect_rd;
- }
- break;
-+ case RS_OP_WRITE:
-+ /* We really shouldn't be here. */
-+ break;
- default:
- rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
- rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
-@@ -1036,6 +1076,10 @@ static int rs_poll_cq(struct rsocket *rs)
- if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
- rs->state = rs_disconnected;
- break;
-+ case RS_OP_IOMAP_SGL:
-+ rs->sqe_avail++;
-+ rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
-+ break;
- default:
- rs->sqe_avail++;
- rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
-@@ -1935,8 +1979,8 @@ int rshutdown(int socket, int how)
-
- if ((rs->state & rs_connected) && rs->ctrl_avail) {
- rs->ctrl_avail--;
-- ret = rs_post_write(rs, NULL, 0,
-- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-+ ret = rs_post_write_msg(rs, NULL, 0,
-+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
- }
- }
-
-@@ -2090,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname,
- if (rs->sq_inline < RS_MIN_INLINE)
- rs->sq_inline = RS_MIN_INLINE;
- break;
-+ case RDMA_IOMAPSIZE:
-+ rs->target_iomap_size = (uint16_t) rs_scale_to_value(
-+ (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
-+ break;
- default:
- break;
- }
-@@ -2191,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname,
- *((int *) optval) = rs->sq_inline;
- *optlen = sizeof(int);
- break;
-+ case RDMA_IOMAPSIZE:
-+ *((int *) optval) = rs->target_iomap_size;
-+ *optlen = sizeof(int);
-+ break;
- default:
- ret = ENOTSUP;
- break;
-@@ -2342,7 +2394,94 @@ out:
- return ret;
- }
-
-+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
-+{
-+ int i;
-+
-+ for (i = 0; i < rs->target_iomap_size; i++) {
-+ if (offset >= rs->target_iomap[i].offset &&
-+ offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
-+ return &rs->target_iomap[i];
-+ }
-+ return NULL;
-+}
-+
- size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
- {
-+ struct rsocket *rs;
-+ struct rs_iomap *iom = NULL;
-+ struct ibv_sge sge;
-+ size_t left;
-+ uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
-+ int ret = 0;
-+
-+ rs = idm_at(&idm, socket);
-+ fastlock_acquire(&rs->slock);
-+ if (rs->iomap_pending) {
-+ ret = rs_send_iomaps(rs, flags);
-+ if (ret)
-+ goto out;
-+ }
-+ for (left = count; left;
-+ left -= xfer_size, buf += xfer_size, offset += xfer_size) {
-+ if (!iom || offset > iom->offset + iom->sge.length) {
-+ iom = rs_find_iomap(rs, offset);
-+ if (!iom)
-+ break;
-+ }
-+
-+ if (!rs_can_send(rs)) {
-+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-+ rs_conn_can_send);
-+ if (ret)
-+ break;
-+ if (!(rs->state & rs_connect_wr)) {
-+ ret = ERR(ECONNRESET);
-+ break;
-+ }
-+ }
-+
-+ if (olen < left) {
-+ xfer_size = olen;
-+ if (olen < RS_MAX_TRANSFER)
-+ olen <<= 1;
-+ } else {
-+ xfer_size = left;
-+ }
-+
-+ if (xfer_size > rs->sbuf_bytes_avail)
-+ xfer_size = rs->sbuf_bytes_avail;
-+ if (xfer_size > iom->offset + iom->sge.length - offset)
-+ xfer_size = iom->offset + iom->sge.length - offset;
-+
-+ if (xfer_size <= rs->sq_inline) {
-+ sge.addr = (uintptr_t) buf;
-+ sge.length = xfer_size;
-+ sge.lkey = 0;
-+ ret = rs_write_direct(rs, iom, offset, &sge, 1,
-+ xfer_size, IBV_SEND_INLINE);
-+ } else if (xfer_size <= rs_sbuf_left(rs)) {
-+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
-+ rs->ssgl[0].length = xfer_size;
-+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
-+ if (xfer_size < rs_sbuf_left(rs))
-+ rs->ssgl[0].addr += xfer_size;
-+ else
-+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+ } else {
-+ rs->ssgl[0].length = rs_sbuf_left(rs);
-+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
-+ rs->ssgl[0].length);
-+ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
-+ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
-+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+ }
-+ if (ret)
-+ break;
-+ }
-+out:
-+ fastlock_release(&rs->slock);
-
-+ return (ret && left == count) ? ret : count - left;
- }
Bottom: daf53db464152f40dc8d6f2c99844510b03f8567
-Top: fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
+Top: 99d8cf70184e482d774241ae32277d367ffabf7f
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-10-21 14:16:03 -0700
---
diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h
-index 65feda9..21477e4 100644
+index 65feda9..8634517 100644
--- a/include/rdma/rsocket.h
+++ b/include/rdma/rsocket.h
@@ -1,5 +1,5 @@
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
-@@ -85,6 +85,10 @@ int rgetsockopt(int socket, int level, int optname,
+@@ -76,7 +76,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
+ enum {
+ RDMA_SQSIZE,
+ RDMA_RQSIZE,
+- RDMA_INLINE
++ RDMA_INLINE,
++ RDMA_IOMAPSIZE
+ };
+
+ int rsetsockopt(int socket, int level, int optname,
+@@ -85,6 +86,10 @@ int rgetsockopt(int socket, int level, int optname,
void *optval, socklen_t *optlen);
int rfcntl(int socket, int cmd, ... /* arg */ );
+
+#endif /* INDEXER_H */
diff --git a/src/rsocket.c b/src/rsocket.c
-index cc5effe..22e474d 100644
+index cc5effe..d620d04 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -55,6 +55,7 @@
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
-@@ -76,9 +78,12 @@ static uint32_t polling_time = 10;
+@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
*/
enum {
-@@ -88,7 +93,7 @@ enum {
+ RS_OP_DATA,
+ RS_OP_RSVD_DATA_MORE,
+- RS_OP_RSVD_DRA,
++ RS_OP_WRITE,
RS_OP_RSVD_DRA_MORE,
RS_OP_SGL,
RS_OP_RSVD,
- if (def_wmem < 1)
- def_wmem = 1;
+ /* round to supported values */
-+ def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
-+ def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
++ def_iomap_size = (uint8_t) rs_value_to_scale(
++ (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8)
}
init = 1;
out:
+ while (!dlist_empty(&rs->iomap_list)) {
+ iomr = container_of(rs->iomap_list.next,
+ struct rs_iomap_mr, entry);
-+ riounmap(iomr->mr->addr, iomr->mr->length);
++ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+ }
+ while (!dlist_empty(&rs->iomap_queue)) {
+ iomr = container_of(rs->iomap_queue.next,
+ struct rs_iomap_mr, entry);
-+ riounmap(iomr->mr->addr, iomr->mr->length);
++ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+ }
+}
+
rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
rs->target_sgl[0].length = ntohl(conn->data_buf.length);
rs->target_sgl[0].key = ntohl(conn->data_buf.key);
-@@ -803,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
- flags, addr, rkey);
+@@ -753,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ return rs_do_connect(rs);
}
+-static int rs_post_write(struct rsocket *rs,
++static int rs_post_write_msg(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t imm_data, int flags,
+ uint64_t addr, uint32_t rkey)
+@@ -773,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+ }
+
++static int rs_post_write(struct rsocket *rs,
++ struct ibv_sge *sgl, int nsge,
++ uint64_t wr_id, int flags,
++ uint64_t addr, uint32_t rkey)
++{
++ struct ibv_send_wr wr, *bad;
++
++ wr.wr_id = wr_id;
++ wr.next = NULL;
++ wr.sg_list = sgl;
++ wr.num_sge = nsge;
++ wr.opcode = IBV_WR_RDMA_WRITE;
++ wr.send_flags = flags;
++ wr.wr.rdma.remote_addr = addr;
++ wr.wr.rdma.rkey = rkey;
++
++ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
++}
++
+ /*
+ * Update target SGE before sending data. Otherwise the remote side may
+ * update the entry before we do.
+@@ -799,8 +935,36 @@ static int rs_write_data(struct rsocket *rs,
+ rs->target_sge = 0;
+ }
+
+- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+- flags, addr, rkey);
++ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
++ flags, addr, rkey);
++}
++
++static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
++ struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
++{
++ uint64_t addr;
++
++ rs->sseq_no++;
++ rs->sqe_avail--;
++ rs->sbuf_bytes_avail -= length;
++
++ addr = iom->sge.addr + offset - iom->offset;
++ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
++ flags, addr, iom->sge.key);
++}
++
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+ struct ibv_sge *sgl, int nsge, int flags)
+{
+ rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+ addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-+ flags, addr, rs->remote_iomap.key);
-+}
-+
++ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
++ flags, addr, rs->remote_iomap.key);
+ }
+
static uint32_t rs_sbuf_left(struct rsocket *rs)
- {
- return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
-@@ -1046,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
+@@ -831,12 +995,12 @@ static void rs_send_credits(struct rsocket *rs)
+ ibsge.lkey = 0;
+ ibsge.length = sizeof(sge);
+
+- rs_post_write(rs, &ibsge, 1,
+- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+- IBV_SEND_INLINE,
+- rs->remote_sgl.addr +
+- rs->remote_sge * sizeof(struct rs_sge),
+- rs->remote_sgl.key);
++ rs_post_write_msg(rs, &ibsge, 1,
++ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
++ IBV_SEND_INLINE,
++ rs->remote_sgl.addr +
++ rs->remote_sge * sizeof(struct rs_sge),
++ rs->remote_sgl.key);
+
+ rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
+ rs->rbuf_free_offset += rs->rbuf_size >> 1;
+@@ -845,8 +1009,9 @@ static void rs_send_credits(struct rsocket *rs)
+ if (++rs->remote_sge == rs->remote_sgl.length)
+ rs->remote_sge = 0;
+ } else {
+- rs_post_write(rs, NULL, 0,
+- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
++ rs_post_write_msg(rs, NULL, 0,
++ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
++ 0, 0, 0);
+ }
+ }
+
+@@ -880,6 +1045,9 @@ static int rs_poll_cq(struct rsocket *rs)
+ case RS_OP_SGL:
+ rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+ break;
++ case RS_OP_IOMAP_SGL:
++ /* The iomap was updated, that's nice to know. */
++ break;
+ case RS_OP_CTRL:
+ if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+ rs->state = rs_disconnected;
+@@ -888,6 +1056,9 @@ static int rs_poll_cq(struct rsocket *rs)
+ rs->state &= ~rs_connect_rd;
+ }
+ break;
++ case RS_OP_WRITE:
++ /* We really shouldn't be here. */
++ break;
+ default:
+ rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
+ rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+@@ -905,6 +1076,10 @@ static int rs_poll_cq(struct rsocket *rs)
+ if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
+ rs->state = rs_disconnected;
+ break;
++ case RS_OP_IOMAP_SGL:
++ rs->sqe_avail++;
++ rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
++ break;
+ default:
+ rs->sqe_avail++;
+ rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
+@@ -1046,7 +1221,7 @@ static int rs_poll_all(struct rsocket *rs)
*/
static int rs_can_send(struct rsocket *rs)
{
(rs->sseq_no != rs->sseq_comp) &&
(rs->target_sgl[rs->target_sge].length != 0);
}
-@@ -1216,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
+@@ -1216,6 +1391,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
return rrecvv(socket, iov, iovcnt, 0);
}
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1239,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1239,6 +1481,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
}
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1289,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1289,6 +1536,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
if (ret)
break;
}
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
-@@ -1347,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1347,6 +1595,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
len += iov[i].iov_len;
fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1395,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1395,6 +1648,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
if (ret)
break;
}
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
-@@ -1814,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -1725,8 +1979,8 @@ int rshutdown(int socket, int how)
+
+ if ((rs->state & rs_connected) && rs->ctrl_avail) {
+ rs->ctrl_avail--;
+- ret = rs_post_write(rs, NULL, 0,
+- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
++ ret = rs_post_write_msg(rs, NULL, 0,
++ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ }
+ }
+
+@@ -1814,6 +2068,8 @@ int rsetsockopt(int socket, int level, int optname,
case SO_SNDBUF:
if (!rs->sbuf)
rs->sbuf_size = (*(uint32_t *) optval) << 1;
ret = 0;
break;
case SO_LINGER:
-@@ -2020,3 +2232,117 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
+@@ -1878,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname,
+ if (rs->sq_inline < RS_MIN_INLINE)
+ rs->sq_inline = RS_MIN_INLINE;
+ break;
++ case RDMA_IOMAPSIZE:
++ rs->target_iomap_size = (uint16_t) rs_scale_to_value(
++ (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
++ break;
+ default:
+ break;
+ }
+@@ -1979,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname,
+ *((int *) optval) = rs->sq_inline;
+ *optlen = sizeof(int);
+ break;
++ case RDMA_IOMAPSIZE:
++ *((int *) optval) = rs->target_iomap_size;
++ *optlen = sizeof(int);
++ break;
+ default:
+ ret = ENOTSUP;
+ break;
+@@ -2020,3 +2284,204 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
va_end(args);
return ret;
}
+ return ret;
+}
+
++static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
++{
++ int i;
++
++ for (i = 0; i < rs->target_iomap_size; i++) {
++ if (offset >= rs->target_iomap[i].offset &&
++ offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
++ return &rs->target_iomap[i];
++ }
++ return NULL;
++}
++
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
++ struct rsocket *rs;
++ struct rs_iomap *iom = NULL;
++ struct ibv_sge sge;
++ size_t left;
++ uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
++ int ret = 0;
++
++ rs = idm_at(&idm, socket);
++ fastlock_acquire(&rs->slock);
++ if (rs->iomap_pending) {
++ ret = rs_send_iomaps(rs, flags);
++ if (ret)
++ goto out;
++ }
++ for (left = count; left;
++ left -= xfer_size, buf += xfer_size, offset += xfer_size) {
++ if (!iom || offset > iom->offset + iom->sge.length) {
++ iom = rs_find_iomap(rs, offset);
++ if (!iom)
++ break;
++ }
++
++ if (!rs_can_send(rs)) {
++ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
++ rs_conn_can_send);
++ if (ret)
++ break;
++ if (!(rs->state & rs_connect_wr)) {
++ ret = ERR(ECONNRESET);
++ break;
++ }
++ }
++
++ if (olen < left) {
++ xfer_size = olen;
++ if (olen < RS_MAX_TRANSFER)
++ olen <<= 1;
++ } else {
++ xfer_size = left;
++ }
++
++ if (xfer_size > rs->sbuf_bytes_avail)
++ xfer_size = rs->sbuf_bytes_avail;
++ if (xfer_size > iom->offset + iom->sge.length - offset)
++ xfer_size = iom->offset + iom->sge.length - offset;
++
++ if (xfer_size <= rs->sq_inline) {
++ sge.addr = (uintptr_t) buf;
++ sge.length = xfer_size;
++ sge.lkey = 0;
++ ret = rs_write_direct(rs, iom, offset, &sge, 1,
++ xfer_size, IBV_SEND_INLINE);
++ } else if (xfer_size <= rs_sbuf_left(rs)) {
++ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
++ rs->ssgl[0].length = xfer_size;
++ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
++ if (xfer_size < rs_sbuf_left(rs))
++ rs->ssgl[0].addr += xfer_size;
++ else
++ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
++ } else {
++ rs->ssgl[0].length = rs_sbuf_left(rs);
++ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
++ rs->ssgl[0].length);
++ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
++ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
++ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
++ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
++ }
++ if (ret)
++ break;
++ }
++out:
++ fastlock_release(&rs->slock);
+
++ return (ret && left == count) ? ret : count - left;
+}