#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE sizeof(uint32_t)
#define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63)
#define rs_send_wr_id(data) ((uint64_t) data)
rs_error = 0x2000,
};
-#define RS_OPT_SWAP_SGL 1
+#define RS_OPT_SWAP_SGL (1 << 0)
+/*
+ * iWarp does not support RDMA write with immediate data. For iWarp, we
+ * transfer rsocket messages as inline sends.
+ */
+#define RS_OPT_MSG_SEND (1 << 1)
union socket_addr {
struct sockaddr sa;
volatile struct rs_sge *target_sgl;
struct rs_iomap *target_iomap;
+ int rbuf_msg_index;
int rbuf_bytes_avail;
int rbuf_free_offset;
int rbuf_offset;
if (rs->sq_size > max_size)
rs->sq_size = max_size;
- else if (rs->sq_size < 2)
- rs->sq_size = 2;
+ else if (rs->sq_size < 4)
+ rs->sq_size = 4;
if (rs->sq_size <= (RS_QP_CTRL_SIZE << 2))
- rs->ctrl_avail = 1;
+ rs->ctrl_avail = 2;
if (rs->rq_size > max_size)
rs->rq_size = max_size;
- else if (rs->rq_size < 2)
- rs->rq_size = 2;
+ else if (rs->rq_size < 4)
+ rs->rq_size = 4;
}
static void ds_set_qp_size(struct rsocket *rs)
static int rs_init_bufs(struct rsocket *rs)
{
+ uint32_t rbuf_msg_size;
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (rs->target_iomap_size)
rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
- rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+ rbuf_msg_size = rs->rbuf_size;
+ if (rs->opts & RS_OPT_MSG_SEND)
+ rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
+ rs->rbuf = calloc(rbuf_msg_size, 1);
if (!rs->rbuf)
return ERR(ENOMEM);
- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
if (!rs->rmr)
return -1;
static int ds_init_bufs(struct ds_qp *qp)
{
- qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
- sizeof(*qp->rbuf));
+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
if (!qp->rbuf)
return ERR(ENOMEM);
static inline int rs_post_recv(struct rsocket *rs)
{
struct ibv_recv_wr wr, *bad;
+ struct ibv_sge sge;
- wr.wr_id = rs_recv_wr_id(0);
wr.next = NULL;
- wr.sg_list = NULL;
- wr.num_sge = 0;
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ wr.wr_id = rs_recv_wr_id(0);
+ wr.sg_list = NULL;
+ wr.num_sge = 0;
+ } else {
+ wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index);
+ sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+ (rs->rbuf_msg_index * RS_MSG_SIZE);
+ sge.length = RS_MSG_SIZE;
+ sge.lkey = rs->rmr->lkey;
+
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+ if(++rs->rbuf_msg_index == rs->rq_size)
+ rs->rbuf_msg_index = 0;
+ }
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
int i, ret;
rs_set_qp_size(rs);
+ if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP)
+ rs->opts |= RS_OPT_MSG_SEND;
ret = rs_init_bufs(rs);
if (ret)
return ret;
return ret;
}
-static int rs_post_write_msg(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags,
- uint64_t addr, uint32_t rkey)
+static int rs_post_msg(struct rsocket *rs, uint32_t msg)
{
struct ibv_send_wr wr, *bad;
+ struct ibv_sge sge;
- wr.wr_id = rs_send_wr_id(imm_data);
+ wr.wr_id = rs_send_wr_id(msg);
wr.next = NULL;
- wr.sg_list = sgl;
- wr.num_sge = nsge;
- wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
- wr.send_flags = flags;
- wr.imm_data = htonl(imm_data);
- wr.wr.rdma.remote_addr = addr;
- wr.wr.rdma.rkey = rkey;
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+ wr.opcode = IBV_WR_SEND;
+ wr.send_flags = IBV_SEND_INLINE;
+ } else {
+ wr.sg_list = NULL;
+ wr.num_sge = 0;
+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+ wr.send_flags = 0;
+ wr.imm_data = htonl(msg);
+ }
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+static int rs_post_write_msg(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint32_t msg, int flags,
+ uint64_t addr, uint32_t rkey)
+{
+ struct ibv_send_wr wr, *bad;
+ int ret;
+
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ wr.wr_id = rs_send_wr_id(msg);
+ wr.next = NULL;
+ wr.sg_list = sgl;
+ wr.num_sge = nsge;
+ wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+ wr.send_flags = flags;
+ wr.imm_data = htonl(msg);
+ wr.wr.rdma.remote_addr = addr;
+ wr.wr.rdma.rkey = rkey;
+
+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+ } else {
+ ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey);
+ if (!ret)
+ ret = rs_post_msg(rs, msg);
+ return ret;
+ }
+}
+
static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
uint32_t wr_data)
{
rs->sseq_no++;
rs->sqe_avail--;
+ if (rs->opts & RS_OPT_MSG_SEND)
+ rs->sqe_avail--;
rs->sbuf_bytes_avail -= length;
addr = rs->target_sgl[rs->target_sge].addr;
rs->sseq_no++;
rs->sqe_avail--;
+ if (rs->opts & RS_OPT_MSG_SEND)
+ rs->sqe_avail--;
rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
rs->ctrl_avail--;
rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
+ if (rs->opts & RS_OPT_MSG_SEND)
+ rs->ctrl_avail--;
+
if (!(rs->opts & RS_OPT_SWAP_SGL)) {
sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
sge.key = rs->rmr->rkey;
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
- rs_post_write_msg(rs, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- 0, 0, 0);
+ rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size));
}
}
static int rs_give_credits(struct rsocket *rs)
{
- return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
- ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
- rs->ctrl_avail && (rs->state & rs_connected);
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+ ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+ rs->ctrl_avail && (rs->state & rs_connected);
+ } else {
+ return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+ ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+ (rs->ctrl_avail > 1) && (rs->state & rs_connected);
+ }
}
static void rs_update_credits(struct rsocket *rs)
static int rs_poll_cq(struct rsocket *rs)
{
struct ibv_wc wc;
- uint32_t imm_data;
+ uint32_t msg;
int ret, rcnt = 0;
while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) {
continue;
rcnt++;
- imm_data = ntohl(wc.imm_data);
- switch (rs_msg_op(imm_data)) {
+ if (wc.wc_flags & IBV_WC_WITH_IMM) {
+ msg = ntohl(wc.imm_data);
+ } else {
+ msg = ((uint32_t *) rs->rbuf + rs->rbuf_size)
+ [rs_wr_data(wc.wr_id)];
+
+ }
+ switch (rs_msg_op(msg)) {
case RS_OP_SGL:
- rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
+ rs->sseq_comp = (uint16_t) rs_msg_data(msg);
break;
case RS_OP_IOMAP_SGL:
/* The iomap was updated, that's nice to know. */
break;
case RS_OP_CTRL:
- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
+ if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) {
rs->state = rs_disconnected;
return 0;
- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+ } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) {
rs->state &= ~rs_readable;
}
break;
/* We really shouldn't be here. */
break;
default:
- rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
- rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
+ rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg);
+ rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg);
if (++rs->rmsg_tail == rs->rq_size + 1)
rs->rmsg_tail = 0;
break;
*/
static int rs_can_send(struct rsocket *rs)
{
- return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
- (rs->sseq_no != rs->sseq_comp) &&
- (rs->target_sgl[rs->target_sge].length != 0);
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+ (rs->sseq_no != rs->sseq_comp) &&
+ (rs->target_sgl[rs->target_sge].length != 0);
+ } else {
+ return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
+ (rs->sseq_no != rs->sseq_comp) &&
+ (rs->target_sgl[rs->target_sge].length != 0);
+ }
}
static int ds_can_send(struct rsocket *rs)
if ((rs->state & rs_connected) && rs->ctrl_avail) {
rs->ctrl_avail--;
- ret = rs_post_write_msg(rs, NULL, 0,
- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl));
}
}