git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add support for iWarp
author Sean Hefty <sean.hefty@intel.com>
Thu, 11 Apr 2013 17:05:29 +0000 (10:05 -0700)
committer Sean Hefty <sean.hefty@intel.com>
Sat, 13 Apr 2013 14:49:39 +0000 (07:49 -0700)
iWarp does not support RDMA writes with immediate data.
Instead of sending messages using immediate data, allow
the rsocket protocol to exchange messages using sends.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/rsocket.c

index ca771165dec3b2c2395a11537dccc55eb5f6c98b..4b3505aed29826ef1a8d186738a4202fe6720ba9 100644 (file)
@@ -123,6 +123,7 @@ enum {
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
 #define rs_msg_op(imm_data)   (imm_data >> 29)
 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE          sizeof(uint32_t)
 
 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
 #define rs_send_wr_id(data) ((uint64_t) data)
@@ -207,7 +208,12 @@ enum rs_state {
        rs_error           =                0x2000,
 };
 
-#define RS_OPT_SWAP_SGL 1
+#define RS_OPT_SWAP_SGL        (1 << 0)
+/*
+ * iWarp does not support RDMA write with immediate data.  For iWarp, we
+ * transfer rsocket messages as inline sends.
+ */
+#define RS_OPT_MSG_SEND        (1 << 1)
 
 union socket_addr {
        struct sockaddr         sa;
@@ -284,6 +290,7 @@ struct rsocket {
                        volatile struct rs_sge    *target_sgl;
                        struct rs_iomap   *target_iomap;
 
+                       int               rbuf_msg_index;
                        int               rbuf_bytes_avail;
                        int               rbuf_free_offset;
                        int               rbuf_offset;
@@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+       uint32_t rbuf_msg_size;
        size_t len;
 
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
@@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs)
        if (rs->target_iomap_size)
                rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
 
-       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+       rbuf_msg_size = rs->rbuf_size;
+       if (rs->opts & RS_OPT_MSG_SEND)
+               rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
+       rs->rbuf = calloc(rbuf_msg_size, 1);
        if (!rs->rbuf)
                return ERR(ENOMEM);
 
-       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
        if (!rs->rmr)
                return -1;
 
@@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs)
 
 static int ds_init_bufs(struct ds_qp *qp)
 {
-       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
-                         sizeof(*qp->rbuf));
+       qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
        if (!qp->rbuf)
                return ERR(ENOMEM);
 
@@ -740,11 +750,25 @@ err1:
 static inline int rs_post_recv(struct rsocket *rs)
 {
        struct ibv_recv_wr wr, *bad;
+       struct ibv_sge sge;     /* only filled in when RS_OPT_MSG_SEND is set */
 
-       wr.wr_id = rs_recv_wr_id(0);
        wr.next = NULL;
-       wr.sg_list = NULL;
-       wr.num_sge = 0;
+       if (!(rs->opts & RS_OPT_MSG_SEND)) {    /* no receive buffer is posted in this mode */
+               wr.wr_id = rs_recv_wr_id(0);
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+       } else {        /* post one RS_MSG_SIZE slot to land an incoming message */
+               wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);   /* wr_id carries the slot index */
+               sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+                          (rs->rbuf_msg_index * RS_MSG_SIZE);  /* slots start rbuf_size bytes into rbuf */
+               sge.length = RS_MSG_SIZE;
+               sge.lkey = rs->rmr->lkey;
+
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               if(++rs->rbuf_msg_index == rs->rq_size) /* advance to the next slot, wrapping at rq_size */
+                       rs->rbuf_msg_index = 0;
+       }
 
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
 }
@@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs)
        int i, ret;
 
        rs_set_qp_size(rs);
+       if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
+               rs->opts |= RS_OPT_MSG_SEND;
        ret = rs_init_bufs(rs);
        if (ret)
                return ret;
@@ -1508,6 +1534,21 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return ret;
 }
 
+/*
+TODO:
+if MSG_SEND opt is set
+       if !sgl
+               post send with imm_data inline
+       else
+               post rdma write
+               post send with imm_data inline
+else
+       existing code flow - rdma write with immediate
+
+post_write - rdma write only
+post msg - immediate data or send only
+post write msg - rdma write with msg (imm or send after)
+*/
 static int rs_post_write_msg(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
@@ -1528,6 +1569,29 @@ static int rs_post_write_msg(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+/*
+ * Post a stand-alone rsocket message (no accompanying data transfer).
+ *
+ * With RS_OPT_MSG_SEND set, the 32-bit message is carried as the inline
+ * payload of a send; otherwise it is carried as the immediate data of an
+ * RDMA write that has no scatter-gather entries.
+ *
+ * Returns the value of rdma_seterrno() applied to the ibv_post_send()
+ * result.
+ */
+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
+{
+       struct ibv_send_wr wr, *bad;
+       struct ibv_sge sge;
+
+       wr.wr_id = rs_send_wr_id(msg);
+       wr.next = NULL;
+       if (rs->opts & RS_OPT_MSG_SEND) {
+               /*
+                * An inline send copies the payload while the request is
+                * being posted, so pointing at the by-value parameter is
+                * safe and the lkey is not checked.
+                */
+               sge.addr = (uintptr_t) &msg;
+               sge.length = sizeof msg;
+               sge.lkey = 0;
+
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               wr.opcode = IBV_WR_SEND;
+               wr.send_flags = IBV_SEND_INLINE;
+       } else {
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               /* This function takes no flags argument; post with none. */
+               wr.send_flags = 0;
+               wr.imm_data = htonl(msg);
+       }
+
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
 static int rs_post_write(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t wr_data, int flags,
@@ -1686,7 +1750,7 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       uint32_t imm_data;
+       uint32_t msg;
        int ret, rcnt = 0;
 
        while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
@@ -1695,19 +1759,25 @@ static int rs_poll_cq(struct rsocket *rs)
                                continue;
                        rcnt++;
 
-                       imm_data = ntohl(wc.imm_data);
-                       switch (rs_msg_op(imm_data)) {
+                       if (wc.wc_flags & IBV_WC_WITH_IMM) {
+                               msg = ntohl(wc.imm_data);
+                       } else {
+                               msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
+                                       [rs_wr_data(wc.wr_id)];
+
+                       }
+                       switch (rs_msg_op(msg)) {
                        case RS_OP_SGL:
-                               rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+                               rs->sseq_comp = (uint16_t) rs_msg_data(msg);
                                break;
                        case RS_OP_IOMAP_SGL:
                                /* The iomap was updated, that's nice to know. */
                                break;
                        case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+                               if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
                                        return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+                               } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
                                        rs->state &= ~rs_readable;
                                }
                                break;
@@ -1715,8 +1785,8 @@ static int rs_poll_cq(struct rsocket *rs)
                                /* We really shouldn't be here. */
                                break;
                        default:
-                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
+                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
                                if (++rs->rmsg_tail == rs->rq_size + 1)
                                        rs->rmsg_tail = 0;
                                break;
@@ -2037,6 +2107,11 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
+       if (rs->opts & RS_OPT_MSG_SEND) {
+               return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       }
        return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
               (rs->sseq_no != rs->sseq_comp) &&
               (rs->target_sgl[rs->target_sge].length != 0);