]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add support for iWarp iwarp
authorSean Hefty <sean.hefty@intel.com>
Thu, 11 Apr 2013 17:05:29 +0000 (10:05 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 13 Apr 2013 14:49:39 +0000 (07:49 -0700)
iWarp does not support RDMA writes with immediate data.
Instead of sending messages using immediate data, allow
the rsocket protocol to exchange messages using sends.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index ca771165dec3b2c2395a11537dccc55eb5f6c98b..f529a3c977d4fb21b7f093a1dbef6e8a05c83bb5 100644 (file)
@@ -123,6 +123,7 @@ enum {
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
 #define rs_msg_op(imm_data)   (imm_data >> 29)
 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE          sizeof(uint32_t)
 
 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
 #define rs_send_wr_id(data) ((uint64_t) data)
@@ -207,7 +208,12 @@ enum rs_state {
        rs_error           =                0x2000,
 };
 
-#define RS_OPT_SWAP_SGL 1
+#define RS_OPT_SWAP_SGL        (1 << 0)
+/*
+ * iWarp does not support RDMA write with immediate data.  For iWarp, we
+ * transfer rsocket messages as inline sends.
+ */
+#define RS_OPT_MSG_SEND        (1 << 1)
 
 union socket_addr {
        struct sockaddr         sa;
@@ -284,6 +290,7 @@ struct rsocket {
                        volatile struct rs_sge    *target_sgl;
                        struct rs_iomap   *target_iomap;
 
+                       int               rbuf_msg_index;
                        int               rbuf_bytes_avail;
                        int               rbuf_free_offset;
                        int               rbuf_offset;
@@ -600,15 +607,15 @@ static void rs_set_qp_size(struct rsocket *rs)
 
        if (rs->sq_size > max_size)
                rs->sq_size = max_size;
-       else if (rs->sq_size < 2)
-               rs->sq_size = 2;
+       else if (rs->sq_size < 4)
+               rs->sq_size = 4;
        if (rs->sq_size <= (RS_QP_CTRL_SIZE << 2))
-               rs->ctrl_avail = 1;
+               rs->ctrl_avail = 2;
 
        if (rs->rq_size > max_size)
                rs->rq_size = max_size;
-       else if (rs->rq_size < 2)
-               rs->rq_size = 2;
+       else if (rs->rq_size < 4)
+               rs->rq_size = 4;
 }
 
 static void ds_set_qp_size(struct rsocket *rs)
@@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+       uint32_t rbuf_msg_size;
        size_t len;
 
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
@@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs)
        if (rs->target_iomap_size)
                rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
 
-       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+       rbuf_msg_size = rs->rbuf_size;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
+       rs->rbuf = calloc(rbuf_msg_size, 1);
        if (!rs->rbuf)
                return ERR(ENOMEM);
 
-       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
        if (!rs->rmr)
                return -1;
 
@@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs)
 
 static int ds_init_bufs(struct ds_qp *qp)
 {
-       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
-                         sizeof(*qp->rbuf));
+       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
        if (!qp->rbuf)
                return ERR(ENOMEM);
 
@@ -740,11 +750,25 @@ err1:
 static inline int rs_post_recv(struct rsocket *rs)
 {
        struct ibv_recv_wr wr, *bad;
+       struct ibv_sge sge;
 
-       wr.wr_id = rs_recv_wr_id(0);
        wr.next = NULL;
-       wr.sg_list = NULL;
-       wr.num_sge = 0;
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.wr_id = rs_recv_wr_id(0);
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+       } else {
+               wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
+               sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+                          (rs->rbuf_msg_index * RS_MSG_SIZE);
+               sge.length = RS_MSG_SIZE;
+               sge.lkey = rs->rmr->lkey;
+
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               if(++rs->rbuf_msg_index == rs->rq_size)
+                       rs->rbuf_msg_index = 0;
+       }
 
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
 }
@@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs)
        int i, ret;
 
        rs_set_qp_size(rs);
+       if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
+               rs->opts |= RS_OPT_MSG_SEND;
        ret = rs_init_bufs(rs);
        if (ret)
                return ret;
@@ -1508,22 +1534,28 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return ret;
 }
 
-static int rs_post_write_msg(struct rsocket *rs,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags,
-                        uint64_t addr, uint32_t rkey)
+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
 {
        struct ibv_send_wr wr, *bad;
+       struct ibv_sge sge;
 
-       wr.wr_id = rs_send_wr_id(imm_data);
+       wr.wr_id = rs_send_wr_id(msg);
        wr.next = NULL;
-       wr.sg_list = sgl;
-       wr.num_sge = nsge;
-       wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-       wr.send_flags = flags;
-       wr.imm_data = htonl(imm_data);
-       wr.wr.rdma.remote_addr = addr;
-       wr.wr.rdma.rkey = rkey;
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               wr.send_flags = 0;
+               wr.imm_data = htonl(msg);
+       } else {
+               sge.addr = (uintptr_t) &msg;
+               sge.lkey = 0;
+               sge.length = sizeof msg;
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               wr.opcode = IBV_WR_SEND;
+               wr.send_flags = IBV_SEND_INLINE;
+       }
 
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
@@ -1547,6 +1579,34 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int rs_post_write_msg(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint32_t msg, int flags,
+                        uint64_t addr, uint32_t rkey)
+{
+       struct ibv_send_wr wr, *bad;
+       int ret;
+
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.wr_id = rs_send_wr_id(msg);
+               wr.next = NULL;
+               wr.sg_list = sgl;
+               wr.num_sge = nsge;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               wr.send_flags = flags;
+               wr.imm_data = htonl(msg);
+               wr.wr.rdma.remote_addr = addr;
+               wr.wr.rdma.rkey = rkey;
+
+               return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+       } else {
+               ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
+               if (!ret)
+                       ret = rs_post_msg(rs, msg);
+               return ret;
+       }
+}
+
 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
                        uint32_t wr_data)
 {
@@ -1578,6 +1638,8 @@ static int rs_write_data(struct rsocket *rs,
 
        rs->sseq_no++;
        rs->sqe_avail--;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rs->sqe_avail--;
        rs->sbuf_bytes_avail -= length;
 
        addr = rs->target_sgl[rs->target_sge].addr;
@@ -1615,6 +1677,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
 
        rs->sseq_no++;
        rs->sqe_avail--;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rs->sqe_avail--;
        rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
 
        addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
@@ -1636,6 +1700,9 @@ static void rs_send_credits(struct rsocket *rs)
        rs->ctrl_avail--;
        rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
        if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
+               if (rs->opts & RS_OPT_MSG_SEND)
+                       rs->ctrl_avail--;
+
                if (!(rs->opts & RS_OPT_SWAP_SGL)) {
                        sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
                        sge.key = rs->rmr->rkey;
@@ -1664,17 +1731,21 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write_msg(rs, NULL, 0,
-                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                                 0, 0, 0);
+               rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
        }
 }
 
 static int rs_give_credits(struct rsocket *rs)
 {
-       return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
-               ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-              rs->ctrl_avail && (rs->state & rs_connected);
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+                       ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+                      rs->ctrl_avail && (rs->state & rs_connected);
+       } else {
+               return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+                       ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+                      (rs->ctrl_avail > 1) && (rs->state & rs_connected);
+       }
 }
 
 static void rs_update_credits(struct rsocket *rs)
@@ -1686,7 +1757,7 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       uint32_t imm_data;
+       uint32_t msg;
        int ret, rcnt = 0;
 
        while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
@@ -1695,19 +1766,25 @@ static int rs_poll_cq(struct rsocket *rs)
                                continue;
                        rcnt++;
 
-                       imm_data = ntohl(wc.imm_data);
-                       switch (rs_msg_op(imm_data)) {
+                       if (wc.wc_flags & IBV_WC_WITH_IMM) {
+                               msg = ntohl(wc.imm_data);
+                       } else {
+                               msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
+                                       [rs_wr_data(wc.wr_id)];
+
+                       }
+                       switch (rs_msg_op(msg)) {
                        case RS_OP_SGL:
-                               rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+                               rs->sseq_comp = (uint16_t) rs_msg_data(msg);
                                break;
                        case RS_OP_IOMAP_SGL:
                                /* The iomap was updated, that's nice to know. */
                                break;
                        case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+                               if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
                                        return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+                               } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
                                        rs->state &= ~rs_readable;
                                }
                                break;
@@ -1715,8 +1792,8 @@ static int rs_poll_cq(struct rsocket *rs)
                                /* We really shouldn't be here. */
                                break;
                        default:
-                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
+                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
                                if (++rs->rmsg_tail == rs->rq_size + 1)
                                        rs->rmsg_tail = 0;
                                break;
@@ -2037,9 +2114,15 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-       return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-              (rs->sseq_no != rs->sseq_comp) &&
-              (rs->target_sgl[rs->target_sge].length != 0);
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       } else {
+               return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       }
 }
 
 static int ds_can_send(struct rsocket *rs)
@@ -3011,8 +3094,7 @@ int rshutdown(int socket, int how)
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
                        rs->ctrl_avail--;
-                       ret = rs_post_write_msg(rs, NULL, 0,
-                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+                       ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
                }
        }