+++ /dev/null
-Bottom: ea70904deb6e6424cbdeacc9a46e20ee1b29e5c0
-Top: fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-10-23 17:07:33 -0700
-
-Refresh of rs-iomap
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index ed708d4..22e474d 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -55,6 +55,7 @@
-
- #define RS_OLAP_START_SIZE 2048
- #define RS_MAX_TRANSFER 65536
-+#define RS_SNDLOWAT 64
- #define RS_QP_MAX_SIZE 0xFFFE
- #define RS_QP_CTRL_SIZE 4
- #define RS_CONN_RETRIES 6
-@@ -79,7 +80,10 @@ static uint32_t polling_time = 10;
- * for data transfers:
- * bits [28:0]: bytes transfered
- * for control messages:
-+ * SGL, CTRL
- * bits [28-0]: receive credits granted
-+ * IOMAP_SGL
-+ * bits [28-16]: reserved, bits [15-0]: index
- */
-
- enum {
-@@ -89,7 +93,7 @@ enum {
- RS_OP_RSVD_DRA_MORE,
- RS_OP_SGL,
- RS_OP_RSVD,
-- RS_OP_RSVD_DRA_SGL,
-+ RS_OP_IOMAP_SGL,
- RS_OP_CTRL
- };
- #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
-@@ -285,9 +289,8 @@ void rs_configure(void)
- if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
- fscanf(f, "%u", &def_wmem);
- fclose(f);
--
-- if (def_wmem < 1)
-- def_wmem = 1;
-+ if (def_wmem < RS_SNDLOWAT)
-+ def_wmem = RS_SNDLOWAT << 1;
- }
-
- if ((f = fopen(RS_CONN_DIR "/iomap_size", "r"))) {
-@@ -917,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
- flags, addr, rkey);
- }
-
-+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
-+ struct ibv_sge *sgl, int nsge, int flags)
-+{
-+ uint64_t addr;
-+
-+ rs->sseq_no++;
-+ rs->sqe_avail--;
-+ rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-+
-+ addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-+ flags, addr, rs->remote_iomap.key);
-+}
-+
- static uint32_t rs_sbuf_left(struct rsocket *rs)
- {
- return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
-@@ -1160,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
- */
- static int rs_can_send(struct rsocket *rs)
- {
-- return rs->sqe_avail && rs->sbuf_bytes_avail &&
-+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
- (rs->sseq_no != rs->sseq_comp) &&
- (rs->target_sgl[rs->target_sge].length != 0);
- }
-@@ -1330,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
- return rrecvv(socket, iov, iovcnt, 0);
- }
-
-+static int rs_send_iomaps(struct rsocket *rs, int flags)
-+{
-+ struct rs_iomap_mr *iomr;
-+ struct ibv_sge sge;
-+ struct rs_iomap iom;
-+ int ret;
-+
-+ fastlock_acquire(&rs->iomap_lock);
-+ while (!dlist_empty(&rs->iomap_queue)) {
-+ if (!rs_can_send(rs)) {
-+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-+ rs_conn_can_send);
-+ if (ret)
-+ break;
-+ if (!(rs->state & rs_connect_wr)) {
-+ ret = ERR(ECONNRESET);
-+ break;
-+ }
-+ }
-+
-+ iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
-+ if (!(rs->opts & RS_OPT_SWAP_SGL)) {
-+ iom.offset = iomr->offset;
-+ iom.sge.addr = (uintptr_t) iomr->mr->addr;
-+ iom.sge.length = iomr->mr->length;
-+ iom.sge.key = iomr->mr->rkey;
-+ } else {
-+ iom.offset = bswap_64(iomr->offset);
-+ iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
-+ iom.sge.length = bswap_32(iomr->mr->length);
-+ iom.sge.key = bswap_32(iomr->mr->rkey);
-+ }
-+
-+ if (rs->sq_inline >= sizeof iom) {
-+ sge.addr = (uintptr_t) &iom;
-+ sge.length = sizeof iom;
-+ sge.lkey = 0;
-+ ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
-+ } else if (rs_sbuf_left(rs) >= sizeof iom) {
-+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
-+ rs->ssgl[0].length = sizeof iom;
-+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
-+ if (rs_sbuf_left(rs) > sizeof iom)
-+ rs->ssgl[0].addr += sizeof iom;
-+ else
-+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+ } else {
-+ rs->ssgl[0].length = rs_sbuf_left(rs);
-+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
-+ rs->ssgl[0].length);
-+ rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
-+ memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
-+ rs->ssgl[1].length);
-+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
-+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+ }
-+ dlist_remove(&iomr->entry);
-+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
-+ if (ret)
-+ break;
-+ }
-+
-+ rs->iomap_pending = dlist_empty(&rs->iomap_queue);
-+ fastlock_release(&rs->iomap_lock);
-+ return ret;
-+}
-+
- /*
- * We overlap sending the data, by posting a small work request immediately,
- * then increasing the size of the send on each iteration.
-@@ -1353,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- }
-
- fastlock_acquire(&rs->slock);
-+ if (rs->iomap_pending) {
-+ ret = rs_send_iomaps(rs, flags);
-+ if (ret)
-+ goto out;
-+ }
- for (left = len; left; left -= xfer_size, buf += xfer_size) {
- if (!rs_can_send(rs)) {
- ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1403,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- if (ret)
- break;
- }
-+out:
- fastlock_release(&rs->slock);
-
- return (ret && left == len) ? ret : len - left;
-@@ -1461,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- len += iov[i].iov_len;
-
- fastlock_acquire(&rs->slock);
-+ if (rs->iomap_pending) {
-+ ret = rs_send_iomaps(rs, flags);
-+ if (ret)
-+ goto out;
-+ }
- for (left = len; left; left -= xfer_size) {
- if (!rs_can_send(rs)) {
- ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1509,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- if (ret)
- break;
- }
-+out:
- fastlock_release(&rs->slock);
-
- return (ret && left == len) ? ret : len - left;
-@@ -1928,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
- case SO_SNDBUF:
- if (!rs->sbuf)
- rs->sbuf_size = (*(uint32_t *) optval) << 1;
-+ if (rs->sbuf_size < RS_SNDLOWAT)
-+ rs->sbuf_size = RS_SNDLOWAT << 1;
- ret = 0;
- break;
- case SO_LINGER:
Bottom: daf53db464152f40dc8d6f2c99844510b03f8567
-Top: ea70904deb6e6424cbdeacc9a46e20ee1b29e5c0
+Top: fec0ec1fc45567784bfe0ec5aa5abdf1ca3180e7
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-10-21 14:16:03 -0700
+
+#endif /* INDEXER_H */
diff --git a/src/rsocket.c b/src/rsocket.c
-index cc5effe..ed708d4 100644
+index cc5effe..22e474d 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -62,6 +62,7 @@
+@@ -55,6 +55,7 @@
+
+ #define RS_OLAP_START_SIZE 2048
+ #define RS_MAX_TRANSFER 65536
++#define RS_SNDLOWAT 64
+ #define RS_QP_MAX_SIZE 0xFFFE
+ #define RS_QP_CTRL_SIZE 4
+ #define RS_CONN_RETRIES 6
+@@ -62,6 +63,7 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
-@@ -76,7 +77,7 @@ static uint32_t polling_time = 10;
+@@ -76,9 +78,12 @@ static uint32_t polling_time = 10;
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transfered
* for control messages:
++ * SGL, CTRL
* bits [28-0]: receive credits granted
++ * IOMAP_SGL
++ * bits [28-16]: reserved, bits [15-0]: index
*/
-@@ -111,15 +112,30 @@ struct rs_sge {
+
+ enum {
+@@ -88,7 +93,7 @@ enum {
+ RS_OP_RSVD_DRA_MORE,
+ RS_OP_SGL,
+ RS_OP_RSVD,
+- RS_OP_RSVD_DRA_SGL,
++ RS_OP_IOMAP_SGL,
+ RS_OP_CTRL
+ };
+ #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
+@@ -111,15 +116,30 @@ struct rs_sge {
uint32_t length;
};
struct rs_sge target_sgl;
struct rs_sge data_buf;
};
-@@ -155,6 +171,7 @@ struct rsocket {
+@@ -155,6 +175,7 @@ struct rsocket {
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
int opts;
long fd_flags;
-@@ -186,10 +203,19 @@ struct rsocket {
+@@ -186,10 +207,19 @@ struct rsocket {
int remote_sge;
struct rs_sge remote_sgl;
uint32_t rbuf_size;
struct ibv_mr *rmr;
-@@ -201,6 +227,18 @@ struct rsocket {
+@@ -201,6 +231,18 @@ struct rsocket {
uint8_t *sbuf;
};
void rs_configure(void)
{
FILE *f;
-@@ -251,6 +289,15 @@ void rs_configure(void)
- if (def_wmem < 1)
- def_wmem = 1;
- }
+@@ -247,9 +289,17 @@ void rs_configure(void)
+ if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
+ fscanf(f, "%u", &def_wmem);
+ fclose(f);
++ if (def_wmem < RS_SNDLOWAT)
++ def_wmem = RS_SNDLOWAT << 1;
++ }
+
+ if ((f = fopen(RS_CONN_DIR "/iomap_size", "r"))) {
+ fscanf(f, "%hu", &def_iomap_size);
+ fclose(f);
-+
+
+- if (def_wmem < 1)
+- def_wmem = 1;
+ /* round to supported values */
+ def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
+ def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
-+ }
+ }
init = 1;
out:
- pthread_mutex_unlock(&mut);
-@@ -287,6 +334,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
rs->sq_size = inherited_rs->sq_size;
rs->rq_size = inherited_rs->rq_size;
rs->ctrl_avail = inherited_rs->ctrl_avail;
} else {
rs->sbuf_size = def_wmem;
rs->rbuf_size = def_mem;
-@@ -294,11 +342,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
rs->sq_size = def_sqsize;
rs->rq_size = def_rqsize;
rs->ctrl_avail = RS_QP_CTRL_SIZE;
return rs;
}
-@@ -336,6 +388,8 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
static int rs_init_bufs(struct rsocket *rs)
{
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return -1;
-@@ -348,11 +402,21 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
if (!rs->smr)
return -1;
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
return -1;
-@@ -452,6 +516,42 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs)
return 0;
}
static void rs_free(struct rsocket *rs)
{
if (rs->index >= 0)
-@@ -472,15 +572,20 @@ static void rs_free(struct rsocket *rs)
+@@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs)
free(rs->rbuf);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -492,9 +597,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
+@@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
struct rs_conn_data *conn)
{
conn->version = 1;
conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
conn->target_sgl.length = htonl(RS_SGL_SIZE);
-@@ -518,6 +625,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
(!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
rs->opts = RS_OPT_SWAP_SGL;
rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
rs->target_sgl[0].length = ntohl(conn->data_buf.length);
rs->target_sgl[0].key = ntohl(conn->data_buf.key);
-@@ -2020,3 +2134,117 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
+@@ -803,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
+ flags, addr, rkey);
+ }
+
++static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
++ struct ibv_sge *sgl, int nsge, int flags)
++{
++ uint64_t addr;
++
++ rs->sseq_no++;
++ rs->sqe_avail--;
++ rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
++
++ addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
++ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
++ flags, addr, rs->remote_iomap.key);
++}
++
+ static uint32_t rs_sbuf_left(struct rsocket *rs)
+ {
+ return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
+@@ -1046,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
+ */
+ static int rs_can_send(struct rsocket *rs)
+ {
+- return rs->sqe_avail && rs->sbuf_bytes_avail &&
++ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+ (rs->sseq_no != rs->sseq_comp) &&
+ (rs->target_sgl[rs->target_sge].length != 0);
+ }
+@@ -1216,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
+ return rrecvv(socket, iov, iovcnt, 0);
+ }
+
++static int rs_send_iomaps(struct rsocket *rs, int flags)
++{
++ struct rs_iomap_mr *iomr;
++ struct ibv_sge sge;
++ struct rs_iomap iom;
++ int ret;
++
++ fastlock_acquire(&rs->iomap_lock);
++ while (!dlist_empty(&rs->iomap_queue)) {
++ if (!rs_can_send(rs)) {
++ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
++ rs_conn_can_send);
++ if (ret)
++ break;
++ if (!(rs->state & rs_connect_wr)) {
++ ret = ERR(ECONNRESET);
++ break;
++ }
++ }
++
++ iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
++ if (!(rs->opts & RS_OPT_SWAP_SGL)) {
++ iom.offset = iomr->offset;
++ iom.sge.addr = (uintptr_t) iomr->mr->addr;
++ iom.sge.length = iomr->mr->length;
++ iom.sge.key = iomr->mr->rkey;
++ } else {
++ iom.offset = bswap_64(iomr->offset);
++ iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
++ iom.sge.length = bswap_32(iomr->mr->length);
++ iom.sge.key = bswap_32(iomr->mr->rkey);
++ }
++
++ if (rs->sq_inline >= sizeof iom) {
++ sge.addr = (uintptr_t) &iom;
++ sge.length = sizeof iom;
++ sge.lkey = 0;
++ ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
++ } else if (rs_sbuf_left(rs) >= sizeof iom) {
++ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
++ rs->ssgl[0].length = sizeof iom;
++ ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
++ if (rs_sbuf_left(rs) > sizeof iom)
++ rs->ssgl[0].addr += sizeof iom;
++ else
++ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
++ } else {
++ rs->ssgl[0].length = rs_sbuf_left(rs);
++ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
++ rs->ssgl[0].length);
++ rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
++ memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
++ rs->ssgl[1].length);
++ ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
++ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
++ }
++ dlist_remove(&iomr->entry);
++ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
++ if (ret)
++ break;
++ }
++
++ rs->iomap_pending = dlist_empty(&rs->iomap_queue);
++ fastlock_release(&rs->iomap_lock);
++ return ret;
++}
++
+ /*
+ * We overlap sending the data, by posting a small work request immediately,
+ * then increasing the size of the send on each iteration.
+@@ -1239,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ }
+
+ fastlock_acquire(&rs->slock);
++ if (rs->iomap_pending) {
++ ret = rs_send_iomaps(rs, flags);
++ if (ret)
++ goto out;
++ }
+ for (left = len; left; left -= xfer_size, buf += xfer_size) {
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1289,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ if (ret)
+ break;
+ }
++out:
+ fastlock_release(&rs->slock);
+
+ return (ret && left == len) ? ret : len - left;
+@@ -1347,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ len += iov[i].iov_len;
+
+ fastlock_acquire(&rs->slock);
++ if (rs->iomap_pending) {
++ ret = rs_send_iomaps(rs, flags);
++ if (ret)
++ goto out;
++ }
+ for (left = len; left; left -= xfer_size) {
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1395,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ if (ret)
+ break;
+ }
++out:
+ fastlock_release(&rs->slock);
+
+ return (ret && left == len) ? ret : len - left;
+@@ -1814,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
+ case SO_SNDBUF:
+ if (!rs->sbuf)
+ rs->sbuf_size = (*(uint32_t *) optval) << 1;
++ if (rs->sbuf_size < RS_SNDLOWAT)
++ rs->sbuf_size = RS_SNDLOWAT << 1;
+ ret = 0;
+ break;
+ case SO_LINGER:
+@@ -2020,3 +2232,117 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
va_end(args);
return ret;
}