From 726f0d0e29b67a1fe3450571a9c03f0b6557aa06 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Fri, 27 Jul 2012 17:35:36 -0700 Subject: [PATCH] Refresh of fast-disc --- src/rsocket.c | 134 +++++++++++++++++++++----------------------------- 1 file changed, 57 insertions(+), 77 deletions(-) diff --git a/src/rsocket.c b/src/rsocket.c index 1e391b30..908b00c8 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -83,12 +83,12 @@ static uint32_t polling_time = 10; enum { RS_OP_DATA, - RS_OP_DATA_MORE, - RS_OP_DRA, - RS_OP_DRA_MORE, + RS_OP_RSVD_DATA_MORE, + RS_OP_RSVD_DRA, + RS_OP_RSVD_DRA_MORE, RS_OP_SGL, - RS_OP_RSVD1, - RS_OP_DRA_SGL, + RS_OP_RSVD, + RS_OP_RSVD_DRA_SGL, RS_OP_CTRL }; #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) @@ -124,12 +124,11 @@ struct rs_conn_data { struct rs_sge data_buf; }; -union rs_wr_id { - uint64_t wr_id; - struct { - uint32_t reserved; /* sqe_count; */ - uint32_t length; - }; +enum { + RS_WR_RECV, + RS_WR_DATA, + RS_WR_CTRL, + RS_WR_DISCONNECT }; #define RS_RECV_WR_ID (~((uint64_t) 0)) @@ -264,17 +263,6 @@ out: pthread_mutex_unlock(&mut); } -/* - * We currently generate a completion per send. sqe_count = 1 - */ -static union rs_wr_id rs_wrid(uint32_t sqe_count, uint32_t length) -{ - union rs_wr_id wrid; - /* wrid.reserved = sqe_count; */ - wrid.length = length; - return wrid; -} - static int rs_insert(struct rsocket *rs) { pthread_mutex_lock(&mut); @@ -772,21 +760,14 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) return rs_do_connect(rs); } -static void rs_shutdown_state(struct rsocket *rs, int state) -{ - rs->state &= ~state; - if (rs->state == rs_connected) - rs->state = rs_disconnected; -} - -static int rs_post_write(struct rsocket *rs, uint64_t wr_id, +static int rs_post_write(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t imm_data, int flags, uint64_t addr, uint32_t rkey) { struct ibv_send_wr wr, *bad; - wr.wr_id = wr_id; + wr.wr_id = (uint64_t) imm_data; wr.next = NULL; wr.sg_list = sgl; wr.num_sge = nsge; @@ -803,29 +784,30 @@ static int rs_post_write(struct rsocket *rs, uint64_t wr_id, * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. */ -static int rs_write_data(struct rsocket *rs, union rs_wr_id wr_id, +static int rs_write_data(struct rsocket *rs, struct ibv_sge *sgl, int nsge, - uint32_t imm_data, int flags) + uint32_t length, int flags) { uint64_t addr; uint32_t rkey; rs->sseq_no++; rs->sqe_avail--; - rs->sbuf_bytes_avail -= wr_id.length; + rs->sbuf_bytes_avail -= length; addr = rs->target_sgl[rs->target_sge].addr; rkey = rs->target_sgl[rs->target_sge].key; - rs->target_sgl[rs->target_sge].addr += wr_id.length; - rs->target_sgl[rs->target_sge].length -= wr_id.length; + rs->target_sgl[rs->target_sge].addr += length; + rs->target_sgl[rs->target_sge].length -= length; if (!rs->target_sgl[rs->target_sge].length) { if (++rs->target_sge == RS_SGL_SIZE) rs->target_sge = 0; } - return rs_post_write(rs, wr_id.wr_id, sgl, nsge, imm_data, flags, addr, rkey); + return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), + flags, addr, rkey); } static uint32_t rs_sbuf_left(struct rsocket *rs) @@ -856,7 +838,7 @@ static void rs_send_credits(struct rsocket *rs) ibsge.lkey = 0; ibsge.length = sizeof(sge); - rs_post_write(rs, 0, &ibsge, 1, + rs_post_write(rs, &ibsge, 1, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), IBV_SEND_INLINE, rs->remote_sgl.addr + @@ -870,7 +852,7 @@ static void rs_send_credits(struct rsocket *rs) if (++rs->remote_sge == rs->remote_sgl.length) rs->remote_sge = 0; } else { - rs_post_write(rs, 0, NULL, 0, + rs_post_write(rs, NULL, 0, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0); } } @@ -891,7 +873,6 @@ static void rs_update_credits(struct rsocket *rs) static int rs_poll_cq(struct rsocket *rs) { struct ibv_wc wc; - union rs_wr_id *wr_id; uint32_t imm_data; int ret, rcnt = 0; @@ -911,7 +892,7 @@ static int rs_poll_cq(struct rsocket *rs) rs->state = rs_disconnected; return 0; } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { - rs_shutdown_state(rs, rs_connect_rd); + rs->state &= ~rs_connect_rd; } break; default: @@ -922,12 +903,19 @@ static int rs_poll_cq(struct rsocket *rs) break; } } else { - if (wc.wr_id) { - wr_id = (union rs_wr_id *) &wc.wr_id; - rs->sqe_avail++; /* += wr_id->sqe_count; */ - rs->sbuf_bytes_avail += wr_id->length; - } else { + switch (rs_msg_op((uint32_t) wc.wr_id)) { + case RS_OP_SGL: rs->ctrl_avail++; + break; + case RS_OP_CTRL: + rs->ctrl_avail++; + if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT) + rs->state = rs_disconnected; + break; + default: + rs->sqe_avail++; + rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id); + break; } if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) { rs->state = rs_error; @@ -1075,9 +1063,9 @@ static int rs_conn_can_send(struct rsocket *rs) return rs_can_send(rs) || !(rs->state & rs_connect_wr); } -static int rs_can_send_ctrl(struct rsocket *rs) +static int rs_conn_can_send_ctrl(struct rsocket *rs) { - return rs->ctrl_avail; + return rs->ctrl_avail || !(rs->state & rs_connected); } static int rs_have_rdata(struct rsocket *rs) @@ -1090,9 +1078,10 @@ static int rs_conn_have_rdata(struct rsocket *rs) return rs_have_rdata(rs) || !(rs->state & rs_connect_rd); } -static int rs_all_sends_done(struct rsocket *rs) +static int rs_conn_all_sends_done(struct rsocket *rs) { - return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size; + return ((rs->sqe_avail + rs->ctrl_avail) == rs->sq_size) || + !(rs->state & rs_connected); } static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) @@ -1281,15 +1270,11 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) sge.addr = (uintptr_t) buf; sge.length = xfer_size; sge.lkey = 0; - ret = rs_write_data(rs, rs_wrid(1, xfer_size), - &sge, 1, rs_msg_set(RS_OP_DATA, xfer_size), - IBV_SEND_INLINE); + ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE); } else if (xfer_size <= rs_sbuf_left(rs)) { memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size); rs->ssgl[0].length = xfer_size; - ret = rs_write_data(rs, rs_wrid(1, xfer_size), - rs->ssgl, 1, - rs_msg_set(RS_OP_DATA, xfer_size), 0); + ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0); if (xfer_size < rs_sbuf_left(rs)) rs->ssgl[0].addr += xfer_size; else @@ -1300,9 +1285,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs->ssgl[0].length); rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); - ret = rs_write_data(rs, rs_wrid(1, xfer_size), - rs->ssgl, 2, - rs_msg_set(RS_OP_DATA, xfer_size), 0); + ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0); rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; } if (ret) @@ -1395,9 +1378,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov, &offset, xfer_size); rs->ssgl[0].length = xfer_size; - ret = rs_write_data(rs, rs_wrid(1, xfer_size), - rs->ssgl, 1, - rs_msg_set(RS_OP_DATA, xfer_size), + ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0); if (xfer_size < rs_sbuf_left(rs)) rs->ssgl[0].addr += xfer_size; @@ -1409,9 +1390,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags &offset, rs->ssgl[0].length); rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length); - ret = rs_write_data(rs, rs_wrid(1, xfer_size), - rs->ssgl, 2, - rs_msg_set(RS_OP_DATA, xfer_size), + ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0); rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; } @@ -1724,7 +1703,7 @@ int rshutdown(int socket, int how) rs = idm_at(&idm, socket); if (how == SHUT_RD) { - rs_shutdown_state(rs, rs_connect_rd); + rs->state &= ~rs_connect_rd; return 0; } @@ -1734,26 +1713,27 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) { if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; - rs->state = rs_disconnected; + rs->state &= ~(rs_connect_rd | rs_connect_wr); } else { - rs_shutdown_state(rs, rs_connect_wr); - ctrl = (rs->state & rs_connected) ? + rs->state &= ~rs_connect_wr; + ctrl = (rs->state & rs_connect_rd) ? RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; } - if (!rs_can_send_ctrl(rs)) { - ret = rs_process_cq(rs, 0, rs_can_send_ctrl); + if (!rs->ctrl_avail) { + ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl); if (ret) return ret; } - rs->ctrl_avail--; - ret = rs_post_write(rs, 0, NULL, 0, - rs_msg_set(RS_OP_CTRL, ctrl), - 0, 0, 0); + if ((rs->state & rs_connected) && rs->ctrl_avail) { + rs->ctrl_avail--; + ret = rs_post_write(rs, NULL, 0, + rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); + } } - if (!rs_all_sends_done(rs) && !(rs->state & rs_error)) - rs_process_cq(rs, 0, rs_all_sends_done); + if (rs->state & rs_connected) + rs_process_cq(rs, 0, rs_conn_all_sends_done); if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) rs_set_nonblocking(rs, 1); -- 2.45.2