#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define rs_wrid_set(sqe, length) ((((uint64_t) sqe) << 32) | ((uint64_t) length))
+#define rs_wrid_len(wrid) ((uint32_t) wrid)
+#define rs_wrid_sqe(wrid) 1
+
enum {
RS_CTRL_DISCONNECT
};
struct rs_sge data_buf;
};
-union rs_wr_id {
- uint64_t wr_id;
- struct {
- uint32_t reserved; /* sqe_count; */
- uint32_t length;
- };
-};
-
enum rs_state {
rs_init,
rs_bound,
uint32_t sbuf_size;
struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
+ struct ibv_sge ssge;
uint8_t *sbuf;
};
-/*
- * We currently generate a completion per send. sqe_count = 1
- */
-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
-{
- union rs_wr_id wrid;
- /* wrid.reserved = sqe_count; */
- wrid.length = length;
- return wrid;
-}
static int rs_insert(struct rsocket *rs)
{
if (!rs->rmr)
return -1;
- rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
+ rs->ssge.addr = (uintptr_t) rs->sbuf;
rs->sbuf_bytes_avail = rs->sbuf_size;
- rs->ssgl[0].lkey = rs->ssgl[1].lkey = rs->smr->lkey;
+ rs->ssge.lkey = rs->smr->lkey;
rs->rbuf_free_offset = rs->rbuf_size >> 1;
rs->rbuf_bytes_avail = rs->rbuf_size >> 1;
qp_attr.sq_sig_all = 1;
qp_attr.cap.max_send_wr = rs->sq_size;
qp_attr.cap.max_recv_wr = rs->rq_size;
- qp_attr.cap.max_send_sge = 2;
+ qp_attr.cap.max_send_sge = 1;
qp_attr.cap.max_recv_sge = 1;
qp_attr.cap.max_inline_data = rs->sq_inline;
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
*/
-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
- struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags)
+static int rs_write_data(struct rsocket *rs, struct ibv_sge *sge, int flags)
{
uint64_t addr;
uint32_t rkey;
+ uint32_t len = sge->length;
rs->sseq_no++;
rs->sqe_avail--;
- rs->sbuf_bytes_avail -= wr_id.length;
+ rs->sbuf_bytes_avail -= len;
addr = rs->target_sgl[rs->target_sge].addr;
rkey = rs->target_sgl[rs->target_sge].key;
- rs->target_sgl[rs->target_sge].addr += wr_id.length;
- rs->target_sgl[rs->target_sge].length -= wr_id.length;
+ rs->target_sgl[rs->target_sge].addr += len;
+ rs->target_sgl[rs->target_sge].length -= len;
if (!rs->target_sgl[rs->target_sge].length) {
if (++rs->target_sge == RS_SGL_SIZE)
rs->target_sge = 0;
}
- return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
+ return rs_post_write(rs, rs_wrid_set(1, len), sge, 1,
+ rs_msg_set(RS_OP_DATA, len), flags, addr, rkey);
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
{
return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
- rs->ssgl[0].addr);
+ rs->ssge.addr);
}
static void rs_send_credits(struct rsocket *rs)
static int rs_poll_cq(struct rsocket *rs)
{
struct ibv_wc wc;
- union rs_wr_id *wr_id;
uint32_t imm_data;
int ret, rcnt = 0;
}
} else {
if (wc.wr_id) {
- wr_id = (union rs_wr_id *) &wc.wr_id;
- rs->sqe_avail++; /* += wr_id->sqe_count; */
- rs->sbuf_bytes_avail += wr_id->length;
+ rs->sqe_avail += rs_wrid_sqe(wc.wr_id);
+ rs->sbuf_bytes_avail += rs_wrid_len(wc.wr_id);
} else {
rs->ctrl_avail++;
}
xfer_size = rs->sbuf_bytes_avail;
if (xfer_size > rs->target_sgl[rs->target_sge].length)
xfer_size = rs->target_sgl[rs->target_sge].length;
+ if (xfer_size > rs_sbuf_left(rs))
+ xfer_size = rs_sbuf_left(rs);
if (xfer_size <= rs->sq_inline) {
sge.addr = (uintptr_t) buf;
sge.length = xfer_size;
sge.lkey = 0;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
- IBV_SEND_INLINE);
- } else if (xfer_size <= rs_sbuf_left(rs)) {
- memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
- rs->ssgl[0].length = xfer_size;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 1,
- rs_msg_set(RS_OP_DATA, xfer_size), 0);
+ ret = rs_write_data(rs, &sge, IBV_SEND_INLINE);
+ if (xfer_size == rs_sbuf_left(rs))
+ rs->ssge.addr = (uintptr_t) rs->sbuf;
+ } else {
+ memcpy((void *) (uintptr_t) rs->ssge.addr, buf, xfer_size);
+ rs->ssge.length = xfer_size;
+ ret = rs_write_data(rs, &rs->ssge, 0);
if (xfer_size < rs_sbuf_left(rs))
- rs->ssgl[0].addr += xfer_size;
+ rs->ssge.addr += xfer_size;
else
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
- } else {
- rs->ssgl[0].length = rs_sbuf_left(rs);
- memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
- rs->ssgl[0].length);
- rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
- memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 2,
- rs_msg_set(RS_OP_DATA, xfer_size), 0);
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ rs->ssge.addr = (uintptr_t) rs->sbuf;
}
if (ret)
break;
xfer_size = rs->sbuf_bytes_avail;
if (xfer_size > rs->target_sgl[rs->target_sge].length)
xfer_size = rs->target_sgl[rs->target_sge].length;
-
- if (xfer_size <= rs_sbuf_left(rs)) {
- rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
- &cur_iov, &offset, xfer_size);
- rs->ssgl[0].length = xfer_size;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 1,
- rs_msg_set(RS_OP_DATA, xfer_size),
- xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
- if (xfer_size < rs_sbuf_left(rs))
- rs->ssgl[0].addr += xfer_size;
- else
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
- } else {
- rs->ssgl[0].length = rs_sbuf_left(rs);
- rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov,
- &offset, rs->ssgl[0].length);
- rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
- rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 2,
- rs_msg_set(RS_OP_DATA, xfer_size),
- xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
- rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
- }
+ if (xfer_size > rs_sbuf_left(rs))
+ xfer_size = rs_sbuf_left(rs);
+
+ rs_copy_iov((void *) (uintptr_t) rs->ssge.addr,
+ &cur_iov, &offset, xfer_size);
+ rs->ssge.length = xfer_size;
+ ret = rs_write_data(rs, &rs->ssge, xfer_size <= rs->sq_inline ?
+ IBV_SEND_INLINE : 0);
if (ret)
break;
+
+ if (xfer_size < rs_sbuf_left(rs))
+ rs->ssge.addr += xfer_size;
+ else
+ rs->ssge.addr = (uintptr_t) rs->sbuf;
}
fastlock_release(&rs->slock);