#define RS_SNDLOWAT 2048
#define RS_QP_MIN_SIZE 16
#define RS_QP_MAX_SIZE 0xFFFE
-#define RS_QP_CTRL_SIZE 4
+#define RS_QP_CTRL_SIZE 4 /* must be power of 2 */
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2
static struct index_map idm;
int index; /* -1 if mapping is local and not in iomap_list */
};
-#define RS_MIN_INLINE (sizeof(struct rs_sge))
+#define RS_MAX_CTRL_MSG (sizeof(struct rs_sge))
#define rs_host_is_net() (1 == htonl(1))
#define RS_CONN_FLAG_NET (1 << 0)
#define RS_CONN_FLAG_IOMAP (1 << 1)
if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
(void) fscanf(f, "%hu", &def_inline);
fclose(f);
-
- if (def_inline < RS_MIN_INLINE)
- def_inline = RS_MIN_INLINE;
}
if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
static int rs_init_bufs(struct rsocket *rs)
{
- uint32_t rbuf_msg_size;
+ uint32_t total_rbuf_size, total_sbuf_size;
size_t len;
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return ERR(ENOMEM);
- rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+ total_sbuf_size = rs->sbuf_size;
+ if (rs->sq_inline < RS_MAX_CTRL_MSG)
+ total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
+ rs->sbuf = calloc(total_sbuf_size sizeof(*rs->sbuf));
if (!rs->sbuf)
return ERR(ENOMEM);
- rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
+ rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
if (!rs->smr)
return -1;
if (rs->target_iomap_size)
rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
- rbuf_msg_size = rs->rbuf_size;
+ total_rbuf_size = rs->rbuf_size;
if (rs->opts & RS_OPT_MSG_SEND)
- rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
- rs->rbuf = calloc(rbuf_msg_size, 1);
+ total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
+ rs->rbuf = calloc(total_rbuf_size, 1);
if (!rs->rbuf)
return ERR(ENOMEM);
- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
if (!rs->rmr)
return -1;
return ret;
rs->sq_inline = qp_attr.cap.max_inline_data;
- if (rs->sq_inline < RS_MIN_INLINE)
- return ERR(EINVAL);
+ if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
+ return ERR(ENOTSUP);
for (i = 0; i < rs->rq_size; i++) {
ret = rs_post_recv(rs);
goto err;
rs->sq_inline = qp_attr.cap.max_inline_data;
- if (rs->sq_inline < RS_MIN_INLINE) {
- ret = ERR(ENOMEM);
- goto err;
- }
-
ret = ds_add_qp_dest(qp, src_addr, addrlen);
if (ret)
goto err;
return ret;
}
+static void *rs_get_ctrl_buf(struct rsocket *rs)
+{
+ return rs->sbuf + rs->sbuf_size +
+ RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
+}
+
+//static int rs_post_send_msg(struct rsocket *rs, uint64_t wr_id, uint32_t msg)
+//{
+// struct ibv_send_wr wr, *bad;
+// struct ibv_sge sge;
+// void *ctrl_buf;
+//
+// wr.wr_id = wr_id;
+// wr.next = NULL;
+// if (rs->sq_inline < sizeof msg) {
+// ctrl_buf = rs_get_ctrl_buf(rs);
+// memcpy(ctrl_buf, &msg, sizeof msg);
+// sge.addr = (uintptr_t) ctrl_buf;
+// sge.lkey = rs->smr->lkey;
+// sge.length = sizeof msg;
+// wr.send_flags = 0;
+// } else {
+// sge.addr = (uintptr_t) &msg;
+// sge.lkey = 0;
+// sge.length = sizeof msg;
+// wr.send_flags = IBV_SEND_INLINE;
+// }
+// wr.sg_list = &sge;
+// wr.num_sge = 1;
+// wr.opcode = IBV_WR_SEND;
+//
+// return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+//}
+//
+//static int rs_post_msg(struct rsocket *rs, uint32_t msg)
+//{
+// struct ibv_send_wr wr, *bad;
+// struct ibv_sge sge;
+//
+// if (rs->opts & RS_OPT_MSG_SEND)
+// return rs_post_send_msg(rs, rs_send_wr_id(msg), msg);
+//
+// wr.wr_id = rs_send_wr_id(msg);
+// wr.next = NULL;
+// wr.sg_list = NULL;
+// wr.num_sge = 0;
+// wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+// wr.send_flags = 0;
+// wr.imm_data = htonl(msg);
+//
+// return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+//}
+
static int rs_post_msg(struct rsocket *rs, uint32_t msg)
{
struct ibv_send_wr wr, *bad;
rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
- return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
- flags, addr, rs->remote_iomap.key);
+ return rs_post_write_msg(rs, sgl, nsge,
+ rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), flags, addr,
+ rs->remote_iomap.key);
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
ibsge.length = sizeof(sge);
rs_post_write_msg(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
- rs->remote_sge * sizeof(struct rs_sge),
- rs->remote_sgl.key);
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ IBV_SEND_INLINE,
+ rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
+ rs->remote_sgl.key);
rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
rs->rbuf_free_offset += rs->rbuf_size >> 1;
break;
case RDMA_INLINE:
rs->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
- if (rs->sq_inline < RS_MIN_INLINE)
- rs->sq_inline = RS_MIN_INLINE;
ret = 0;
break;
case RDMA_IOMAPSIZE: