enum {
RS_OP_DATA,
- RS_OP_DATA_MORE,
- RS_OP_DRA,
- RS_OP_DRA_MORE,
+ RS_OP_RSVD_DATA_MORE, /* same slot as the former RS_OP_DATA_MORE; value unchanged */
+ RS_OP_RSVD_DRA, /* same slot as the former RS_OP_DRA; value unchanged */
+ RS_OP_RSVD_DRA_MORE, /* same slot as the former RS_OP_DRA_MORE; value unchanged */
RS_OP_SGL,
- RS_OP_RSVD1,
- RS_OP_DRA_SGL,
+ RS_OP_RSVD, /* same slot as the former RS_OP_RSVD1; value unchanged */
+ RS_OP_RSVD_DRA_SGL, /* same slot as the former RS_OP_DRA_SGL; keeps RS_OP_CTRL at 7 */
RS_OP_CTRL
};
/*
 * Build a 32-bit message word: op is placed in bits 31..29 and data is
 * OR-ed into the low bits.
 *
 * op is converted to uint32_t before the shift because a shift such as
 * RS_OP_CTRL << 29 does not fit in a signed int, which is undefined
 * behavior in C.  Both arguments are parenthesized so that expressions
 * such as (a | b) can be passed without operator-precedence surprises.
 *
 * data is not masked: a value wider than 29 bits would overlap the op field.
 */
#define rs_msg_set(op, data) ((((uint32_t) (op)) << 29) | (uint32_t) (data))
struct rs_sge data_buf;
};
-union rs_wr_id {
- uint64_t wr_id;
- struct {
- uint32_t reserved; /* sqe_count; */
- uint32_t length;
- };
+enum { /* NOTE(review): no RS_WR_* name is referenced in the visible hunks; send wr_ids carry the immediate data instead - confirm this enum is needed */
+ RS_WR_RECV,
+ RS_WR_DATA,
+ RS_WR_CTRL,
+ RS_WR_DISCONNECT
};
#define RS_RECV_WR_ID (~((uint64_t) 0))
pthread_mutex_unlock(&mut);
}
-/*
- * We currently generate a completion per send. sqe_count = 1
- */
-static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length)
-{
- union rs_wr_id wrid;
- /* wrid.reserved = sqe_count; */
- wrid.length = length;
- return wrid;
-}
-
static int rs_insert(struct rsocket *rs)
{
pthread_mutex_lock(&mut);
return rs_do_connect(rs);
}
-static void rs_shutdown_state(struct rsocket *rs, int state)
-{
- rs->state &= ~state;
- if (rs->state == rs_connected)
- rs->state = rs_disconnected;
-}
-
-static int rs_post_write(struct rsocket *rs, uint64_t wr_id,
+static int rs_post_write(struct rsocket *rs,
struct ibv_sge *sgl, int nsge,
uint32_t imm_data, int flags,
uint64_t addr, uint32_t rkey)
{
struct ibv_send_wr wr, *bad;
- wr.wr_id = wr_id;
+ wr.wr_id = (uint64_t) imm_data;
wr.next = NULL;
wr.sg_list = sgl;
wr.num_sge = nsge;
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
*/
-static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id,
+static int rs_write_data(struct rsocket *rs,
struct ibv_sge *sgl, int nsge,
- uint32_t imm_data, int flags)
+ uint32_t length, int flags) /* length: byte count of this transfer; replaces the old wr_id.length and imm_data arguments */
{
uint64_t addr;
uint32_t rkey;
rs->sseq_no++;
rs->sqe_avail--;
- rs->sbuf_bytes_avail -= wr_id.length;
+ rs->sbuf_bytes_avail -= length; /* the same count is later added back from rs_msg_data() of the completion's wr_id */
addr = rs->target_sgl[rs->target_sge].addr;
rkey = rs->target_sgl[rs->target_sge].key;
- rs->target_sgl[rs->target_sge].addr += wr_id.length;
- rs->target_sgl[rs->target_sge].length -= wr_id.length;
+ rs->target_sgl[rs->target_sge].addr += length; /* consume the entry; addr/rkey above hold the pre-advance values */
+ rs->target_sgl[rs->target_sge].length -= length;
if (!rs->target_sgl[rs->target_sge].length) {
if (++rs->target_sge == RS_SGL_SIZE)
rs->target_sge = 0;
}
- return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey);
+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+ flags, addr, rkey); /* NOTE(review): rs_msg_set() does not mask its data argument; presumably length always stays below 1 << 29 - confirm */
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
ibsge.lkey = 0;
ibsge.length = sizeof(sge);
- rs_post_write(rs, 0, &ibsge, 1,
+ rs_post_write(rs, &ibsge, 1,
rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
IBV_SEND_INLINE,
rs->remote_sgl.addr +
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
- rs_post_write(rs, 0, NULL, 0,
+ rs_post_write(rs, NULL, 0,
rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
}
}
static int rs_poll_cq(struct rsocket *rs)
{
struct ibv_wc wc;
- union rs_wr_id *wr_id;
uint32_t imm_data;
int ret, rcnt = 0;
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
- rs_shutdown_state(rs, rs_connect_rd);
+ rs->state &= ~rs_connect_rd;
}
break;
default:
break;
}
} else {
- if (wc.wr_id) {
- wr_id = (union rs_wr_id *) &wc.wr_id;
- rs->sqe_avail++; /* += wr_id->sqe_count; */
- rs->sbuf_bytes_avail += wr_id->length;
- } else {
+ switch (rs_msg_op((uint32_t) wc.wr_id)) {
+ case RS_OP_SGL:
rs->ctrl_avail++;
+ break;
+ case RS_OP_CTRL:
+ rs->ctrl_avail++;
+ if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
+ rs->state = rs_disconnected;
+ break;
+ default:
+ rs->sqe_avail++;
+ rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
+ break;
}
if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
rs->state = rs_error;
return rs_can_send(rs) || !(rs->state & rs_connect_wr);
}
-static int rs_can_send_ctrl(struct rsocket *rs)
+static int rs_conn_can_send_ctrl(struct rsocket *rs) /* passed as the last argument to rs_process_cq(): nonzero when a control credit is free or no rs_connected bit is set */
{
- return rs->ctrl_avail;
+ return rs->ctrl_avail || !(rs->state & rs_connected); /* can be nonzero with ctrl_avail == 0, so callers must re-check state and ctrl_avail before posting */
}
static int rs_have_rdata(struct rsocket *rs)
return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
}
-static int rs_all_sends_done(struct rsocket *rs)
+static int rs_conn_all_sends_done(struct rsocket *rs) /* passed as the last argument to rs_process_cq(): nonzero when no sends are outstanding or no rs_connected bit is set */
{
- return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size;
+ return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) || /* data and control credits together add up to the full send queue size */
+ !(rs->state & rs_connected); /* or the state no longer has any rs_connected bit set */
}
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
sge.addr = (uintptr_t) buf;
sge.length = xfer_size;
sge.lkey = 0;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size),
- IBV_SEND_INLINE);
+ ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE);
} else if (xfer_size <= rs_sbuf_left(rs)) {
memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
rs->ssgl[0].length = xfer_size;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 1,
- rs_msg_set(RS_OP_DATA, xfer_size), 0);
+ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0);
if (xfer_size < rs_sbuf_left(rs))
rs->ssgl[0].addr += xfer_size;
else
rs->ssgl[0].length);
rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 2,
- rs_msg_set(RS_OP_DATA, xfer_size), 0);
+ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0);
rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
}
if (ret)
rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr,
&cur_iov, &offset, xfer_size);
rs->ssgl[0].length = xfer_size;
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 1,
- rs_msg_set(RS_OP_DATA, xfer_size),
+ ret = rs_write_data(rs, rs->ssgl, 1, xfer_size,
xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
if (xfer_size < rs_sbuf_left(rs))
rs->ssgl[0].addr += xfer_size;
&offset, rs->ssgl[0].length);
rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length);
- ret = rs_write_data(rs, rs_wrid(1, xfer_size),
- rs->ssgl, 2,
- rs_msg_set(RS_OP_DATA, xfer_size),
+ ret = rs_write_data(rs, rs->ssgl, 2, xfer_size,
xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0);
rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
}
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
- rs_shutdown_state(rs, rs_connect_rd);
+ rs->state &= ~rs_connect_rd;
return 0;
}
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
- rs->state = rs_disconnected;
+ rs->state &= ~(rs_connect_rd | rs_connect_wr);
} else {
- rs_shutdown_state(rs, rs_connect_wr);
- ctrl = (rs->state & rs_connected) ?
+ rs->state &= ~rs_connect_wr;
+ ctrl = (rs->state & rs_connect_rd) ?
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
- if (!rs_can_send_ctrl(rs)) {
- ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
+ if (!rs->ctrl_avail) {
+ ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl);
if (ret)
return ret;
}
- rs->ctrl_avail--;
- ret = rs_post_write(rs, 0, NULL, 0,
- rs_msg_set(RS_OP_CTRL, ctrl),
- 0, 0, 0);
+ if ((rs->state & rs_connected) && rs->ctrl_avail) {
+ rs->ctrl_avail--;
+ ret = rs_post_write(rs, NULL, 0,
+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ }
}
- if (!rs_all_sends_done(rs) && !(rs->state & rs_error))
- rs_process_cq(rs, 0, rs_all_sends_done);
+ if (rs->state & rs_connected)
+ rs_process_cq(rs, 0, rs_conn_all_sends_done);
if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
rs_set_nonblocking(rs, 1);