git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of rs-iomap
author Sean Hefty <sean.hefty@intel.com>
Wed, 24 Oct 2012 05:16:29 +0000 (22:16 -0700)
committer Sean Hefty <sean.hefty@intel.com>
Wed, 24 Oct 2012 05:16:29 +0000 (22:16 -0700)
include/rdma/rsocket.h
src/rsocket.c

index 21477e413be10712ab957810a9c9bfdc188c71c0..86345176b922238fc4f7bb5ad3c8317b3718fcf9 100644 (file)
@@ -76,7 +76,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
 enum {
        RDMA_SQSIZE,
        RDMA_RQSIZE,
-       RDMA_INLINE
+       RDMA_INLINE,
+       RDMA_IOMAPSIZE  /* entries in the target iomap; rounded when set */
 };
 
 int rsetsockopt(int socket, int level, int optname,
index 22e474da495dacec356d83f5eed115d8d427baec..d620d0499da9c659439ef090a5d9ad2dce0026b1 100644 (file)
@@ -89,7 +89,7 @@ static uint32_t polling_time = 10;
 enum {
        RS_OP_DATA,
        RS_OP_RSVD_DATA_MORE,
-       RS_OP_RSVD_DRA,
+       RS_OP_WRITE,
        RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
        RS_OP_RSVD,
@@ -298,8 +298,8 @@ void rs_configure(void)
                fclose(f);
 
                /* round to supported values */
-               def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
-               def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
+               def_iomap_size = (uint8_t) rs_value_to_scale(
+                       (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
        }
        init = 1;
 out:
@@ -546,12 +546,12 @@ static void rs_free_iomappings(struct rsocket *rs)
        while (!dlist_empty(&rs->iomap_list)) {
                iomr = container_of(rs->iomap_list.next,
                                    struct rs_iomap_mr, entry);
-               riounmap(iomr->mr->addr, iomr->mr->length);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
        }
        while (!dlist_empty(&rs->iomap_queue)) {
                iomr = container_of(rs->iomap_queue.next,
                                    struct rs_iomap_mr, entry);
-               riounmap(iomr->mr->addr, iomr->mr->length);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
        }
 }
 
@@ -870,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return rs_do_connect(rs);
 }
 
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
                         uint64_t addr, uint32_t rkey)
@@ -890,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+/*
+ * Post a single plain RDMA write (IBV_WR_RDMA_WRITE) on the rsocket's QP.
+ * The caller-supplied wr_id is stored in the work request unchanged.
+ * Returns the result of ibv_post_send() passed through rdma_seterrno().
+ */
+static int rs_post_write(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint64_t wr_id, int flags,
+                        uint64_t addr, uint32_t rkey)
+{
+       struct ibv_send_wr write_wr, *bad_wr;
+
+       /* work request bookkeeping */
+       write_wr.next = NULL;
+       write_wr.wr_id = wr_id;
+       write_wr.opcode = IBV_WR_RDMA_WRITE;
+       write_wr.send_flags = flags;
+
+       /* local gather list */
+       write_wr.num_sge = nsge;
+       write_wr.sg_list = sgl;
+
+       /* remote target of the write */
+       write_wr.wr.rdma.rkey = rkey;
+       write_wr.wr.rdma.remote_addr = addr;
+
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &write_wr, &bad_wr));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -916,8 +935,22 @@ static int rs_write_data(struct rsocket *rs,
                        rs->target_sge = 0;
        }
 
-       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-                            flags, addr, rkey);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+                                flags, addr, rkey);
+}
+
+/*
+ * Post an RDMA write straight into a remote iomapped region.
+ *
+ * offset is the caller-visible offset; it is translated into a remote
+ * address relative to the start of the mapping described by iom.
+ *
+ * rs_post_write() issues IBV_WR_RDMA_WRITE with no immediate data, so the
+ * peer gets no receive completion for this transfer and cannot hand a
+ * credit back for it.  Only the local send queue entry and send buffer
+ * space are charged here (both are returned on the send completion);
+ * sseq_no is deliberately left alone.
+ */
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+       uint64_t addr;
+
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= length;
+
+       addr = iom->sge.addr + offset - iom->offset;
+       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+                            flags, addr, iom->sge.key);
 }
 
 static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
@@ -930,8 +963,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
        rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
 
        addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-                            flags, addr, rs->remote_iomap.key);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+                                flags, addr, rs->remote_iomap.key);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -962,12 +995,12 @@ static void rs_send_credits(struct rsocket *rs)
                ibsge.lkey = 0;
                ibsge.length = sizeof(sge);
 
-               rs_post_write(rs, &ibsge, 1,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                             IBV_SEND_INLINE,
-                             rs->remote_sgl.addr +
-                             rs->remote_sge * sizeof(struct rs_sge),
-                             rs->remote_sgl.key);
+               rs_post_write_msg(rs, &ibsge, 1,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 IBV_SEND_INLINE,
+                                 rs->remote_sgl.addr +
+                                 rs->remote_sge * sizeof(struct rs_sge),
+                                 rs->remote_sgl.key);
 
                rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
                rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -976,8 +1009,9 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write(rs, NULL, 0,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+               rs_post_write_msg(rs, NULL, 0,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 0, 0, 0);
        }
 }
 
@@ -1011,6 +1045,9 @@ static int rs_poll_cq(struct rsocket *rs)
                        case RS_OP_SGL:
                                rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               /* The iomap was updated, that's nice to know. */
+                               break;
                        case RS_OP_CTRL:
                                if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
@@ -1019,6 +1056,9 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state &= ~rs_connect_rd;
                                }
                                break;
+                       case RS_OP_WRITE:
+                               /* We really shouldn't be here. */
+                               break;
                        default:
                                rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
                                rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
@@ -1036,6 +1076,10 @@ static int rs_poll_cq(struct rsocket *rs)
                                if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
                                        rs->state = rs_disconnected;
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               rs->sqe_avail++;
+                               rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+                               break;
                        default:
                                rs->sqe_avail++;
                                rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
@@ -1935,8 +1979,8 @@ int rshutdown(int socket, int how)
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
                        rs->ctrl_avail--;
-                       ret = rs_post_write(rs, NULL, 0,
-                                           rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+                       ret = rs_post_write_msg(rs, NULL, 0,
+                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
                }
        }
 
@@ -2090,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname,
                        if (rs->sq_inline < RS_MIN_INLINE)
                                rs->sq_inline = RS_MIN_INLINE;
                        break;
+               case RDMA_IOMAPSIZE:
+                       rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+                               (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+                       break;
                default:
                        break;
                }
@@ -2191,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname,
                        *((int *) optval) = rs->sq_inline;
                        *optlen = sizeof(int);
                        break;
+               case RDMA_IOMAPSIZE:
+                       *((int *) optval) = rs->target_iomap_size;
+                       *optlen = sizeof(int);
+                       break;
                default:
                        ret = ENOTSUP;
                        break;
@@ -2342,7 +2394,94 @@ out:
        return ret;
 }
 
+/*
+ * Look up the target iomap entry whose half-open range
+ * [offset, offset + sge.length) contains the given offset.
+ * Returns NULL when no entry matches.
+ */
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+       int idx = 0;
+
+       while (idx < rs->target_iomap_size) {
+               struct rs_iomap *entry = &rs->target_iomap[idx];
+
+               if (offset >= entry->offset &&
+                   offset < entry->offset + entry->sge.length)
+                       return entry;
+               idx++;
+       }
+       return NULL;
+}
+
+/*
+ * Write count bytes from buf into the peer's iomapped memory at offset.
+ *
+ * The transfer is split into chunks; each chunk is bounded by the
+ * available send buffer space and by the end of the iomap entry that
+ * contains the current offset.  Chunks no larger than sq_inline are
+ * posted inline straight from buf; larger ones are staged through the
+ * send buffer (sbuf), using two SGEs when the copy wraps around its end.
+ *
+ * Returns the number of bytes posted, or the error code when nothing was
+ * transferred at all.  The loop stops early if offset is not covered by
+ * any target iomap entry.
+ */
 size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
 {
+       struct rsocket *rs;
+       struct rs_iomap *iom = NULL;
+       struct ibv_sge sge;
+       /* Set up front: the 'goto out' below reaches the return that reads it. */
+       size_t left = count;
+       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+       int ret = 0;
+
+       rs = idm_at(&idm, socket);
+       fastlock_acquire(&rs->slock);
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left;
+            left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+               /*
+                * The mapping covers [offset, offset + length), so an offset
+                * equal to its end already belongs to another entry.  Keeping
+                * the stale entry would clamp xfer_size to 0 and never advance.
+                */
+               if (!iom || offset >= iom->offset + iom->sge.length) {
+                       iom = rs_find_iomap(rs, offset);
+                       if (!iom)
+                               break;
+               }
+
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+                       if (!(rs->state & rs_connect_wr)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+               }
+
+               /* Start small and double the chunk size up to RS_MAX_TRANSFER. */
+               if (olen < left) {
+                       xfer_size = olen;
+                       if (olen < RS_MAX_TRANSFER)
+                               olen <<= 1;
+               } else {
+                       xfer_size = left;
+               }
+
+               if (xfer_size > rs->sbuf_bytes_avail)
+                       xfer_size = rs->sbuf_bytes_avail;
+               /* Never write past the end of the current mapping. */
+               if (xfer_size > iom->offset + iom->sge.length - offset)
+                       xfer_size = iom->offset + iom->sge.length - offset;
+
+               if (xfer_size <= rs->sq_inline) {
+                       sge.addr = (uintptr_t) buf;
+                       sge.length = xfer_size;
+                       sge.lkey = 0;
+                       ret = rs_write_direct(rs, iom, offset, &sge, 1,
+                                             xfer_size, IBV_SEND_INLINE);
+               } else if (xfer_size <= rs_sbuf_left(rs)) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+                       rs->ssgl[0].length = xfer_size;
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+                       if (xfer_size < rs_sbuf_left(rs))
+                               rs->ssgl[0].addr += xfer_size;
+                       else
+                               rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+               } else {
+                       /* Chunk wraps: tail of sbuf in ssgl[0], remainder from its start. */
+                       rs->ssgl[0].length = rs_sbuf_left(rs);
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               if (ret)
+                       break;
+       }
+out:
+       fastlock_release(&rs->slock);
 
+       return (ret && left == count) ? ret : count - left;
 }