From c375e043233dd6d8b876b4aabdc1ee4e97462ccf Mon Sep 17 00:00:00 2001
From: Sean Hefty
Date: Tue, 23 Oct 2012 17:07:33 -0700
Subject: [PATCH] Refresh of rs-iomap

Send queued io mappings to the peer from the send path, and require a
minimum amount of free send buffer space before sending.

- Add RS_SNDLOWAT (64).  rs_can_send() now requires sbuf_bytes_avail to
  be at least RS_SNDLOWAT, and the wmem_default / SO_SNDBUF sizes are
  set to RS_SNDLOWAT << 1 when they fall below RS_SNDLOWAT.
- Replace RS_OP_RSVD_DRA_SGL with RS_OP_IOMAP_SGL; its immediate data
  carries the iomap index in bits [15-0].
- Add rs_write_iomap(), which posts an RDMA write of one struct rs_iomap
  to rs->remote_iomap.addr + index * sizeof(struct rs_iomap).
- Add rs_send_iomaps(), which walks rs->iomap_queue, writes each entry
  (inline, from the send buffer, or split across the send buffer wrap)
  and moves it to rs->iomap_list.  rsend() and rsendv() call it under
  slock whenever rs->iomap_pending is set.

Review fixes folded in:
- rs_send_iomaps() returned an uninitialized 'ret' when the queue was
  already empty; start it at 0.
- iomap_pending was assigned dlist_empty(), i.e. true exactly when
  nothing was left to send and false when an error left entries queued;
  it must be !dlist_empty().

NOTE(review): the new 'goto out' in rsend()/rsendv() reaches
'len - left' before the for loop has assigned 'left'.  Confirm that
'left' is initialized at its declaration (not visible in this diff).

---
 src/rsocket.c | 108 +++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 103 insertions(+), 5 deletions(-)

diff --git a/src/rsocket.c b/src/rsocket.c
index ed708d43..22e474da 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -55,6 +55,7 @@
 
 #define RS_OLAP_START_SIZE 2048
 #define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
 #define RS_QP_MAX_SIZE 0xFFFE
 #define RS_QP_CTRL_SIZE 4
 #define RS_CONN_RETRIES 6
@@ -79,7 +80,10 @@ static uint32_t polling_time = 10;
  * for data transfers:
  *   bits [28:0]: bytes transfered
  * for control messages:
+ *   SGL, CTRL
  *   bits [28-0]: receive credits granted
+ *   IOMAP_SGL
+ *   bits [28-16]: reserved, bits [15-0]: index
  */
 
 enum {
@@ -89,7 +93,7 @@ enum {
 	RS_OP_RSVD_DRA_MORE,
 	RS_OP_SGL,
 	RS_OP_RSVD,
-	RS_OP_RSVD_DRA_SGL,
+	RS_OP_IOMAP_SGL,
 	RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -285,9 +289,8 @@ void rs_configure(void)
 	if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
 		fscanf(f, "%u", &def_wmem);
 		fclose(f);
-
-		if (def_wmem < 1)
-			def_wmem = 1;
+		if (def_wmem < RS_SNDLOWAT)
+			def_wmem = RS_SNDLOWAT << 1;
 	}
 
 	if ((f = fopen(RS_CONN_DIR "/iomap_size", "r"))) {
@@ -917,6 +920,20 @@ static int rs_write_data(struct rsocket *rs,
 			     flags, addr, rkey);
 }
 
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+			  struct ibv_sge *sgl, int nsge, int flags)
+{
+	uint64_t addr;
+
+	rs->sseq_no++;
+	rs->sqe_avail--;
+	rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+	addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+			     flags, addr, rs->remote_iomap.key);
+}
+
 static uint32_t rs_sbuf_left(struct rsocket *rs)
 {
 	return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
@@ -1160,7 +1177,7 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-	return rs->sqe_avail && rs->sbuf_bytes_avail &&
+	return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
 	       (rs->sseq_no != rs->sseq_comp) &&
 	       (rs->target_sgl[rs->target_sge].length != 0);
 }
@@ -1330,6 +1347,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
 	return rrecvv(socket, iov, iovcnt, 0);
 }
 
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+	struct rs_iomap_mr *iomr;
+	struct ibv_sge sge;
+	struct rs_iomap iom;
+	int ret = 0;	/* returned unchanged if the queue is already empty */
+
+	fastlock_acquire(&rs->iomap_lock);
+	while (!dlist_empty(&rs->iomap_queue)) {
+		if (!rs_can_send(rs)) {
+			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+					  rs_conn_can_send);
+			if (ret)
+				break;
+			if (!(rs->state & rs_connect_wr)) {
+				ret = ERR(ECONNRESET);
+				break;
+			}
+		}
+
+		iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+		if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+			iom.offset = iomr->offset;
+			iom.sge.addr = (uintptr_t) iomr->mr->addr;
+			iom.sge.length = iomr->mr->length;
+			iom.sge.key = iomr->mr->rkey;
+		} else {
+			iom.offset = bswap_64(iomr->offset);
+			iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+			iom.sge.length = bswap_32(iomr->mr->length);
+			iom.sge.key = bswap_32(iomr->mr->rkey);
+		}
+
+		if (rs->sq_inline >= sizeof iom) {
+			sge.addr = (uintptr_t) &iom;
+			sge.length = sizeof iom;
+			sge.lkey = 0;
+			ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+		} else if (rs_sbuf_left(rs) >= sizeof iom) {
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+			rs->ssgl[0].length = sizeof iom;
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+			if (rs_sbuf_left(rs) > sizeof iom)
+				rs->ssgl[0].addr += sizeof iom;
+			else
+				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+		} else {
+			rs->ssgl[0].length = rs_sbuf_left(rs);
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+				rs->ssgl[0].length);
+			rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+			memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+			       rs->ssgl[1].length);
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+		}
+		dlist_remove(&iomr->entry);
+		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+		if (ret)
+			break;
+	}
+
+	rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+	fastlock_release(&rs->iomap_lock);
+	return ret;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1353,6 +1437,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 	}
 
 	fastlock_acquire(&rs->slock);
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
 	for (left = len; left; left -= xfer_size, buf += xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
@@ -1403,6 +1492,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		if (ret)
 			break;
 	}
+out:
 	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
@@ -1461,6 +1551,11 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 		len += iov[i].iov_len;
 
 	fastlock_acquire(&rs->slock);
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
 	for (left = len; left; left -= xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
@@ -1509,6 +1604,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 		if (ret)
 			break;
 	}
+out:
 	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
@@ -1928,6 +2024,8 @@ int rsetsockopt(int socket, int level, int optname,
 	case SO_SNDBUF:
 		if (!rs->sbuf)
 			rs->sbuf_size = (*(uint32_t *) optval) << 1;
+		if (rs->sbuf_size < RS_SNDLOWAT)
+			rs->sbuf_size = RS_SNDLOWAT << 1;
 		ret = 0;
 		break;
 	case SO_LINGER:
-- 
2.41.0