]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of rs-iwarp
author Sean Hefty <sean.hefty@intel.com>
Sat, 13 Apr 2013 19:22:55 +0000 (12:22 -0700)
committer Sean Hefty <sean.hefty@intel.com>
Sat, 13 Apr 2013 19:22:55 +0000 (12:22 -0700)
src/rsocket.c

index de15f58ba86223b80544d4cb8e57d463665c8c05..4b3505aed29826ef1a8d186738a4202fe6720ba9 100644 (file)
@@ -123,6 +123,7 @@ enum {
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
 #define rs_msg_op(imm_data)   (imm_data >> 29)
 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE          sizeof(uint32_t)
 
 #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
 #define rs_send_wr_id(data) ((uint64_t) data)
@@ -751,24 +752,22 @@ static inline int rs_post_recv(struct rsocket *rs)
        struct ibv_recv_wr wr, *bad;
        struct ibv_sge sge;
 
-       wr.wr_id = rs_recv_wr_id(0);
        wr.next = NULL;
        if (!(rs->opts & RS_OPT_MSG_SEND)) {
+               wr.wr_id = rs_recv_wr_id(0);
                wr.sg_list = NULL;
                wr.num_sge = 0;
        } else {
+               wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
                sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
                           (rs->rbuf_msg_index * RS_MSG_SIZE);
-               sge.length = sizeof(uint32_t);
+               sge.length = RS_MSG_SIZE;
                sge.lkey = rs->rmr->lkey;
 
                wr.sg_list = &sge;
                wr.num_sge = 1;
                if(++rs->rbuf_msg_index == rs->rq_size)
                        rs->rbuf_msg_index = 0;
-               /* TODO we need wr_id to reference back to receive buffer, so we can
-                * retrieve the incoming message from the buffer
-                */
        }
 
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
@@ -800,7 +799,7 @@ static int rs_create_ep(struct rsocket *rs)
        int i, ret;
 
        rs_set_qp_size(rs);
-       if (rs->cm_id->context->device->transport_type == IBV_TRANSPORT_IWARP)
+       if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
                rs->opts |= RS_OPT_MSG_SEND;
        ret = rs_init_bufs(rs);
        if (ret)
@@ -1535,6 +1534,21 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return ret;
 }
 
+/*
+TODO:
+if MSG_SEND opt is set
+       if !sgl
+               post send with imm_data inline
+       else
+               post rdma write
+               post send with imm_data inline
+else
+       existing code flow - rdma write with immediate
+
+post_write - rdma write only
+post msg - immediate data or send only
+post write msg - rdma write with msg (imm or send after)
+*/
 static int rs_post_write_msg(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
@@ -1555,6 +1569,45 @@ static int rs_post_write_msg(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+/*
+ * Post a standalone 32-bit rsocket message (no data payload) to the peer.
+ *
+ * With RS_OPT_MSG_SEND set, the message is carried as the RS_MSG_SIZE-byte
+ * payload of an inline SEND; otherwise it is carried as the immediate data
+ * of a zero-length RDMA write.
+ *
+ * Returns the result of ibv_post_send() passed through rdma_seterrno().
+ */
+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
+{
+       struct ibv_send_wr wr, *bad;
+       struct ibv_sge sge;
+
+       wr.wr_id = rs_send_wr_id(msg);
+       wr.next = NULL;
+       if (rs->opts & RS_OPT_MSG_SEND) {
+               /*
+                * Inline sends copy the payload at post time, so pointing at
+                * the by-value parameter is safe and no lkey is needed.
+                */
+               sge.addr = (uintptr_t) &msg;
+               sge.length = RS_MSG_SIZE;
+               sge.lkey = 0;
+               wr.sg_list = &sge;
+               wr.num_sge = 1;
+               wr.opcode = IBV_WR_SEND;
+               wr.send_flags = IBV_SEND_INLINE;
+       } else {
+               wr.sg_list = NULL;
+               wr.num_sge = 0;
+               wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+               wr.send_flags = 0;
+               wr.imm_data = htonl(msg);
+       }
+
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
 static int rs_post_write(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t wr_data, int flags,
@@ -1713,7 +1750,7 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       uint32_t imm_data;
+       uint32_t msg;
        int ret, rcnt = 0;
 
        while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
@@ -1722,19 +1759,25 @@ static int rs_poll_cq(struct rsocket *rs)
                                continue;
                        rcnt++;
 
-                       imm_data = ntohl(wc.imm_data);
-                       switch (rs_msg_op(imm_data)) {
+                       if (wc.wc_flags & IBV_WC_WITH_IMM) {
+                               msg = ntohl(wc.imm_data);
+                       } else {
+                               msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
+                                       [rs_wr_data(wc.wr_id)];
+
+                       }
+                       switch (rs_msg_op(msg)) {
                        case RS_OP_SGL:
-                               rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+                               rs->sseq_comp = (uint16_t) rs_msg_data(msg);
                                break;
                        case RS_OP_IOMAP_SGL:
                                /* The iomap was updated, that's nice to know. */
                                break;
                        case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+                               if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
                                        return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+                               } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
                                        rs->state &= ~rs_readable;
                                }
                                break;
@@ -1742,8 +1785,8 @@ static int rs_poll_cq(struct rsocket *rs)
                                /* We really shouldn't be here. */
                                break;
                        default:
-                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
+                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
                                if (++rs->rmsg_tail == rs->rq_size + 1)
                                        rs->rmsg_tail = 0;
                                break;
@@ -2064,6 +2107,11 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
+       if (rs->opts & RS_OPT_MSG_SEND) {
+               return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+                      (rs->sseq_no != rs->sseq_comp) &&
+                      (rs->target_sgl[rs->target_sge].length != 0);
+       }
        return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
               (rs->sseq_no != rs->sseq_comp) &&
               (rs->target_sgl[rs->target_sge].length != 0);