From: Sean Hefty Date: Wed, 24 Oct 2012 05:16:29 +0000 (-0700) Subject: Refresh of rs-iomap X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=6f85e0f5188b4ad73a131ca86c5b33fc31e32c37;p=~shefty%2Flibrdmacm.git Refresh of rs-iomap --- diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h index 21477e41..86345176 100644 --- a/include/rdma/rsocket.h +++ b/include/rdma/rsocket.h @@ -76,7 +76,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen); enum { RDMA_SQSIZE, RDMA_RQSIZE, - RDMA_INLINE + RDMA_INLINE, + RDMA_IOMAPSIZE }; int rsetsockopt(int socket, int level, int optname, diff --git a/src/rsocket.c b/src/rsocket.c index 22e474da..d620d049 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -89,7 +89,7 @@ static uint32_t polling_time = 10; enum { RS_OP_DATA, RS_OP_RSVD_DATA_MORE, - RS_OP_RSVD_DRA, + RS_OP_WRITE, RS_OP_RSVD_DRA_MORE, RS_OP_SGL, RS_OP_RSVD, @@ -298,8 +298,8 @@ void rs_configure(void) fclose(f); /* round to supported values */ - def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8); - def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8); + def_iomap_size = (uint8_t) rs_value_to_scale( + (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8) } init = 1; out: @@ -546,12 +546,12 @@ static void rs_free_iomappings(struct rsocket *rs) while (!dlist_empty(&rs->iomap_list)) { iomr = container_of(rs->iomap_list.next, struct rs_iomap_mr, entry); - riounmap(iomr->mr->addr, iomr->mr->length); + riounmap(rs->index, iomr->mr->addr, iomr->mr->length); } while (!dlist_empty(&rs->iomap_queue)) { iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry); - riounmap(iomr->mr->addr, iomr->mr->length); + riounmap(rs->index, iomr->mr->addr, iomr->mr->length); } } @@ -870,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) return rs_do_connect(rs); } -static int rs_post_write(struct rsocket *rs, +static int rs_post_write_msg(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t imm_data, int flags, uint64_t addr, uint32_t rkey) @@ -890,6 +890,25 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int rs_post_write(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint64_t wr_id, int flags, + uint64_t addr, uint32_t rkey) +{ + struct ibv_send_wr wr, *bad; + + wr.wr_id = wr_id; + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_RDMA_WRITE; + wr.send_flags = flags; + wr.wr.rdma.remote_addr = addr; + wr.wr.rdma.rkey = rkey; + + return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); +} + /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. @@ -916,8 +935,22 @@ static int rs_write_data(struct rsocket *rs, rs->target_sge = 0; } - return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), - flags, addr, rkey); + return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), + flags, addr, rkey); +} + +static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, + struct ibv_sge *sgl, int nsge, uint32_t length, int flags) +{ + uint64_t addr; + + rs->sseq_no++; + rs->sqe_avail--; + rs->sbuf_bytes_avail -= length; + + addr = iom->sge.addr + offset - iom->offset; + return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length), + flags, addr, iom->sge.key); } static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr, @@ -930,8 +963,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr, rs->sbuf_bytes_avail -= sizeof(struct rs_iomap); addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); - return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), - flags, addr, rs->remote_iomap.key); + return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), + flags, addr, rs->remote_iomap.key); } static uint32_t rs_sbuf_left(struct rsocket *rs) @@ -962,12 +995,12 @@ static void rs_send_credits(struct rsocket *rs) ibsge.lkey = 0; ibsge.length = sizeof(sge); - rs_post_write(rs, &ibsge, 1, - rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), - IBV_SEND_INLINE, - rs->remote_sgl.addr + - rs->remote_sge * sizeof(struct rs_sge), - rs->remote_sgl.key); + rs_post_write_msg(rs, &ibsge, 1, + rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), + IBV_SEND_INLINE, + rs->remote_sgl.addr + + rs->remote_sge * sizeof(struct rs_sge), + rs->remote_sgl.key); rs->rbuf_bytes_avail -= rs->rbuf_size >> 1; rs->rbuf_free_offset += rs->rbuf_size >> 1; @@ -976,8 +1009,9 @@ static void rs_send_credits(struct rsocket *rs) if (++rs->remote_sge == rs->remote_sgl.length) rs->remote_sge = 0; } else { - rs_post_write(rs, NULL, 0, - rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0); + rs_post_write_msg(rs, NULL, 0, + rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), + 0, 0, 0); } } @@ -1011,6 +1045,9 @@ static int rs_poll_cq(struct rsocket *rs) case RS_OP_SGL: rs->sseq_comp = (uint16_t) rs_msg_data(imm_data); break; + case RS_OP_IOMAP_SGL: + /* The iomap was updated, that's nice to know. */ + break; case RS_OP_CTRL: if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { rs->state = rs_disconnected; @@ -1019,6 +1056,9 @@ static int rs_poll_cq(struct rsocket *rs) rs->state &= ~rs_connect_rd; } break; + case RS_OP_WRITE: + /* We really shouldn't be here. */ + break; default: rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data); rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data); @@ -1036,6 +1076,10 @@ static int rs_poll_cq(struct rsocket *rs) if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT) rs->state = rs_disconnected; break; + case RS_OP_IOMAP_SGL: + rs->sqe_avail++; + rs->sbuf_bytes_avail += sizeof(struct rs_iomap); + break; default: rs->sqe_avail++; rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id); @@ -1935,8 +1979,8 @@ int rshutdown(int socket, int how) if ((rs->state & rs_connected) && rs->ctrl_avail) { rs->ctrl_avail--; - ret = rs_post_write(rs, NULL, 0, - rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); + ret = rs_post_write_msg(rs, NULL, 0, + rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); } } @@ -2090,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname, if (rs->sq_inline < RS_MIN_INLINE) rs->sq_inline = RS_MIN_INLINE; break; + case RDMA_IOMAPSIZE: + rs->target_iomap_size = (uint16_t) rs_scale_to_value( + (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8); + break; default: break; } @@ -2191,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname, *((int *) optval) = rs->sq_inline; *optlen = sizeof(int); break; + case RDMA_IOMAPSIZE: + *((int *) optval) = rs->target_iomap_size; + *optlen = sizeof(int); + break; default: ret = ENOTSUP; break; @@ -2342,7 +2394,94 @@ out: return ret; } +static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset) +{ + int i; + + for (i = 0; i < rs->target_iomap_size; i++) { + if (offset >= rs->target_iomap[i].offset && + offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length) + return &rs->target_iomap[i]; + } + return NULL; +} + size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags) { + struct rsocket *rs; + struct rs_iomap *iom = NULL; + struct ibv_sge sge; + size_t left; + uint32_t xfer_size, olen = RS_OLAP_START_SIZE; + int ret = 0; + + rs = idm_at(&idm, socket); + fastlock_acquire(&rs->slock); + if (rs->iomap_pending) { + ret = rs_send_iomaps(rs, flags); + if (ret) + goto out; + } + for (left = count; left; + left -= xfer_size, buf += xfer_size, offset += xfer_size) { + if (!iom || offset > iom->offset + iom->sge.length) { + iom = rs_find_iomap(rs, offset); + if (!iom) + break; + } + + if (!rs_can_send(rs)) { + ret = rs_get_comp(rs, rs_nonblocking(rs, flags), + rs_conn_can_send); + if (ret) + break; + if (!(rs->state & rs_connect_wr)) { + ret = ERR(ECONNRESET); + break; + } + } + + if (olen < left) { + xfer_size = olen; + if (olen < RS_MAX_TRANSFER) + olen <<= 1; + } else { + xfer_size = left; + } + + if (xfer_size > rs->sbuf_bytes_avail) + xfer_size = rs->sbuf_bytes_avail; + if (xfer_size > iom->offset + iom->sge.length - offset) + xfer_size = iom->offset + iom->sge.length - offset; + + if (xfer_size <= rs->sq_inline) { + sge.addr = (uintptr_t) buf; + sge.length = xfer_size; + sge.lkey = 0; + ret = rs_write_direct(rs, iom, offset, &sge, 1, + xfer_size, IBV_SEND_INLINE); + } else if (xfer_size <= rs_sbuf_left(rs)) { + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size); + rs->ssgl[0].length = xfer_size; + ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0); + if (xfer_size < rs_sbuf_left(rs)) + rs->ssgl[0].addr += xfer_size; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, + rs->ssgl[0].length); + rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; + memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); + ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + if (ret) + break; + } +out: + fastlock_release(&rs->slock); + return (ret && left == count) ? ret : count - left; }