+++ /dev/null
-Bottom: cb13e6cd851f8f4575498a9c544ff53a15b46db6
-Top: a44edddc396d1994c73398a43b9fef91c0a5125f
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2013-04-13 12:22:55 -0700
-
-Refresh of rs-iwarp
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index de15f58..4b3505a 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -123,6 +123,7 @@ enum {
- #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
- #define rs_msg_op(imm_data) (imm_data >> 29)
- #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-+#define RS_MSG_SIZE sizeof(uint32_t)
-
- #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
- #define rs_send_wr_id(data) ((uint64_t) data)
-@@ -751,24 +752,22 @@ static inline int rs_post_recv(struct rsocket *rs)
- struct ibv_recv_wr wr, *bad;
- struct ibv_sge sge;
-
-- wr.wr_id = rs_recv_wr_id(0);
- wr.next = NULL;
- if (!(rs->opts & RS_OPT_MSG_SEND)) {
-+ wr.wr_id = rs_recv_wr_id(0);
- wr.sg_list = NULL;
- wr.num_sge = 0;
- } else {
-+ wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
- sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
- (rs->rbuf_msg_index * RS_MSG_SIZE);
-- sge.length = sizeof(uint32_t);
-+ sge.length = RS_MSG_SIZE;
- sge.lkey = rs->rmr->lkey;
-
- wr.sg_list = &sge;
- wr.num_sge = 1;
- if(++rs->rbuf_msg_index == rs->rq_size)
- rs->rbuf_msg_index = 0;
-- /* TODO we need wr_id to reference back to receive buffer, so we can
-- * retrieve the incoming message from the buffer
-- */
- }
-
- return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
-@@ -800,7 +799,7 @@ static int rs_create_ep(struct rsocket *rs)
- int i, ret;
-
- rs_set_qp_size(rs);
-- if (rs->cm_id->context->device->transport_type == IBV_TRANSPORT_IWARP)
-+ if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
- rs->opts |= RS_OPT_MSG_SEND;
- ret = rs_init_bufs(rs);
- if (ret)
-@@ -1535,6 +1534,21 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- return ret;
- }
-
-+/*
-+TODO:
-+if MSG_SEND opt is set
-+ if !sgl
-+ post send with imm_data inline
-+ else
-+ post rdma write
-+ pst send with imm_data inline
-+else
-+ existing code flow - rdma write with immediate
-+
-+post_write - rdma write only
-+post msg - immediate data or send only
-+post write msg - rdma write with msg (imm or send after)
-+*/
- static int rs_post_write_msg(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags,
-@@ -1555,6 +1569,29 @@ static int rs_post_write_msg(struct rsocket *rs,
- return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-
-+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
-+{
-+ struct ibv_send_wr wr, *bad;
-+ struct ibv_sge sge;
-+
-+ wr.wr_id = rs_send_wr_id(msg);
-+ wr.next = NULL;
-+ if (rs->opts & RS_OPT_MSG_SEND) {
-+ wr.sg_list = &sge;
-+ wr.num_sge = 1;
-+ wr.opcode = IBV_WR_SEND;
-+ wr.send_flags = IBV_SEND_INLINE;
-+ } else {
-+ wr.sg_list = NULL;
-+ wr.num_sge = 0;
-+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-+ wr.send_flags = flags;
-+ wr.imm_data = htonl(imm_data);
-+ }
-+
-+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
-+}
-+
- static int rs_post_write(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t wr_data, int flags,
-@@ -1713,7 +1750,7 @@ static void rs_update_credits(struct rsocket *rs)
- static int rs_poll_cq(struct rsocket *rs)
- {
- struct ibv_wc wc;
-- uint32_t imm_data;
-+ uint32_t msg;
- int ret, rcnt = 0;
-
- while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
-@@ -1722,19 +1759,25 @@ static int rs_poll_cq(struct rsocket *rs)
- continue;
- rcnt++;
-
-- imm_data = ntohl(wc.imm_data);
-- switch (rs_msg_op(imm_data)) {
-+ if (wc.wc_flags & IBV_WC_WITH_IMM) {
-+ msg = ntohl(wc.imm_data);
-+ } else {
-+ msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
-+ [rs_wr_data(wc.wr_id)];
-+
-+ }
-+ switch (rs_msg_op(msg)) {
- case RS_OP_SGL:
-- rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
-+ rs->sseq_comp = (uint16_t) rs_msg_data(msg);
- break;
- case RS_OP_IOMAP_SGL:
- /* The iomap was updated, that's nice to know. */
- break;
- case RS_OP_CTRL:
-- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
-+ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
- rs->state = rs_disconnected;
- return 0;
-- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-+ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
- rs->state &= ~rs_readable;
- }
- break;
-@@ -1742,8 +1785,8 @@ static int rs_poll_cq(struct rsocket *rs)
- /* We really shouldn't be here. */
- break;
- default:
-- rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-- rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
-+ rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
-+ rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
- if (++rs->rmsg_tail == rs->rq_size + 1)
- rs->rmsg_tail = 0;
- break;
-@@ -2064,6 +2107,11 @@ static int rs_poll_all(struct rsocket *rs)
- */
- static int rs_can_send(struct rsocket *rs)
- {
-+ if (rs->opts & RS_OPT_MSG_SEND) {
-+ return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-+ (rs->sseq_no != rs->sseq_comp) &&
-+ (rs->target_sgl[rs->target_sge].length != 0);
-+ }
- return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
- (rs->sseq_no != rs->sseq_comp) &&
- (rs->target_sgl[rs->target_sge].length != 0);
Bottom: e8c026d3862906d30200710fcc27654d4fb2d580
-Top: cb13e6cd851f8f4575498a9c544ff53a15b46db6
+Top: a44edddc396d1994c73398a43b9fef91c0a5125f
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2013-04-11 10:05:29 -0700
---
diff --git a/src/rsocket.c b/src/rsocket.c
-index ca77116..de15f58 100644
+index ca77116..4b3505a 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -207,7 +207,12 @@ enum rs_state {
+@@ -123,6 +123,7 @@ enum {
+ #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
+ #define rs_msg_op(imm_data) (imm_data >> 29)
+ #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
++#define RS_MSG_SIZE sizeof(uint32_t)
+
+ #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
+ #define rs_send_wr_id(data) ((uint64_t) data)
+@@ -207,7 +208,12 @@ enum rs_state {
rs_error = 0x2000,
};
union socket_addr {
struct sockaddr sa;
-@@ -284,6 +289,7 @@ struct rsocket {
+@@ -284,6 +290,7 @@ struct rsocket {
volatile struct rs_sge *target_sgl;
struct rs_iomap *target_iomap;
int rbuf_bytes_avail;
int rbuf_free_offset;
int rbuf_offset;
-@@ -635,6 +641,7 @@ static void ds_set_qp_size(struct rsocket *rs)
+@@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs)
static int rs_init_bufs(struct rsocket *rs)
{
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
-@@ -664,11 +671,14 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs)
if (rs->target_iomap_size)
rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
if (!rs->rmr)
return -1;
-@@ -685,8 +695,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs)
static int ds_init_bufs(struct ds_qp *qp)
{
if (!qp->rbuf)
return ERR(ENOMEM);
-@@ -740,11 +749,27 @@ err1:
+@@ -740,11 +750,25 @@ err1:
static inline int rs_post_recv(struct rsocket *rs)
{
struct ibv_recv_wr wr, *bad;
+ struct ibv_sge sge;
- wr.wr_id = rs_recv_wr_id(0);
+- wr.wr_id = rs_recv_wr_id(0);
wr.next = NULL;
- wr.sg_list = NULL;
- wr.num_sge = 0;
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
++ wr.wr_id = rs_recv_wr_id(0);
+ wr.sg_list = NULL;
+ wr.num_sge = 0;
+ } else {
++ wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
+ sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+ (rs->rbuf_msg_index * RS_MSG_SIZE);
-+ sge.length = sizeof(uint32_t);
++ sge.length = RS_MSG_SIZE;
+ sge.lkey = rs->rmr->lkey;
+
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+ if(++rs->rbuf_msg_index == rs->rq_size)
+ rs->rbuf_msg_index = 0;
-+ /* TODO we need wr_id to reference back to receive buffer, so we can
-+ * retrieve the incoming message from the buffer
-+ */
+ }
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
-@@ -775,6 +800,8 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs)
int i, ret;
rs_set_qp_size(rs);
-+ if (rs->cm_id->context->device->transport_type == IBV_TRANSPORT_IWARP)
++ if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
+ rs->opts |= RS_OPT_MSG_SEND;
ret = rs_init_bufs(rs);
if (ret)
return ret;
+@@ -1508,6 +1534,21 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ return ret;
+ }
+
++/*
++TODO:
++if MSG_SEND opt is set
++ if !sgl
++ post send with imm_data inline
++ else
++ post rdma write
++ pst send with imm_data inline
++else
++ existing code flow - rdma write with immediate
++
++post_write - rdma write only
++post msg - immediate data or send only
++post write msg - rdma write with msg (imm or send after)
++*/
+ static int rs_post_write_msg(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t imm_data, int flags,
+@@ -1528,6 +1569,29 @@ static int rs_post_write_msg(struct rsocket *rs,
+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+ }
+
++static int rs_post_msg(struct rsocket *rs, uint32_t msg)
++{
++ struct ibv_send_wr wr, *bad;
++ struct ibv_sge sge;
++
++ wr.wr_id = rs_send_wr_id(msg);
++ wr.next = NULL;
++ if (rs->opts & RS_OPT_MSG_SEND) {
++ wr.sg_list = &sge;
++ wr.num_sge = 1;
++ wr.opcode = IBV_WR_SEND;
++ wr.send_flags = IBV_SEND_INLINE;
++ } else {
++ wr.sg_list = NULL;
++ wr.num_sge = 0;
++ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
++ wr.send_flags = flags;
++ wr.imm_data = htonl(imm_data);
++ }
++
++ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
++}
++
+ static int rs_post_write(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t wr_data, int flags,
+@@ -1686,7 +1750,7 @@ static void rs_update_credits(struct rsocket *rs)
+ static int rs_poll_cq(struct rsocket *rs)
+ {
+ struct ibv_wc wc;
+- uint32_t imm_data;
++ uint32_t msg;
+ int ret, rcnt = 0;
+
+ while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
+@@ -1695,19 +1759,25 @@ static int rs_poll_cq(struct rsocket *rs)
+ continue;
+ rcnt++;
+
+- imm_data = ntohl(wc.imm_data);
+- switch (rs_msg_op(imm_data)) {
++ if (wc.wc_flags & IBV_WC_WITH_IMM) {
++ msg = ntohl(wc.imm_data);
++ } else {
++ msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
++ [rs_wr_data(wc.wr_id)];
++
++ }
++ switch (rs_msg_op(msg)) {
+ case RS_OP_SGL:
+- rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
++ rs->sseq_comp = (uint16_t) rs_msg_data(msg);
+ break;
+ case RS_OP_IOMAP_SGL:
+ /* The iomap was updated, that's nice to know. */
+ break;
+ case RS_OP_CTRL:
+- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
++ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
+ rs->state = rs_disconnected;
+ return 0;
+- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
++ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
+ rs->state &= ~rs_readable;
+ }
+ break;
+@@ -1715,8 +1785,8 @@ static int rs_poll_cq(struct rsocket *rs)
+ /* We really shouldn't be here. */
+ break;
+ default:
+- rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
+- rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
++ rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
++ rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
+ if (++rs->rmsg_tail == rs->rq_size + 1)
+ rs->rmsg_tail = 0;
+ break;
+@@ -2037,6 +2107,11 @@ static int rs_poll_all(struct rsocket *rs)
+ */
+ static int rs_can_send(struct rsocket *rs)
+ {
++ if (rs->opts & RS_OPT_MSG_SEND) {
++ return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
++ (rs->sseq_no != rs->sseq_comp) &&
++ (rs->target_sgl[rs->target_sge].length != 0);
++ }
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+ (rs->sseq_no != rs->sseq_comp) &&
+ (rs->target_sgl[rs->target_sge].length != 0);