#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_MSG_SIZE sizeof(uint32_t)
#define RS_RECV_WR_ID (~((uint64_t) 0))
#define DS_WR_RECV 0xFFFFFFFF
+/* TODO: wr_id length does not appear to be used */
#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
rs_error = 0x2000,
};
-#define RS_OPT_SWAP_SGL 1
+#define RS_OPT_SWAP_SGL (1 << 0)
+/*
+ * iWarp does not support RDMA write with immediate data. For iWarp, we
+ * transfer rsocket messages as inline sends.
+ */
+#define RS_OPT_MSG_SEND (1 << 1)
union socket_addr {
struct sockaddr sa;
volatile struct rs_sge *target_sgl;
struct rs_iomap *target_iomap;
+ int rbuf_msg_index;
int rbuf_bytes_avail;
int rbuf_free_offset;
int rbuf_offset;
static int rs_init_bufs(struct rsocket *rs)
{
+ uint32_t rbuf_msg_size;
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (rs->target_iomap_size)
rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
- rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+ rbuf_msg_size = rs->rbuf_size;
+ if (rs->opts & RS_OPT_MSG_SEND)
+ rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
+ rs->rbuf = calloc(rbuf_msg_size, 1);
if (!rs->rbuf)
return ERR(ENOMEM);
- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
if (!rs->rmr)
return -1;
static int ds_init_bufs(struct ds_qp *qp)
{
- qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh),
- sizeof(*qp->rbuf));
+ qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1);
if (!qp->rbuf)
return ERR(ENOMEM);
static inline int rs_post_recv(struct rsocket *rs)
{
struct ibv_recv_wr wr, *bad;
+ struct ibv_sge sge;
wr.wr_id = RS_RECV_WR_ID;
wr.next = NULL;
- wr.sg_list = NULL;
- wr.num_sge = 0;
+ if (!(rs->opts & RS_OPT_MSG_SEND)) {
+ wr.sg_list = NULL;
+ wr.num_sge = 0;
+ } else {
+ sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size +
+ (rs->rbuf_msg_index * RS_MSG_SIZE);
+ sge.length = sizeof(uint32_t);
+ sge.lkey = rs->rmr->lkey;
+
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+ if(++rs->rbuf_msg_index == rs->rq_size)
+ rs->rbuf_msg_index = 0;
+ /* TODO we need wr_id to reference back to receive buffer, so we can
+ * retrieve the incoming message from the buffer
+ */
+ }
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
int i, ret;
rs_set_qp_size(rs);
+ if (rs->cm_id->context->device->transport_type == IBV_TRANSPORT_IWARP)
+ rs->opts |= RS_OPT_MSG_SEND;
ret = rs_init_bufs(rs);
if (ret)
return ret;