From: Sean Hefty Date: Sat, 13 Apr 2013 19:22:55 +0000 (-0700) Subject: Refresh of rs-iwarp X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=a4d1c071e5f69a269ae028c53cd2d5c8330886f4;p=~shefty%2Flibrdmacm.git Refresh of rs-iwarp --- diff --git a/src/rsocket.c b/src/rsocket.c index de15f58b..4b3505ae 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -123,6 +123,7 @@ enum { #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) #define rs_msg_op(imm_data) (imm_data >> 29) #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) +#define RS_MSG_SIZE sizeof(uint32_t) #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63) #define rs_send_wr_id(data) ((uint64_t) data) @@ -751,24 +752,22 @@ static inline int rs_post_recv(struct rsocket *rs) struct ibv_recv_wr wr, *bad; struct ibv_sge sge; - wr.wr_id = rs_recv_wr_id(0); wr.next = NULL; if (!(rs->opts & RS_OPT_MSG_SEND)) { + wr.wr_id = rs_recv_wr_id(0); wr.sg_list = NULL; wr.num_sge = 0; } else { + wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index); sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size + (rs->rbuf_msg_index * RS_MSG_SIZE); - sge.length = sizeof(uint32_t); + sge.length = RS_MSG_SIZE; sge.lkey = rs->rmr->lkey; wr.sg_list = &sge; wr.num_sge = 1; if(++rs->rbuf_msg_index == rs->rq_size) rs->rbuf_msg_index = 0; - /* TODO we need wr_id to reference back to receive buffer, so we can - * retrieve the incoming message from the buffer - */ } return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); @@ -800,7 +799,7 @@ static int rs_create_ep(struct rsocket *rs) int i, ret; rs_set_qp_size(rs); - if (rs->cm_id->context->device->transport_type == IBV_TRANSPORT_IWARP) + if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP) rs->opts |= RS_OPT_MSG_SEND; ret = rs_init_bufs(rs); if (ret) @@ -1535,6 +1534,21 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) return ret; } +/* +TODO: +if MSG_SEND opt is set + if !sgl + post send with imm_data inline + else + post rdma write + post
send with imm_data inline +else + existing code flow - rdma write with immediate + +post_write - rdma write only +post msg - immediate data or send only +post write msg - rdma write with msg (imm or send after) +*/ static int rs_post_write_msg(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t imm_data, int flags, @@ -1555,6 +1569,29 @@ static int rs_post_write_msg(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int rs_post_msg(struct rsocket *rs, uint32_t msg) +{ + struct ibv_send_wr wr, *bad; + struct ibv_sge sge; + + wr.wr_id = rs_send_wr_id(msg); + wr.next = NULL; + if (rs->opts & RS_OPT_MSG_SEND) { + wr.sg_list = &sge; + wr.num_sge = 1; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_INLINE; + } else { + wr.sg_list = NULL; + wr.num_sge = 0; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = flags; + wr.imm_data = htonl(imm_data); + } + + return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); +} + static int rs_post_write(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t wr_data, int flags, @@ -1713,7 +1750,7 @@ static void rs_update_credits(struct rsocket *rs) static int rs_poll_cq(struct rsocket *rs) { struct ibv_wc wc; - uint32_t imm_data; + uint32_t msg; int ret, rcnt = 0; while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) { @@ -1722,19 +1759,25 @@ static int rs_poll_cq(struct rsocket *rs) continue; rcnt++; - imm_data = ntohl(wc.imm_data); - switch (rs_msg_op(imm_data)) { + if (wc.wc_flags & IBV_WC_WITH_IMM) { + msg = ntohl(wc.imm_data); + } else { + msg = ((uint32_t *) rs->rbuf + rs->rbuf_size) + [rs_wr_data(wc.wr_id)]; + + } + switch (rs_msg_op(msg)) { case RS_OP_SGL: - rs->sseq_comp = (uint16_t) rs_msg_data(imm_data); + rs->sseq_comp = (uint16_t) rs_msg_data(msg); break; case RS_OP_IOMAP_SGL: /* The iomap was updated, that's nice to know. 
*/ break; case RS_OP_CTRL: - if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { + if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) { rs->state = rs_disconnected; return 0; - } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { + } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { rs->state &= ~rs_readable; } break; @@ -1742,8 +1785,8 @@ static int rs_poll_cq(struct rsocket *rs) /* We really shouldn't be here. */ break; default: - rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data); - rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data); + rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg); + rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg); if (++rs->rmsg_tail == rs->rq_size + 1) rs->rmsg_tail = 0; break; @@ -2064,6 +2107,11 @@ static int rs_poll_all(struct rsocket *rs) */ static int rs_can_send(struct rsocket *rs) { + if (rs->opts & RS_OPT_MSG_SEND) { + return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && + (rs->sseq_no != rs->sseq_comp) && + (rs->target_sgl[rs->target_sge].length != 0); + } return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && (rs->sseq_no != rs->sseq_comp) && (rs->target_sgl[rs->target_sge].length != 0);