+++ /dev/null
-Bottom: e8c026d3862906d30200710fcc27654d4fb2d580
-Top: de87f5a87dd536025f4ce22a3bad2600d4350081
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2013-04-11 10:05:29 -0700
-
-rsocket: Add support for iWarp
-
-iWarp does not support RDMA writes with immediate data.
-Instead of sending messages using immediate data, allow
-the rsocket protocol to exchange messages using sends.
-
-The rsocket protocol remains the same. RDMA writes are
-used for data transfers, with send messages used to transfer
-rsocket protocol messages.
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index ca77116..abdd392 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -123,6 +123,7 @@ enum {
- #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
- #define rs_msg_op(imm_data) (imm_data >> 29)
- #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-+#define RS_MSG_SIZE sizeof(uint32_t)
-
- #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
- #define rs_send_wr_id(data) ((uint64_t) data)
-@@ -207,7 +208,12 @@ enum rs_state {
- rs_error = 0x2000,
- };
-
--#define RS_OPT_SWAP_SGL 1
-+#define RS_OPT_SWAP_SGL (1 << 0)
-+/*
-+ * iWarp does not support RDMA write with immediate data. For iWarp, we
-+ * transfer rsocket messages as inline sends.
-+ */
-+#define RS_OPT_MSG_SEND (1 << 1)
-
- union socket_addr {
- struct sockaddr sa;
-@@ -284,6 +290,7 @@ struct rsocket {
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
-
-+ int rbuf_msg_index;
- int rbuf_bytes_avail;
- int rbuf_free_offset;
- int rbuf_offset;
-@@ -600,15 +607,15 @@ static void rs_set_qp_size(struct rsocket *rs)
-
- if (rs->sq_size > max_size)
- rs->sq_size = max_size;
-- else if (rs->sq_size < 2)
-- rs->sq_size = 2;
-+ else if (rs->sq_size < 4)
-+ rs->sq_size = 4;
- if (rs->sq_size <= (RS_QP_CTRL_SIZE << 2))
-- rs->ctrl_avail = 1;
-+ rs->ctrl_avail = 2;
-
- if (rs->rq_size > max_size)
- rs->rq_size = max_size;
-- else if (rs->rq_size < 2)
-- rs->rq_size = 2;
-+ else if (rs->rq_size < 4)
-+ rs->rq_size = 4;
- }
-
- static void ds_set_qp_size(struct rsocket *rs)
-@@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs)
-
- static int rs_init_bufs(struct rsocket *rs)
- {
-+ uint32_t rbuf_msg_size;
- size_t len;
-
- rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
-@@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs)
- if (rs->target_iomap_size)
- rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
-
-- rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
-+ rbuf_msg_size = rs->rbuf_size;
-+ if (rs->opts & RS_OPT_MSG_SEND)
-+ rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
-+ rs->rbuf = calloc(rbuf_msg_size, 1);
- if (!rs->rbuf)
- return ERR(ENOMEM);
-
-- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
-+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
- if (!rs->rmr)
- return -1;
-
-@@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs)
-
- static int ds_init_bufs(struct ds_qp *qp)
- {
-- qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
-- sizeof(*qp->rbuf));
-+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
- if (!qp->rbuf)
- return ERR(ENOMEM);
-
-@@ -740,11 +750,25 @@ err1:
- static inline int rs_post_recv(struct rsocket *rs)
- {
- struct ibv_recv_wr wr, *bad;
-+ struct ibv_sge sge;
-
-- wr.wr_id = rs_recv_wr_id(0);
- wr.next = NULL;
-- wr.sg_list = NULL;
-- wr.num_sge = 0;
-+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ wr.wr_id = rs_recv_wr_id(0);
-+ wr.sg_list = NULL;
-+ wr.num_sge = 0;
-+ } else {
-+ wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
-+ sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
-+ (rs->rbuf_msg_index * RS_MSG_SIZE);
-+ sge.length = RS_MSG_SIZE;
-+ sge.lkey = rs->rmr->lkey;
-+
-+ wr.sg_list = &sge;
-+ wr.num_sge = 1;
-+ if(++rs->rbuf_msg_index == rs->rq_size)
-+ rs->rbuf_msg_index = 0;
-+ }
-
- return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
- }
-@@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs)
- int i, ret;
-
- rs_set_qp_size(rs);
-+ if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
-+ rs->opts |= RS_OPT_MSG_SEND;
- ret = rs_init_bufs(rs);
- if (ret)
- return ret;
-@@ -1224,6 +1250,9 @@ do_connect:
- param.flow_control = 1;
- param.retry_count = 7;
- param.rnr_retry_count = 7;
-+ /* work-around: iWarp issues RDMA read during connection */
-+ if (rs->opts & RS_OPT_MSG_SEND)
-+ param.initiator_depth = 1;
- rs->retries = 0;
-
- ret = rdma_connect(rs->cm_id, ¶m);
-@@ -1508,22 +1537,28 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- return ret;
- }
-
--static int rs_post_write_msg(struct rsocket *rs,
-- struct ibv_sge *sgl, int nsge,
-- uint32_t imm_data, int flags,
-- uint64_t addr, uint32_t rkey)
-+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
- {
- struct ibv_send_wr wr, *bad;
-+ struct ibv_sge sge;
-
-- wr.wr_id = rs_send_wr_id(imm_data);
-+ wr.wr_id = rs_send_wr_id(msg);
- wr.next = NULL;
-- wr.sg_list = sgl;
-- wr.num_sge = nsge;
-- wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-- wr.send_flags = flags;
-- wr.imm_data = htonl(imm_data);
-- wr.wr.rdma.remote_addr = addr;
-- wr.wr.rdma.rkey = rkey;
-+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ wr.sg_list = NULL;
-+ wr.num_sge = 0;
-+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-+ wr.send_flags = 0;
-+ wr.imm_data = htonl(msg);
-+ } else {
-+ sge.addr = (uintptr_t) &msg;
-+ sge.lkey = 0;
-+ sge.length = sizeof msg;
-+ wr.sg_list = &sge;
-+ wr.num_sge = 1;
-+ wr.opcode = IBV_WR_SEND;
-+ wr.send_flags = IBV_SEND_INLINE;
-+ }
-
- return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-@@ -1547,6 +1582,34 @@ static int rs_post_write(struct rsocket *rs,
- return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-
-+static int rs_post_write_msg(struct rsocket *rs,
-+ struct ibv_sge *sgl, int nsge,
-+ uint32_t msg, int flags,
-+ uint64_t addr, uint32_t rkey)
-+{
-+ struct ibv_send_wr wr, *bad;
-+ int ret;
-+
-+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ wr.wr_id = rs_send_wr_id(msg);
-+ wr.next = NULL;
-+ wr.sg_list = sgl;
-+ wr.num_sge = nsge;
-+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-+ wr.send_flags = flags;
-+ wr.imm_data = htonl(msg);
-+ wr.wr.rdma.remote_addr = addr;
-+ wr.wr.rdma.rkey = rkey;
-+
-+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
-+ } else {
-+ ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
-+ if (!ret)
-+ ret = rs_post_msg(rs, msg);
-+ return ret;
-+ }
-+}
-+
- static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
- uint32_t wr_data)
- {
-@@ -1578,6 +1641,8 @@ static int rs_write_data(struct rsocket *rs,
-
- rs->sseq_no++;
- rs->sqe_avail--;
-+ if (rs->opts & RS_OPT_MSG_SEND)
-+ rs->sqe_avail--;
- rs->sbuf_bytes_avail -= length;
-
- addr = rs->target_sgl[rs->target_sge].addr;
-@@ -1615,6 +1680,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
-
- rs->sseq_no++;
- rs->sqe_avail--;
-+ if (rs->opts & RS_OPT_MSG_SEND)
-+ rs->sqe_avail--;
- rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-
- addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-@@ -1636,6 +1703,9 @@ static void rs_send_credits(struct rsocket *rs)
- rs->ctrl_avail--;
- rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
- if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
-+ if (rs->opts & RS_OPT_MSG_SEND)
-+ rs->ctrl_avail--;
-+
- if (!(rs->opts & RS_OPT_SWAP_SGL)) {
- sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
- sge.key = rs->rmr->rkey;
-@@ -1664,17 +1734,21 @@ static void rs_send_credits(struct rsocket *rs)
- if (++rs->remote_sge == rs->remote_sgl.length)
- rs->remote_sge = 0;
- } else {
-- rs_post_write_msg(rs, NULL, 0,
-- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-- 0, 0, 0);
-+ rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
- }
- }
-
- static int rs_give_credits(struct rsocket *rs)
- {
-- return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
-- ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-- rs->ctrl_avail && (rs->state & rs_connected);
-+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
-+ ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-+ rs->ctrl_avail && (rs->state & rs_connected);
-+ } else {
-+ return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
-+ ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-+ (rs->ctrl_avail > 1) && (rs->state & rs_connected);
-+ }
- }
-
- static void rs_update_credits(struct rsocket *rs)
-@@ -1686,7 +1760,7 @@ static void rs_update_credits(struct rsocket *rs)
- static int rs_poll_cq(struct rsocket *rs)
- {
- struct ibv_wc wc;
-- uint32_t imm_data;
-+ uint32_t msg;
- int ret, rcnt = 0;
-
- while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
-@@ -1695,19 +1769,25 @@ static int rs_poll_cq(struct rsocket *rs)
- continue;
- rcnt++;
-
-- imm_data = ntohl(wc.imm_data);
-- switch (rs_msg_op(imm_data)) {
-+ if (wc.wc_flags & IBV_WC_WITH_IMM) {
-+ msg = ntohl(wc.imm_data);
-+ } else {
-+ msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size))
-+ [rs_wr_data(wc.wr_id)];
-+
-+ }
-+ switch (rs_msg_op(msg)) {
- case RS_OP_SGL:
-- rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
-+ rs->sseq_comp = (uint16_t) rs_msg_data(msg);
- break;
- case RS_OP_IOMAP_SGL:
- /* The iomap was updated, that's nice to know. */
- break;
- case RS_OP_CTRL:
-- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
-+ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
- rs->state = rs_disconnected;
- return 0;
-- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-+ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
- rs->state &= ~rs_readable;
- }
- break;
-@@ -1715,8 +1795,8 @@ static int rs_poll_cq(struct rsocket *rs)
- /* We really shouldn't be here. */
- break;
- default:
-- rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-- rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
-+ rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
-+ rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
- if (++rs->rmsg_tail == rs->rq_size + 1)
- rs->rmsg_tail = 0;
- break;
-@@ -2037,9 +2117,15 @@ static int rs_poll_all(struct rsocket *rs)
- */
- static int rs_can_send(struct rsocket *rs)
- {
-- return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-- (rs->sseq_no != rs->sseq_comp) &&
-- (rs->target_sgl[rs->target_sge].length != 0);
-+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-+ (rs->sseq_no != rs->sseq_comp) &&
-+ (rs->target_sgl[rs->target_sge].length != 0);
-+ } else {
-+ return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-+ (rs->sseq_no != rs->sseq_comp) &&
-+ (rs->target_sgl[rs->target_sge].length != 0);
-+ }
- }
-
- static int ds_can_send(struct rsocket *rs)
-@@ -3011,8 +3097,7 @@ int rshutdown(int socket, int how)
-
- if ((rs->state & rs_connected) && rs->ctrl_avail) {
- rs->ctrl_avail--;
-- ret = rs_post_write_msg(rs, NULL, 0,
-- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-+ ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
- }
- }