]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of fast-disc
authorSean Hefty <sean.hefty@intel.com>
Sat, 28 Jul 2012 00:35:36 +0000 (17:35 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 28 Jul 2012 00:35:36 +0000 (17:35 -0700)
src/rsocket.c

index 1e391b30aad1255e659e6b7f4a0e0b250b412f81..908b00c8fbcc128e8ac5c30e4a0774479a4aaa69 100644 (file)
@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
 
 enum {
        RS_OP_DATA,
-       RS_OP_DATA_MORE,
-       RS_OP_DRA,
-       RS_OP_DRA_MORE,
+       RS_OP_RSVD_DATA_MORE,
+       RS_OP_RSVD_DRA,
+       RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
-       RS_OP_RSVD1,
-       RS_OP_DRA_SGL,
+       RS_OP_RSVD,
+       RS_OP_RSVD_DRA_SGL,
        RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -124,12 +124,11 @@ struct rs_conn_data {
        struct rs_sge     data_buf;
 };
 
-union rs_wr_id {
-       uint64_t          wr_id;
-       struct {
-               uint32_t  reserved; /* sqe_count; */
-               uint32_t  length;
-       };
+enum {
+       RS_WR_RECV,
+       RS_WR_DATA,
+       RS_WR_CTRL,
+       RS_WR_DISCONNECT
 };
 
 #define RS_RECV_WR_ID (~((uint64_t) 0))
@@ -264,17 +263,6 @@ out:
        pthread_mutex_unlock(&mut);
 }
 
-/*
- * We currently generate a completion per send.  sqe_count = 1
- */
-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
-{
-       union rs_wr_id wrid;
-       /* wrid.reserved = sqe_count; */
-       wrid.length = length;
-       return wrid;
-}
-
 static int rs_insert(struct rsocket *rs)
 {
        pthread_mutex_lock(&mut);
@@ -772,21 +760,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return rs_do_connect(rs);
 }
 
-static void rs_shutdown_state(struct rsocket *rs, int state)
-{
-       rs->state &= ~state;
-       if (rs->state == rs_connected)
-               rs->state = rs_disconnected;
-}
-
-static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
+static int rs_post_write(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
                         uint64_t addr, uint32_t rkey)
 {
        struct ibv_send_wr wr, *bad;
 
-       wr.wr_id = wr_id;
+       wr.wr_id = (uint64_t) imm_data;
        wr.next = NULL;
        wr.sg_list = sgl;
        wr.num_sge = nsge;
@@ -803,29 +784,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
  */
-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
+static int rs_write_data(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags)
+                        uint32_t length, int flags)
 {
        uint64_t addr;
        uint32_t rkey;
 
        rs->sseq_no++;
        rs->sqe_avail--;
-       rs->sbuf_bytes_avail -= wr_id.length;
+       rs->sbuf_bytes_avail -= length;
 
        addr = rs->target_sgl[rs->target_sge].addr;
        rkey = rs->target_sgl[rs->target_sge].key;
 
-       rs->target_sgl[rs->target_sge].addr += wr_id.length;
-       rs->target_sgl[rs->target_sge].length -= wr_id.length;
+       rs->target_sgl[rs->target_sge].addr += length;
+       rs->target_sgl[rs->target_sge].length -= length;
 
        if (!rs->target_sgl[rs->target_sge].length) {
                if (++rs->target_sge == RS_SGL_SIZE)
                        rs->target_sge = 0;
        }
 
-       return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
+       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+                            flags, addr, rkey);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -856,7 +838,7 @@ static void rs_send_credits(struct rsocket *rs)
                ibsge.lkey = 0;
                ibsge.length = sizeof(sge);
 
-               rs_post_write(rs, 0, &ibsge, 1,
+               rs_post_write(rs, &ibsge, 1,
                              rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
                              IBV_SEND_INLINE,
                              rs->remote_sgl.addr +
@@ -870,7 +852,7 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write(rs, 0, NULL, 0,
+               rs_post_write(rs, NULL, 0,
                              rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
        }
 }
@@ -891,7 +873,6 @@ static void rs_update_credits(struct rsocket *rs)
 static int rs_poll_cq(struct rsocket *rs)
 {
        struct ibv_wc wc;
-       union rs_wr_id *wr_id;
        uint32_t imm_data;
        int ret, rcnt = 0;
 
@@ -911,7 +892,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-                                       rs_shutdown_state(rs, rs_connect_rd);
+                                       rs->state &= ~rs_connect_rd;
                                }
                                break;
                        default:
@@ -922,12 +903,19 @@ static int rs_poll_cq(struct rsocket *rs)
                                break;
                        }
                } else {
-                       if (wc.wr_id) {
-                               wr_id = (union rs_wr_id *) &wc.wr_id;
-                               rs->sqe_avail++; /* += wr_id->sqe_count; */
-                               rs->sbuf_bytes_avail += wr_id->length;
-                       } else {
+                       switch  (rs_msg_op((uint32_t) wc.wr_id)) {
+                       case RS_OP_SGL:
                                rs->ctrl_avail++;
+                               break;
+                       case RS_OP_CTRL:
+                               rs->ctrl_avail++;
+                               if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
+                                       rs->state = rs_disconnected;
+                               break;
+                       default:
+                               rs->sqe_avail++;
+                               rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
+                               break;
                        }
                        if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
                                rs->state = rs_error;
@@ -1075,9 +1063,9 @@ static int rs_conn_can_send(struct rsocket *rs)
        return rs_can_send(rs) || !(rs->state & rs_connect_wr);
 }
 
-static int rs_can_send_ctrl(struct rsocket *rs)
+static int rs_conn_can_send_ctrl(struct rsocket *rs)
 {
-       return rs->ctrl_avail;
+       return rs->ctrl_avail || !(rs->state & rs_connected);
 }
 
 static int rs_have_rdata(struct rsocket *rs)
@@ -1090,9 +1078,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
        return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
 }
 
-static int rs_all_sends_done(struct rsocket *rs)
+static int rs_conn_all_sends_done(struct rsocket *rs)
 {
-       return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
+       return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
+              !(rs->state & rs_connected);
 }
 
 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
@@ -1281,15 +1270,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                        sge.addr = (uintptr_t) buf;
                        sge.length = xfer_size;
                        sge.lkey = 0;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
-                                           IBV_SEND_INLINE);
+                       ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
                } else if (xfer_size <= rs_sbuf_left(rs)) {
                        memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
                        rs->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 1,
-                                           rs_msg_set(RS_OP_DATA, xfer_size), 0);
+                       ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
                        if (xfer_size < rs_sbuf_left(rs))
                                rs->ssgl[0].addr += xfer_size;
                        else
@@ -1300,9 +1285,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                rs->ssgl[0].length);
                        rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
                        memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 2,
-                                           rs_msg_set(RS_OP_DATA, xfer_size), 0);
+                       ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
                        rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
                }
                if (ret)
@@ -1395,9 +1378,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                        rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
                                    &cur_iov, &offset, xfer_size);
                        rs->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 1,
-                                           rs_msg_set(RS_OP_DATA, xfer_size),
+                       ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
                                            xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
                        if (xfer_size < rs_sbuf_left(rs))
                                rs->ssgl[0].addr += xfer_size;
@@ -1409,9 +1390,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                    &offset, rs->ssgl[0].length);
                        rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
                        rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
-                       ret = rs_write_data(rs, rs_wrid(1, xfer_size),
-                                           rs->ssgl, 2,
-                                           rs_msg_set(RS_OP_DATA, xfer_size),
+                       ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
                                            xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
                        rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
                }
@@ -1724,7 +1703,7 @@ int rshutdown(int socket, int how)
 
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
-               rs_shutdown_state(rs, rs_connect_rd);
+               rs->state &= ~rs_connect_rd;
                return 0;
        }
 
@@ -1734,26 +1713,27 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
-                       rs->state = rs_disconnected;
+                       rs->state &= ~(rs_connect_rd | rs_connect_wr);
                } else {
-                       rs_shutdown_state(rs, rs_connect_wr);
-                       ctrl = (rs->state & rs_connected) ?
+                       rs->state &= ~rs_connect_wr;
+                       ctrl = (rs->state & rs_connect_rd) ?
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
-               if (!rs_can_send_ctrl(rs)) {
-                       ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
+               if (!rs->ctrl_avail) {
+                       ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
                        if (ret)
                                return ret;
                }
 
-               rs->ctrl_avail--;
-               ret = rs_post_write(rs, 0, NULL, 0,
-                                   rs_msg_set(RS_OP_CTRL, ctrl),
-                                   0, 0, 0);
+               if ((rs->state & rs_connected) && rs->ctrl_avail) {
+                       rs->ctrl_avail--;
+                       ret = rs_post_write(rs, NULL, 0,
+                                           rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+               }
        }
 
-       if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
-               rs_process_cq(rs, 0, rs_all_sends_done);
+       if (rs->state & rs_connected)
+               rs_process_cq(rs, 0, rs_conn_all_sends_done);
 
        if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
                rs_set_nonblocking(rs, 1);