--- /dev/null
+Bottom: b5df9cc04d6e7629ae937bfe9bbc3c7da5193ea0
+Top: fffced6494cfa5fc5f9da5527d93fb7fe504bd86
+Author: Sean Hefty <sean.hefty@intel.com>
+Date: 2012-07-27 17:35:36 -0700
+
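+Refresh of fast-disc
+
+Summary of the changes in this refresh:
+- rs_post_write() no longer takes a separate wr_id; it stores the
+  immediate data (opcode + data) in wr.wr_id.
+- rs_write_data() takes the transfer length and builds the RS_OP_DATA
+  message itself; union rs_wr_id and rs_wrid() are removed.
+- rs_poll_cq() classifies send completions by the opcode held in wr_id:
+  SGL and CTRL completions return a ctrl credit, a completed
+  RS_CTRL_DISCONNECT moves the socket to rs_disconnected, and all other
+  completions return a send queue entry and send buffer space.
+- rs_shutdown_state() is removed; rshutdown() clears the direction bits
+  directly and waits with rs_conn_can_send_ctrl() and
+  rs_conn_all_sends_done(), which also return true once the socket is
+  no longer connected.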
+
+---
+
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 1e391b3..908b00c 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -83,12 +83,12 @@ static uint32_t polling_time = 10;
+
+ enum {
+ RS_OP_DATA,
+- RS_OP_DATA_MORE,
+- RS_OP_DRA,
+- RS_OP_DRA_MORE,
++ RS_OP_RSVD_DATA_MORE,
++ RS_OP_RSVD_DRA,
++ RS_OP_RSVD_DRA_MORE,
+ RS_OP_SGL,
+- RS_OP_RSVD1,
+- RS_OP_DRA_SGL,
++ RS_OP_RSVD,
++ RS_OP_RSVD_DRA_SGL,
+ RS_OP_CTRL
+ };
+ #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
+@@ -124,12 +124,11 @@ struct rs_conn_data {
+ struct rs_sge data_buf;
+ };
+
+-union rs_wr_id {
+- uint64_t wr_id;
+- struct {
+- uint32_t reserved; /* sqe_count; */
+- uint32_t length;
+- };
++enum {
++ RS_WR_RECV,
++ RS_WR_DATA,
++ RS_WR_CTRL,
++ RS_WR_DISCONNECT
+ };
+
+ #define RS_RECV_WR_ID (~((uint64_t) 0))
+@@ -264,17 +263,6 @@ out:
+ pthread_mutex_unlock(&mut);
+ }
+
+-/*
+- * We currently generate a completion per send. sqe_count = 1
+- */
+-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
+-{
+- union rs_wr_id wrid;
+- /* wrid.reserved = sqe_count; */
+- wrid.length = length;
+- return wrid;
+-}
+-
+ static int rs_insert(struct rsocket *rs)
+ {
+ pthread_mutex_lock(&mut);
+@@ -772,21 +760,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ return rs_do_connect(rs);
+ }
+
+-static void rs_shutdown_state(struct rsocket *rs, int state)
+-{
+- rs->state &= ~state;
+- if (rs->state == rs_connected)
+- rs->state = rs_disconnected;
+-}
+-
+-static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
++static int rs_post_write(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t imm_data, int flags,
+ uint64_t addr, uint32_t rkey)
+ {
+ struct ibv_send_wr wr, *bad;
+
+- wr.wr_id = wr_id;
++ wr.wr_id = (uint64_t) imm_data;
+ wr.next = NULL;
+ wr.sg_list = sgl;
+ wr.num_sge = nsge;
+@@ -803,29 +784,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
+ * Update target SGE before sending data. Otherwise the remote side may
+ * update the entry before we do.
+ */
+-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
++static int rs_write_data(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+- uint32_t imm_data, int flags)
++ uint32_t length, int flags)
+ {
+ uint64_t addr;
+ uint32_t rkey;
+
+ rs->sseq_no++;
+ rs->sqe_avail--;
+- rs->sbuf_bytes_avail -= wr_id.length;
++ rs->sbuf_bytes_avail -= length;
+
+ addr = rs->target_sgl[rs->target_sge].addr;
+ rkey = rs->target_sgl[rs->target_sge].key;
+
+- rs->target_sgl[rs->target_sge].addr += wr_id.length;
+- rs->target_sgl[rs->target_sge].length -= wr_id.length;
++ rs->target_sgl[rs->target_sge].addr += length;
++ rs->target_sgl[rs->target_sge].length -= length;
+
+ if (!rs->target_sgl[rs->target_sge].length) {
+ if (++rs->target_sge == RS_SGL_SIZE)
+ rs->target_sge = 0;
+ }
+
+- return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
++ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
++ flags, addr, rkey);
+ }
+
+ static uint32_t rs_sbuf_left(struct rsocket *rs)
+@@ -856,7 +838,7 @@ static void rs_send_credits(struct rsocket *rs)
+ ibsge.lkey = 0;
+ ibsge.length = sizeof(sge);
+
+- rs_post_write(rs, 0, &ibsge, 1,
++ rs_post_write(rs, &ibsge, 1,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ IBV_SEND_INLINE,
+ rs->remote_sgl.addr +
+@@ -870,7 +852,7 @@ static void rs_send_credits(struct rsocket *rs)
+ if (++rs->remote_sge == rs->remote_sgl.length)
+ rs->remote_sge = 0;
+ } else {
+- rs_post_write(rs, 0, NULL, 0,
++ rs_post_write(rs, NULL, 0,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+ }
+ }
+@@ -891,7 +873,6 @@ static void rs_update_credits(struct rsocket *rs)
+ static int rs_poll_cq(struct rsocket *rs)
+ {
+ struct ibv_wc wc;
+- union rs_wr_id *wr_id;
+ uint32_t imm_data;
+ int ret, rcnt = 0;
+
+@@ -911,7 +892,7 @@ static int rs_poll_cq(struct rsocket *rs)
+ rs->state = rs_disconnected;
+ return 0;
+ } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+- rs_shutdown_state(rs, rs_connect_rd);
++ rs->state &= ~rs_connect_rd;
+ }
+ break;
+ default:
+@@ -922,12 +903,19 @@ static int rs_poll_cq(struct rsocket *rs)
+ break;
+ }
+ } else {
+- if (wc.wr_id) {
+- wr_id = (union rs_wr_id *) &wc.wr_id;
+- rs->sqe_avail++; /* += wr_id->sqe_count; */
+- rs->sbuf_bytes_avail += wr_id->length;
+- } else {
++ switch (rs_msg_op((uint32_t) wc.wr_id)) {
++ case RS_OP_SGL:
+ rs->ctrl_avail++;
++ break;
++ case RS_OP_CTRL:
++ rs->ctrl_avail++;
++ if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
++ rs->state = rs_disconnected;
++ break;
++ default:
++ rs->sqe_avail++;
++ rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
++ break;
+ }
+ if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
+ rs->state = rs_error;
+@@ -1075,9 +1063,9 @@ static int rs_conn_can_send(struct rsocket *rs)
+ return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+ }
+
+-static int rs_can_send_ctrl(struct rsocket *rs)
++static int rs_conn_can_send_ctrl(struct rsocket *rs)
+ {
+- return rs->ctrl_avail;
++ return rs->ctrl_avail || !(rs->state & rs_connected);
+ }
+
+ static int rs_have_rdata(struct rsocket *rs)
+@@ -1090,9 +1078,10 @@ static int rs_conn_have_rdata(struct rsocket *rs)
+ return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
+ }
+
+-static int rs_all_sends_done(struct rsocket *rs)
++static int rs_conn_all_sends_done(struct rsocket *rs)
+ {
+- return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
++ return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) ||
++ !(rs->state & rs_connected);
+ }
+
+ static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
+@@ -1281,15 +1270,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ sge.addr = (uintptr_t) buf;
+ sge.length = xfer_size;
+ sge.lkey = 0;
+- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+- &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
+- IBV_SEND_INLINE);
++ ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
+ } else if (xfer_size <= rs_sbuf_left(rs)) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+ rs->ssgl[0].length = xfer_size;
+- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+- rs->ssgl, 1,
+- rs_msg_set(RS_OP_DATA, xfer_size), 0);
++ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
+ if (xfer_size < rs_sbuf_left(rs))
+ rs->ssgl[0].addr += xfer_size;
+ else
+@@ -1300,9 +1285,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+- rs->ssgl, 2,
+- rs_msg_set(RS_OP_DATA, xfer_size), 0);
++ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+ if (ret)
+@@ -1395,9 +1378,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
+ &cur_iov, &offset, xfer_size);
+ rs->ssgl[0].length = xfer_size;
+- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+- rs->ssgl, 1,
+- rs_msg_set(RS_OP_DATA, xfer_size),
++ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
+ xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
+ if (xfer_size < rs_sbuf_left(rs))
+ rs->ssgl[0].addr += xfer_size;
+@@ -1409,9 +1390,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ &offset, rs->ssgl[0].length);
+ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+ rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
+- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
+- rs->ssgl, 2,
+- rs_msg_set(RS_OP_DATA, xfer_size),
++ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
+ xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+@@ -1724,7 +1703,7 @@ int rshutdown(int socket, int how)
+
+ rs = idm_at(&idm, socket);
+ if (how == SHUT_RD) {
+- rs_shutdown_state(rs, rs_connect_rd);
++ rs->state &= ~rs_connect_rd;
+ return 0;
+ }
+
+@@ -1734,26 +1713,27 @@ int rshutdown(int socket, int how)
+ if (rs->state & rs_connected) {
+ if (how == SHUT_RDWR) {
+ ctrl = RS_CTRL_DISCONNECT;
+- rs->state = rs_disconnected;
++ rs->state &= ~(rs_connect_rd | rs_connect_wr);
+ } else {
+- rs_shutdown_state(rs, rs_connect_wr);
+- ctrl = (rs->state & rs_connected) ?
++ rs->state &= ~rs_connect_wr;
++ ctrl = (rs->state & rs_connect_rd) ?
+ RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+ }
+- if (!rs_can_send_ctrl(rs)) {
+- ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
++ if (!rs->ctrl_avail) {
++ ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
+ if (ret)
+ return ret;
+ }
+
+- rs->ctrl_avail--;
+- ret = rs_post_write(rs, 0, NULL, 0,
+- rs_msg_set(RS_OP_CTRL, ctrl),
+- 0, 0, 0);
++ if ((rs->state & rs_connected) && rs->ctrl_avail) {
++ rs->ctrl_avail--;
++ ret = rs_post_write(rs, NULL, 0,
++ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
++ }
+ }
+
+- if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
+- rs_process_cq(rs, 0, rs_all_sends_done);
++ if (rs->state & rs_connected)
++ rs_process_cq(rs, 0, rs_conn_all_sends_done);
+
+ if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
+ rs_set_nonblocking(rs, 1);