From: Sean Hefty Date: Thu, 11 Apr 2013 17:05:29 +0000 (-0700) Subject: rsocket: Add support for iWarp X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=6fd4b589ad02ce05a2fc4efae15f59bd103d50c2;p=~shefty%2Flibrdmacm.git rsocket: Add support for iWarp iWarp does not support RDMA writes with immediate data. Instead of sending messages using immediate data, allow the rsocket protocol to exchange messages using sends. Signed-off-by: Sean Hefty --- diff --git a/src/rsocket.c b/src/rsocket.c index ca771165..f529a3c9 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -123,6 +123,7 @@ enum { #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) #define rs_msg_op(imm_data) (imm_data >> 29) #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) +#define RS_MSG_SIZE sizeof(uint32_t) #define RS_WR_ID_FLAG_RECV (((uint64_t) 1) << 63) #define rs_send_wr_id(data) ((uint64_t) data) @@ -207,7 +208,12 @@ enum rs_state { rs_error = 0x2000, }; -#define RS_OPT_SWAP_SGL 1 +#define RS_OPT_SWAP_SGL (1 << 0) +/* + * iWarp does not support RDMA write with immediate data. For iWarp, we + * transfer rsocket messages as inline sends. + */ +#define RS_OPT_MSG_SEND (1 << 1) union socket_addr { struct sockaddr sa; @@ -284,6 +290,7 @@ struct rsocket { volatile struct rs_sge *target_sgl; struct rs_iomap *target_iomap; + int rbuf_msg_index; int rbuf_bytes_avail; int rbuf_free_offset; int rbuf_offset; @@ -600,15 +607,15 @@ static void rs_set_qp_size(struct rsocket *rs) if (rs->sq_size > max_size) rs->sq_size = max_size; - else if (rs->sq_size < 2) - rs->sq_size = 2; + else if (rs->sq_size < 4) + rs->sq_size = 4; if (rs->sq_size <= (RS_QP_CTRL_SIZE << 2)) - rs->ctrl_avail = 1; + rs->ctrl_avail = 2; if (rs->rq_size > max_size) rs->rq_size = max_size; - else if (rs->rq_size < 2) - rs->rq_size = 2; + else if (rs->rq_size < 4) + rs->rq_size = 4; } static void ds_set_qp_size(struct rsocket *rs) @@ -635,6 +642,7 @@ static void ds_set_qp_size(struct rsocket *rs) static int rs_init_bufs(struct rsocket *rs) { + uint32_t rbuf_msg_size; size_t len; rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg)); @@ -664,11 +672,14 @@ static int rs_init_bufs(struct rsocket *rs) if (rs->target_iomap_size) rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE); - rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf)); + rbuf_msg_size = rs->rbuf_size; + if (rs->opts & RS_OPT_MSG_SEND) + rbuf_msg_size += rs->rq_size * RS_MSG_SIZE; + rs->rbuf = calloc(rbuf_msg_size, 1); if (!rs->rbuf) return ERR(ENOMEM); - rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size); + rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size); if (!rs->rmr) return -1; @@ -685,8 +696,7 @@ static int rs_init_bufs(struct rsocket *rs) static int ds_init_bufs(struct ds_qp *qp) { - qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), - sizeof(*qp->rbuf)); + qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1); if (!qp->rbuf) return ERR(ENOMEM); @@ -740,11 +750,25 @@ err1: static inline int rs_post_recv(struct rsocket *rs) { struct ibv_recv_wr wr, *bad; + struct ibv_sge sge; - wr.wr_id = rs_recv_wr_id(0); wr.next = NULL; - wr.sg_list = NULL; - wr.num_sge = 0; + if (!(rs->opts & RS_OPT_MSG_SEND)) { + wr.wr_id = rs_recv_wr_id(0); + wr.sg_list = NULL; + wr.num_sge = 0; + } else { + wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index); + sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size + + (rs->rbuf_msg_index * RS_MSG_SIZE); + sge.length = RS_MSG_SIZE; + sge.lkey = rs->rmr->lkey; + + wr.sg_list = &sge; + wr.num_sge = 1; + if(++rs->rbuf_msg_index == rs->rq_size) + rs->rbuf_msg_index = 0; + } return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } @@ -775,6 +799,8 @@ static int rs_create_ep(struct rsocket *rs) int i, ret; rs_set_qp_size(rs); + if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP) + rs->opts |= RS_OPT_MSG_SEND; ret = rs_init_bufs(rs); if (ret) return ret; @@ -1508,22 +1534,28 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) return ret; } -static int rs_post_write_msg(struct rsocket *rs, - struct ibv_sge *sgl, int nsge, - uint32_t imm_data, int flags, - uint64_t addr, uint32_t rkey) +static int rs_post_msg(struct rsocket *rs, uint32_t msg) { struct ibv_send_wr wr, *bad; + struct ibv_sge sge; - wr.wr_id = rs_send_wr_id(imm_data); + wr.wr_id = rs_send_wr_id(msg); wr.next = NULL; - wr.sg_list = sgl; - wr.num_sge = nsge; - wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; - wr.send_flags = flags; - wr.imm_data = htonl(imm_data); - wr.wr.rdma.remote_addr = addr; - wr.wr.rdma.rkey = rkey; + if (!(rs->opts & RS_OPT_MSG_SEND)) { + wr.sg_list = NULL; + wr.num_sge = 0; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = 0; + wr.imm_data = htonl(msg); + } else { + sge.addr = (uintptr_t) &msg; + sge.lkey = 0; + sge.length = sizeof msg; + wr.sg_list = &sge; + wr.num_sge = 1; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_INLINE; + } return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } @@ -1547,6 +1579,34 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int rs_post_write_msg(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint32_t msg, int flags, + uint64_t addr, uint32_t rkey) +{ + struct ibv_send_wr wr, *bad; + int ret; + + if (!(rs->opts & RS_OPT_MSG_SEND)) { + wr.wr_id = rs_send_wr_id(msg); + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + wr.send_flags = flags; + wr.imm_data = htonl(msg); + wr.wr.rdma.remote_addr = addr; + wr.wr.rdma.rkey = rkey; + + return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); + } else { + ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey); + if (!ret) + ret = rs_post_msg(rs, msg); + return ret; + } +} + static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge, uint32_t wr_data) { @@ -1578,6 +1638,8 @@ static int rs_write_data(struct rsocket *rs, rs->sseq_no++; rs->sqe_avail--; + if (rs->opts & RS_OPT_MSG_SEND) + rs->sqe_avail--; rs->sbuf_bytes_avail -= length; addr = rs->target_sgl[rs->target_sge].addr; @@ -1615,6 +1677,8 @@ static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr, rs->sseq_no++; rs->sqe_avail--; + if (rs->opts & RS_OPT_MSG_SEND) + rs->sqe_avail--; rs->sbuf_bytes_avail -= sizeof(struct rs_iomap); addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); @@ -1636,6 +1700,9 @@ static void rs_send_credits(struct rsocket *rs) rs->ctrl_avail--; rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1); if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) { + if (rs->opts & RS_OPT_MSG_SEND) + rs->ctrl_avail--; + if (!(rs->opts & RS_OPT_SWAP_SGL)) { sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset]; sge.key = rs->rmr->rkey; @@ -1664,17 +1731,21 @@ static void rs_send_credits(struct rsocket *rs) if (++rs->remote_sge == rs->remote_sgl.length) rs->remote_sge = 0; } else { - rs_post_write_msg(rs, NULL, 0, - rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), - 0, 0, 0); + rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size)); } } static int rs_give_credits(struct rsocket *rs) { - return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || - ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && - rs->ctrl_avail && (rs->state & rs_connected); + if (!(rs->opts & RS_OPT_MSG_SEND)) { + return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || + ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && + rs->ctrl_avail && (rs->state & rs_connected); + } else { + return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || + ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && + (rs->ctrl_avail > 1) && (rs->state & rs_connected); + } } static void rs_update_credits(struct rsocket *rs) @@ -1686,7 +1757,7 @@ static void rs_update_credits(struct rsocket *rs) static int rs_poll_cq(struct rsocket *rs) { struct ibv_wc wc; - uint32_t imm_data; + uint32_t msg; int ret, rcnt = 0; while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) { @@ -1695,19 +1766,25 @@ static int rs_poll_cq(struct rsocket *rs) continue; rcnt++; - imm_data = ntohl(wc.imm_data); - switch (rs_msg_op(imm_data)) { + if (wc.wc_flags & IBV_WC_WITH_IMM) { + msg = ntohl(wc.imm_data); + } else { + msg = ((uint32_t *) rs->rbuf + rs->rbuf_size) + [rs_wr_data(wc.wr_id)]; + + } + switch (rs_msg_op(msg)) { case RS_OP_SGL: - rs->sseq_comp = (uint16_t) rs_msg_data(imm_data); + rs->sseq_comp = (uint16_t) rs_msg_data(msg); break; case RS_OP_IOMAP_SGL: /* The iomap was updated, that's nice to know. */ break; case RS_OP_CTRL: - if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { + if (rs_msg_data(msg) == RS_CTRL_DISCONNECT) { rs->state = rs_disconnected; return 0; - } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { + } else if (rs_msg_data(msg) == RS_CTRL_SHUTDOWN) { rs->state &= ~rs_readable; } break; @@ -1715,8 +1792,8 @@ static int rs_poll_cq(struct rsocket *rs) /* We really shouldn't be here. */ break; default: - rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data); - rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data); + rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg); + rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg); if (++rs->rmsg_tail == rs->rq_size + 1) rs->rmsg_tail = 0; break; @@ -2037,9 +2114,15 @@ static int rs_poll_all(struct rsocket *rs) */ static int rs_can_send(struct rsocket *rs) { - return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && - (rs->sseq_no != rs->sseq_comp) && - (rs->target_sgl[rs->target_sge].length != 0); + if (!(rs->opts & RS_OPT_MSG_SEND)) { + return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && + (rs->sseq_no != rs->sseq_comp) && + (rs->target_sgl[rs->target_sge].length != 0); + } else { + return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && + (rs->sseq_no != rs->sseq_comp) && + (rs->target_sgl[rs->target_sge].length != 0); + } } static int ds_can_send(struct rsocket *rs) @@ -3011,8 +3094,7 @@ int rshutdown(int socket, int how) if ((rs->state & rs_connected) && rs->ctrl_avail) { rs->ctrl_avail--; - ret = rs_post_write_msg(rs, NULL, 0, - rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); + ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl)); } }