#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
#define RS_QP_SIZE 512
+#define RS_QP_MAX_SIZE 0xFFFE
+#define RS_QP_MIN_SIZE 8
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2
int sbuf_bytes_avail;
uint16_t sseq_no;
uint16_t sseq_comp;
+ uint16_t sq_size;
+ uint16_t rq_size;
uint16_t rseq_no;
uint16_t rseq_comp;
int rbuf_bytes_avail;
int rbuf_offset;
int rmsg_head;
int rmsg_tail;
- struct rs_msg rmsg[RS_QP_SIZE + 1];
+ struct rs_msg *rmsg;
int remote_sge;
struct rs_sge remote_sgl;
return NULL;
rs->index = -1;
- rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
- rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
+ if (inherited_rs) {
+ rs->sbuf_size = inherited_rs->sbuf_size;
+ rs->rbuf_size = inherited_rs->rbuf_size;
+ rs->sq_size = inherited_rs->sq_size;
+ rs->rq_size = inherited_rs->rq_size;
+ rs->ctrl_avail = inherited_rs->ctrl_avail;
+ } else {
+ rs->sbuf_size = rs->rbuf_size = RS_BUF_SIZE;
+ rs->sq_size = rs->rq_size = RS_QP_SIZE;
+ rs->ctrl_avail = RS_QP_CTRL_SIZE;
+ }
fastlock_init(&rs->slock);
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
return ret;
}
+static void rs_set_qp_size(struct rsocket *rs)
+{
+ uint16_t max_size;
+
+ max_size = min(ucma_max_qpsize(rs->cm_id), RS_QP_MAX_SIZE);
+
+ if (rs->sq_size > max_size)
+ rs->sq_size = max_size;
+ if (rs->rq_size > max_size)
+ rs->rq_size = max_size;
+}
+
static int rs_init_bufs(struct rsocket *rs)
{
+ rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
+ if (!rs->rmsg)
+ return -1;
+
rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
if (!rs->sbuf)
return -1;
rs->rbuf_free_offset = rs->rbuf_size >> 1;
rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
- rs->ctrl_avail = RS_QP_CTRL_SIZE;
- rs->sqe_avail = RS_QP_SIZE - rs->ctrl_avail;
- rs->rseq_comp = RS_QP_SIZE >> 1;
+ rs->sqe_avail = rs->sq_size - rs->ctrl_avail;
+ rs->rseq_comp = rs->rq_size >> 1;
return 0;
}
if (!rs->cm_id->recv_cq_channel)
return -1;
- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, RS_QP_SIZE * 2,
+ rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
rs->cm_id, rs->cm_id->recv_cq_channel, 0);
if (!rs->cm_id->recv_cq)
goto err1;
struct ibv_qp_init_attr qp_attr;
int i, ret;
+ rs_set_qp_size(rs);
ret = rs_init_bufs(rs);
if (ret)
return ret;
qp_attr.recv_cq = rs->cm_id->recv_cq;
qp_attr.qp_type = IBV_QPT_RC;
qp_attr.sq_sig_all = 1;
- qp_attr.cap.max_send_wr = RS_QP_SIZE;
- qp_attr.cap.max_recv_wr = RS_QP_SIZE;
+ qp_attr.cap.max_send_wr = rs->sq_size;
+ qp_attr.cap.max_recv_wr = rs->rq_size;
qp_attr.cap.max_send_sge = 2;
qp_attr.cap.max_recv_sge = 1;
qp_attr.cap.max_inline_data = RS_INLINE;
if (ret)
return ret;
- for (i = 0; i < RS_QP_SIZE; i++) {
+ for (i = 0; i < rs->rq_size; i++) {
ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0);
if (ret)
return ret;
if (rs->index >= 0)
rs_remove(rs);
+ if (rs->rmsg)
+ free(rs->rmsg);
+
if (rs->sbuf) {
if (rs->smr)
rdma_dereg_mr(rs->smr);
{
conn->version = 1;
conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
- conn->credits = htons(RS_QP_SIZE);
+ conn->credits = htons(rs->rq_size);
conn->reserved2 = 0;
conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
struct rs_sge sge;
rs->ctrl_avail--;
- rs->rseq_comp = rs->rseq_no + (RS_QP_SIZE >> 1);
+ rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) {
if (!(rs->opts & RS_OPT_SWAP_SGL)) {
sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset];
ibsge.length = sizeof(sge);
rs_post_write(rs, 0, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + RS_QP_SIZE),
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
IBV_SEND_INLINE,
rs->remote_sgl.addr +
rs->remote_sge * sizeof(struct rs_sge),
rs->remote_sge = 0;
} else {
rs_post_write(rs, 0, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + RS_QP_SIZE), 0, 0, 0);
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
}
}
default:
rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
- if (++rs->rmsg_tail == RS_QP_SIZE + 1)
+ if (++rs->rmsg_tail == rs->rq_size + 1)
rs->rmsg_tail = 0;
break;
}
static int rs_all_sends_done(struct rsocket *rs)
{
- return (rs->sqe_avail + rs->ctrl_avail) == RS_QP_SIZE;
+ return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
}
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
rsize = left;
} else {
rsize = rs->rmsg[rmsg_head].data;
- if (++rmsg_head == RS_QP_SIZE + 1)
+ if (++rmsg_head == rs->rq_size + 1)
rmsg_head = 0;
}
} else {
rs->rseq_no++;
rsize = rs->rmsg[rs->rmsg_head].data;
- if (++rs->rmsg_head == RS_QP_SIZE + 1)
+ if (++rs->rmsg_head == rs->rq_size + 1)
rs->rmsg_head = 0;
}