]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add support for iWarp
authorSean Hefty <sean.hefty@intel.com>
Thu, 11 Apr 2013 17:05:29 +0000 (10:05 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 13 Apr 2013 14:49:39 +0000 (07:49 -0700)
iWarp does not support RDMA writes with immediate data.
Instead of sending messages using immediate data, allow
the rsocket protocol to exchange messages using sends.

The rsocket protocol remains the same.  RDMA writes are
used for data transfers, with send messages used to transfer
rsocket protocol messages.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index ca771165dec3b2c2395a11537dccc55eb5f6c98b..abdd392ed215955dc1c84597b370ee4f8f2d8953 100644 (file)
@@ -123,6 +123,7 @@ enum {
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
 #define rs_msg_op(imm_data)   (imm_data >> 29)
 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE          sizeof(uint32_t)
 
 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
 #define rs_send_wr_id(data) ((uint64_t) data)
@@ -207,7 +208,12 @@ enum rs_state {
        rs_error           =                0x2000,
 };
 
-#define RS_OPT_SWAP_SGL 1
+#define RS_OPT_SWAP_SGL        (1 << 0)
+/*
+ * iWarp does not support RDMA write with immediate data.  For iWarp, we
+ * transfer rsocket messages as inline sends.
+ */
+#define RS_OPT_MSG_SEND        (1 << 1)
 
 union socket_addr {
        struct sockaddr         sa;
@@ -284,6 +290,7 @@ struct rsocket {
                        volatile struct rs_sge    *target_sgl;
                        struct rs_iomap   *target_iomap;
 
+                       int               rbuf_msg_index;
                        int               rbuf_bytes_avail;
                        int               rbuf_free_offset;
                        int               rbuf_offset;
@@ -600,15 +607,15 @@ static void rs_set_qp_size(struct rsocket *rs)
 
        if (rs->sq_size > max_size)
                rs->sq_size = max_size;
-       else if (rs->sq_size < 2)
-               rs->sq_size = 2;
+       else if (rs->sq_size < 4)
+               rs->sq_size = 4;
        if (rs->sq_size <= (RS_QP_CTRL_SIZE << 2))
-               rs->ctrl_avail = 1;
+               rs->ctrl_avail = 2;
 
        if (rs->rq_size > max_size)
                rs->rq_size = max_size;
-       else if (rs->rq_size < 2)
-               rs->rq_size = 2;
+       else if (rs->rq_size < 4)
+               rs->rq_size = 4;
 }
 
 static void ds_set_qp_size(struct rsocket *rs)
@@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+       uint32_t rbuf_msg_size;
        size_t len;
 
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
@@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs)
        if (rs->target_iomap_size)
                rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
 
-       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+       rbuf_msg_size = rs->rbuf_size;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
+       rs->rbuf = calloc(rbuf_msg_size, 1);
        if (!rs->rbuf)
                return ERR(ENOMEM);
 
-       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
        if (!rs->rmr)
                return -1;
 
@@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs)
 
 static int ds_init_bufs(struct ds_qp *qp)
 {
-       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
-                         sizeof(*qp->rbuf));
+       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
        if (!qp->rbuf)
                return ERR(ENOMEM);
 
@@ -740,11 +750,25 @@ err1:
 static inline int rs_post_recv(struct rsocket *rs)
 {
        struct ibv_recv_wr wr, *bad;
+       struct ibv_sge sge;
 
-       wr.wr_id = rs_recv_wr_id(0);
        wr.next = NULL;
-       wr.sg_list = NULL;
-       wr.num_sge = 0;
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.wr_id = rs_recv_wr_id(0);
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+       } else {
+               wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
+               sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+                          (rs->rbuf_msg_index * RS_MSG_SIZE);
+               sge.length = RS_MSG_SIZE;
+               sge.lkey = rs->rmr->lkey;
+
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               if(++rs->rbuf_msg_index == rs->rq_size)
+                       rs->rbuf_msg_index = 0;
+       }
 
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
 }
@@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs)
        int i, ret;
 
        rs_set_qp_size(rs);
+       if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
+               rs->opts |= RS_OPT_MSG_SEND;
        ret = rs_init_bufs(rs);
        if (ret)
                return ret;
@@ -1224,6 +1250,9 @@ do_connect:
                param.flow_control = 1;
                param.retry_count = 7;
                param.rnr_retry_count = 7;
+               /* work-around: iWarp issues RDMA read during connection */
+               if (rs->opts & RS_OPT_MSG_SEND)
+                       param.initiator_depth = 1;
                rs->retries = 0;
 
                ret = rdma_connect(rs->cm_id, &param);
@@ -1508,22 +1537,28 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return ret;
 }
 
-static int rs_post_write_msg(struct rsocket *rs,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags,
-                        uint64_t addr, uint32_t rkey)
+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
 {
        struct ibv_send_wr wr, *bad;
+       struct ibv_sge sge;
 
-       wr.wr_id = rs_send_wr_id(imm_data);
+       wr.wr_id = rs_send_wr_id(msg);
        wr.next = NULL;
-       wr.sg_list = sgl;
-       wr.num_sge = nsge;
-       wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-       wr.send_flags = flags;
-       wr.imm_data = htonl(imm_data);
-       wr.wr.rdma.remote_addr = addr;
-       wr.wr.rdma.rkey = rkey;
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               wr.send_flags = 0;
+               wr.imm_data = htonl(msg);
+       } else {
+               sge.addr = (uintptr_t) &msg;
+               sge.lkey = 0;
+               sge.length = sizeof msg;
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               wr.opcode = IBV_WR_SEND;
+               wr.send_flags = IBV_SEND_INLINE;
+       }
 
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
@@ -1547,6 +1582,34 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int rs_post_write_msg(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint32_t msg, int flags,
+                        uint64_t addr, uint32_t rkey)
+{
+       struct ibv_send_wr wr, *bad;
+       int ret;
+
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.wr_id = rs_send_wr_id(msg);
+               wr.next = NULL;
+               wr.sg_list = sgl;
+               wr.num_sge = nsge;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               wr.send_flags = flags;
+               wr.imm_data = htonl(msg);
+               wr.wr.rdma.remote_addr = addr;
+               wr.wr.rdma.rkey = rkey;
+
+               return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+       } else {
+               ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
+               if (!ret)
+                       ret = rs_post_msg(rs, msg);
+               return ret;
+       }
+}
+
 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
                        uint32_t wr_data)
 {
@@ -1578,6 +1641,8 @@ static int rs_write_data(struct rsocket *rs,
 
        rs->sseq_no++;
        rs->sqe_avail--;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rs->sqe_avail--;
        rs->sbuf_bytes_avail -= length;
 
        addr = rs->target_sgl[rs->target_sge].addr;
@@ -1615,6 +1680,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
 
        rs->sseq_no++;
        rs->sqe_avail--;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rs->sqe_avail--;
        rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
 
        addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
@@ -1636,6 +1703,9 @@ static void rs_send_credits(struct rsocket *rs)
        rs->ctrl_avail--;
        rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
        if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
+               if (rs->opts & RS_OPT_MSG_SEND)
+                       rs->ctrl_avail--;
+
                if (!(rs->opts & RS_OPT_SWAP_SGL)) {
                        sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
                        sge.key = rs->rmr->rkey;
@@ -1664,17 +1734,21 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write_msg(rs, NULL, 0,
-                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                                 0, 0, 0);
+               rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
        }
 }
 
 static int rs_give_credits(struct rsocket *rs)
 {
-       return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
-               ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-              rs->ctrl_avail && (rs->state & rs_connected);
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+                       ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+                      rs->ctrl_avail && (rs->state & rs_connected);
+       } else {
+               return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+                       ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+                      (rs->ctrl_avail > 1) && (rs->state & rs_connected);
+       }
 }
 
 static void rs_update_credits(struct rsocket *rs)
@@ -1686,7 +1760,7 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       uint32_t imm_data;
+       uint32_t msg;
        int ret, rcnt = 0;
 
        while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
@@ -1695,19 +1769,25 @@ static int rs_poll_cq(struct rsocket *rs)
                                continue;
                        rcnt++;
 
-                       imm_data = ntohl(wc.imm_data);
-                       switch (rs_msg_op(imm_data)) {
+                       if (wc.wc_flags & IBV_WC_WITH_IMM) {
+                               msg = ntohl(wc.imm_data);
+                       } else {
+                               msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size))
+                                       [rs_wr_data(wc.wr_id)];
+
+                       }
+                       switch (rs_msg_op(msg)) {
                        case RS_OP_SGL:
-                               rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+                               rs->sseq_comp = (uint16_t) rs_msg_data(msg);
                                break;
                        case RS_OP_IOMAP_SGL:
                                /* The iomap was updated, that's nice to know. */
                                break;
                        case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+                               if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
                                        return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+                               } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
                                        rs->state &= ~rs_readable;
                                }
                                break;
@@ -1715,8 +1795,8 @@ static int rs_poll_cq(struct rsocket *rs)
                                /* We really shouldn't be here. */
                                break;
                        default:
-                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
+                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
                                if (++rs->rmsg_tail == rs->rq_size + 1)
                                        rs->rmsg_tail = 0;
                                break;
@@ -2037,9 +2117,15 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-       return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-              (rs->sseq_no != rs->sseq_comp) &&
-              (rs->target_sgl[rs->target_sge].length != 0);
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       } else {
+               return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       }
 }
 
 static int ds_can_send(struct rsocket *rs)
@@ -3011,8 +3097,7 @@ int rshutdown(int socket, int how)
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
                        rs->ctrl_avail--;
-                       ret = rs_post_write_msg(rs, NULL, 0,
-                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+                       ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
                }
        }