From bdafce023ae945e1c2fd739a60f60d005c8baef8 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Mon, 26 Nov 2012 12:10:16 -0800 Subject: [PATCH] Refresh of dsocket --- docs/rsocket | 47 ++--- src/rsocket.c | 509 +++++++++++++++++++++++++++++++++++--------------- 2 files changed, 378 insertions(+), 178 deletions(-) diff --git a/docs/rsocket b/docs/rsocket index 03d49df7..a6602084 100644 --- a/docs/rsocket +++ b/docs/rsocket @@ -194,7 +194,7 @@ application's buffer. Datagram Overview ----------------- -THe rsocket API supports datagram sockets. Datagram support is handled through an +The rsocket API supports datagram sockets. Datagram support is handled through an entirely different protocol and internal implementation. Unlike connected rsockets, datagram rsockets are not necessarily bound to a network (IP) address. A datagram socket may use any number of network (IP) addresses, including those which map to @@ -204,32 +204,19 @@ UDP socket, plus zero or more UD QPs. Rsockets uses headers inserted before user data sent over UDP sockets to resolve remote UD QP numbers. When a user first attempts to send a datagram to a remote -address (IP and UDP port), rsockets will take the following steps. First, it -will allocate and store the destination address into a lookup table for future -use. Then it will resolve which local network address should be used when sending -to the specified destination. It will allocate a UD QP on the RDMA device -associated with the local address. Finally, rsockets will send the user's -datagram to the remote UDP socket, but inserting a header before the datagram. -The header specifies the UD QP number associated with the local network address -(IP and UDP port) of the send. - -Under many circumstances, the rsocket UDP header also provides enough path -information for the receiver to send the reply using a UD QP. However, certain -fabric topologies may require contacting a subnet administrator. Whenever -rsockets lacks sufficient information to send data to a remote peer using -a UD QP, it will automatically fall back to using a standard UDP socket. This -helps minimize the latency for performing a given send, while lenghty address -and route resolution protocols complete. - -Currently, rsockets allocates a single UD QP, with an infrastructure is provided -to support more. Future enhancements may add support for multiple UD QPs, each -associated with a single network (IP) address. Multiplexing different network -addresses over a single UD QP and using shared receive queues are other possible -enhancements. - -Because rsockets may use multiple UD QPs, buffer management can become an issue. -The total size of receive buffers is specified by the mem_default configuration -files, but may be overridden with rsetsockopt. The buffer is divided into -MTU sized messages and distributed among the allocated QPs. As new QPs are -created, the receive buffers may be redistributed by posting loopback sends -on a QP in order to free the receive buffers for reposting to a different QP. \ No newline at end of file +address (IP and UDP port), rsockets will take the following steps: + +1. Store the destination address into a lookup table. +2. Resolve which local network address should be used when sending + to the specified destination. +3. Allocate a UD QP on the RDMA device associated with the local address. +4. Send the user's datagram to the remote UDP socket. + +A header is inserted before the user's datagram. The header specifies the +UD QP number associated with the local network address (IP and UDP port) of +the send. + +A service thread is used to process messages received on the UDP socket. This +thread updates the rsocket lookup tables with the remote QPN and path record +data. The service thread forwards data received on the UDP socket to an +rsocket QP. \ No newline at end of file diff --git a/src/rsocket.c b/src/rsocket.c index 0695d12d..a81b8f39 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -46,6 +46,7 @@ #include #include #include +#include #include #include @@ -116,6 +117,14 @@ enum { #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) #define rs_msg_op(imm_data) (imm_data >> 29) #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) +#define RS_RECV_WR_ID (~((uint64_t) 0)) + +#define DS_WR_RECV 0xFFFFFFFF +#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length) +#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV) +#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32)) +#define ds_wr_length(wr_id) ((uint32_t) wr_id) +#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV) enum { RS_CTRL_DISCONNECT, @@ -129,8 +138,14 @@ struct rs_msg { struct ds_qp; -struct ds_msg { - struct ds_qp *qp; +struct ds_rmsg { + struct ds_qp *qp; + uint32_t offset; + uint32_t length; +}; + +struct ds_smsg { + struct ds_smsg *next; }; struct rs_sge { @@ -167,8 +182,6 @@ struct rs_conn_data { struct rs_sge data_buf; }; -#define RS_RECV_WR_ID (~((uint64_t) 0)) - /* * rsocket states are ordered as passive, connecting, connected, disconnected. */ @@ -203,12 +216,11 @@ struct ds_qp { struct rsocket *rs; struct rdma_cm_id *cm_id; + struct ibv_mr *smr; struct ibv_mr *rmr; uint8_t *rbuf; - struct ibv_mr *smr; - uint16_t lid; - uint8_t sl; + int cq_armed; }; struct ds_dest { @@ -231,6 +243,13 @@ struct rsocket { /* data stream */ struct { struct rdma_cm_id *cm_id; + uint64_t tcp_opts; + + int ctrl_avail; + uint16_t sseq_no; + uint16_t sseq_comp; + uint16_t rseq_no; + uint16_t rseq_comp; int remote_sge; struct rs_sge remote_sgl; @@ -242,63 +261,58 @@ struct rsocket { void *target_buffer_list; volatile struct rs_sge *target_sgl; struct rs_iomap *target_iomap; + + int rbuf_bytes_avail; + int rbuf_free_offset; + int rbuf_offset; + struct ibv_mr *rmr; + uint8_t *rbuf; + + int sbuf_bytes_avail; + struct ibv_mr *smr; + struct ibv_sge ssgl[2]; }; /* datagram */ struct { - dlist_t qp_list; + struct ds_qp *qp_list; void *dest_map; struct ds_dest *conn_dest; - struct pollfd *fds; - nfds_t nfds; - int dsock; - int sbytes_needed; + + int udp_sock; + int epfd; + int rqe_avail; + struct ds_smsg *smsg_free; }; }; int opts; long fd_flags; uint64_t so_opts; - uint64_t tcp_opts; uint64_t ipv6_opts; int state; int cq_armed; int retries; int err; - int ctrl_avail; int sqe_avail; - int sbuf_bytes_avail; - uint16_t sseq_no; - uint16_t sseq_comp; + uint32_t sbuf_size; uint16_t sq_size; uint16_t sq_inline; + uint32_t rbuf_size; uint16_t rq_size; - uint16_t rseq_no; - uint16_t rseq_comp; - int rbuf_bytes_avail; - int rbuf_free_offset; - int rbuf_offset; int rmsg_head; int rmsg_tail; union { struct rs_msg *rmsg; - struct ds_msg *dmsg; + struct ds_rmsg *dmsg; }; + uint8_t *sbuf; struct rs_iomap_mr *remote_iomappings; dlist_entry iomap_list; dlist_entry iomap_queue; int iomap_pending; - - uint32_t rbuf_size; - struct ibv_mr *rmr; - uint8_t *rbuf; - - uint32_t sbuf_size; - struct ibv_mr *smr; - struct ibv_sge ssgl[2]; - uint8_t *sbuf; }; #define DS_UDP_TAG 0x5555555555555555ULL @@ -306,12 +320,32 @@ struct rsocket { struct ds_udp_header { uint64_t tag; uint8_t version; - uint8_t sl; - uint16_t slid; + uint8_t reserved[3]; uint32_t qpn; /* upper 8-bits reserved */ }; +#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list) + +static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) +{ + if (!rs->qp_list) + list_init(&qp->list); + else + list_insert_head(&qp->list, &rs->qp_list->list); + rs->qp_list = *qp; +} + +static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp) +{ + if (qp->list.next != qp->list) { + rs->qp_list = ds_next_qp(qp); + dlist_remove(&qp->list); + } else { + rs->qp_list = NULL; + } +} + static int rs_svc_run(void *arg) { return 0; @@ -468,7 +502,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->type = type; rs->index = -1; - rs->dsock = -1; + rs->udp_sock = -1; + rs->epfd = -1; + if (inherited_rs) { rs->sbuf_size = inherited_rs->sbuf_size; rs->rbuf_size = inherited_rs->rbuf_size; @@ -496,18 +532,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) return rs; } -/* - * TODO: Support datagram rsockets - */ static int rs_set_nonblocking(struct rsocket *rs, long arg) { + struct ds_qp *qp; int ret = 0; - if (rs->cm_id->recv_cq_channel) - ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); + if (rs->type == SOCK_STREAM) { + if (rs->cm_id->recv_cq_channel) + ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); - if (!ret && rs->state < rs_connected) - ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); + if (!ret && rs->state < rs_connected) + ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); + } else { + ret = fcntl(rs->epfd, F_SETFL, arg); + + if (!ret && rs->qp_list) { + qp = rs->qp_list; + do { + ret = fcntl(qp->cm_id->recv_cq_channel->fd, + F_SETFL, arg); + qp = ds_next_qp(qp); + } while (qp != rs->qp_list && !ret); + } + } return ret; } @@ -531,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs) rs->rq_size = 2; } +static void ds_set_qp_size(struct rsocket *rs) +{ + uint16_t max_size; + + max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); + + if (rs->sq_size > max_size) + rs->sq_size = max_size; + if (rs->rq_size > max_size) + rs->rq_size = max_size; + + if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) + rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; + else + rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; + + if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT)) + rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; + else + rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; +} + static int rs_init_bufs(struct rsocket *rs) { size_t len; rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg)); if (!rs->rmsg) - return -1; + return ERR(ENOEMEM); rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf)); if (!rs->sbuf) - return -1; + return ERR(ENOEMEM); rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size); if (!rs->smr) @@ -551,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs) sizeof(*rs->target_iomap) * rs->target_iomap_size; rs->target_buffer_list = malloc(len); if (!rs->target_buffer_list) - return -1; + return ERR(ENOEMEM); rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len); if (!rs->target_mr) @@ -564,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs) rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf)); if (!rs->rbuf) - return -1; + return ERR(ENOEMEM); rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size); if (!rs->rmr) @@ -648,7 +717,7 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) sge.length = RS_SNDLOWAT; sge.lkey = qp->rmr; - wr.wr_id = RS_RECV_WR_ID; + wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf)); wr.next = NULL; wr.sg_list = &sge; wr.num_sge = 1; @@ -736,7 +805,8 @@ static void ds_free_qp(struct ds_qp *qp) if (qp->cm_id) { if (qp->cm_id->qp) { - dlist_remove(&qp->list); + epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, + qp->cm_id->recv_cq_channel->fd, NULL); rdma_destroy_qp(qp->cm_id); } rdma_destroy_id(qp->cm_id); @@ -744,31 +814,32 @@ static void ds_free_qp(struct ds_qp *qp) free(qp); } -static void ds_free_qps(struct rsocket *rs) +static void ds_free(struct rsocket *rs) { struct ds_qp *qp; - dlist_t *entry; - while (!dlist_empty(&rs->qp_list)) { - qp = container_of(rs->qp_list.next, struct ds_qp, list); - ds_free_qp(qp); - } -} - -static void ds_free(struct rsocket *rs) -{ if (rs->state & (rs_readable | rs_writable)) rs_svc_remove(rs); - if (rs->dsock >= 0) - close(rs->dsock); + if (rs->udp_sock >= 0) + close(rs->udp_sock); if (rs->index >= 0) rs_remove(rs); - ds_free_qps(rs); - if (rs->fds) - free(rs->fds); + if (rs->dmsg) + free(rs->dmsg); + + if (rs->smsg) + free(rs->smsg); + + while (rs->qp_list) { + ds_remove_qp(rs, rs->qp_list); + ds_free_qp(rs->qp_list); + } + + if (rs->epfd >= 0) + close(rs->epfd); if (rs->sbuf) free(rs->sbuf); @@ -875,15 +946,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) static int ds_init(struct rsocket *rs, int domain) { - rs->dsock = socket(domain, SOCK_DGRAM, 0); - if (rs->dsock < 0) - return rs->dsock; + rs->udp_sock = socket(domain, SOCK_DGRAM, 0); + if (rs->udp_sock < 0) + return rs->udp_sock; - rs->fds = calloc(1, sizeof *fds); - if (!rs->fds) - return ERR(ENOMEM); - rs->nfds = 1; - dlist_init(&rs->qp_list); + rs->epfd = epoll_create(0); + if (rs->epfd < 0) + return rs->epfd; return 0; } @@ -916,7 +985,7 @@ int rsocket(int domain, int type, int protocol) if (ret) goto err; - index = rs->dsock; + index = rs->udp_sock; } ret = rs_insert(rs, index); @@ -941,7 +1010,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) if (!ret) rs->state = rs_bound; } else { - ret = bind(rs->dsock, addr, addrlen); + ret = bind(rs->udp_sock, addr, addrlen); if (!ret) { ret = rs_svc_insert(rs); if (!ret) @@ -1130,21 +1199,30 @@ connected: static int ds_init_ep(struct rsocket *rs) { - int ret; + struct ds_smsg *msg; + int i, ret; - rs_set_qp_size(rs); - if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) - rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; - else - rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; + ds_set_qp_size(rs); - rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf)); + rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); if (!rs->sbuf) return ERR(ENOMEM); - rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf; + rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); + if (!rs->dmsg) + return ERR(ENOEMEM); + rs->sbuf_bytes_avail = rs->sbuf_size; rs->sqe_avail = rs->sq_size; + rs->rqe_avail = rs->rq_size; + + rs->smsg_free = (struct ds_smsg *) rs->sbuf; + msg = rs->smsg_free; + for (i = 0; i < rs->sq_size - 1; i++) { + msg->next = (void *) msg + i * RS_SNDLOWAT; + msg = msg->next; + } + msg->next = NULL; ret = rs_svc_insert(rs); if (ret) @@ -1186,7 +1264,7 @@ static int ds_get_src_addr(struct rsocket *rs, uint16_t port; *src_len = sizeof src_addr; - ret = getsockname(rs->dsock, &src_addr->sa, src_len); + ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); if (ret || !rs_any_addr(src_addr)) return ret; @@ -1211,6 +1289,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, socklen_t addrlen, struct ds_qp **qp) { struct ibv_qp_init_attr qp_attr; + struct epoll_event event; int ret; *qp = calloc(1, sizeof(struct ds_qp)); @@ -1245,17 +1324,24 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, qp_attr.cap.max_send_sge = 2; qp_attr.cap.max_recv_sge = 1; qp_attr.cap.max_inline_data = rs->sq_inline; - ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); if (ret) - return ret; + goto err; + + event.events = EPOLLIN | EPOLLOUT; + event.data.ptr = *qp; + ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, + (*qp)->cm_id->recv_cq_channel->fd, &event); + if (ret) + goto err; for (i = 0; i < rs->rq_size; i++) { ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); if (ret) goto err; } - list_insert_head(&(*qp)->list, &rs->qp_list); + + ds_insert_qp(rs, *qp); return 0; err: ds_free_qp(*qp); @@ -1265,12 +1351,14 @@ err: static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr, socklen_t addrlen, struct ds_qp **qp) { - dlist_t *entry; + if (rs->qp_list) { + *qp = rs->qp_list; + do { + if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr) + return 0; - for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) { - *qp = container_of(entry, struct ds_qp, list); - if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr) - return 0; + *qp = ds_next_qp(*qp); + } while (*qp != rs->qp_list); } return ds_create_qp(rs, src_addr, addrlen, qp); @@ -1328,7 +1416,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) ret = rs_do_connect(rs); } else { fastlock_acquire(&rs->slock); - ret = connect(rs->dsock, addr, addrlen); + ret = connect(rs->udp_sock, addr, addrlen); if (!ret) ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); fastlock_release(&rs->slock); @@ -1428,9 +1516,12 @@ static int ds_send_data(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t length, int flags) { + uint64_t offset; + rs->sqe_avail--; rs->sbuf_bytes_avail -= length; - return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags); + offset = sgl->addr - (uintptr_t) rs->sbuf; + return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags); } static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, @@ -1688,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc return ret; } +/* + * Poll all CQs associated with a datagram rsocket. We need to drop any + * received messages that we do not have room to store. To limit drops, + * we only poll if we have room to store the receive or we need a send + * buffer. To ensure fairness, we poll the CQs round robin, remembering + * where we left off. + */ +static void ds_poll_cqs(struct rsocket *rs) +{ + struct ds_qp *qp; + struct ds_smsg *msg; + struct ibv_wc wc; + int ret, cnt; + + qp = rs->qp_list; + do { + cnt = 0; + do { + ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc); + if (ret <= 0) { + qp = ds_next_qp(qp); + continue; + } + + if (ds_wr_is_recv(wc.wr_id)) { + if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) { + rs->rqe_avail--; + rs->dmsg[rs->rmsg_tail].qp = qp; + if (++rs->rmsg_tail == rs->rq_size + 1) + rs->rmsg_tail = 0; + } else { + ds_post_recv(rs, qp, qp->rbuf + + ds_wr_offset(wc.wr_id)); + } + } else { + if (ds_wr_length(wc.wr_id) > rs->sq_inline) { + msg = (struct ds_smsg *) + (rs->sbuf + ds_wr_offset(wc.wr_id)); + msg->next = rs->smsg_free; + rs->smsg_free = msg; + } + rs->sqe_avail++; + } + + qp = ds_next_qp(qp); + if (!rs->rqe_avail && rs->sqe_avail) { + rs->qp_list = qp; + return; + } + cnt++; + } while (qp != rs->qp_list); + } while (cnt); +} + +static void ds_req_notify_cqs(struct rsocket *rs) +{ + struct ds_qp *qp; + + qp = rs->qp_list; + do { + if (!qp->cq_armed) { + ibv_req_notify_cq(qp->cm_id->recv_cq, 0); + qp->cq_armed = 1; + } + qp = ds_next_qp(qp); + } while (qp != rs->qp_list); +} + +static int ds_get_cq_event(struct rsocket *rs) +{ + struct epoll_event event; + struct ds_qp *qp; + struct ibv_cq *cq; + void *context; + int ret; + + if (!rs->cq_armed) + return 0; + + ret = epoll_wait(rs->epfd, &event, 1, -1); + if (ret <= 0) + return ret; + + qp = event.data.ptr; + ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context); + if (!ret) { + ibv_ack_cq_events(rs->cm_id->recv_cq, 1); + qp->cq_armed = 0; + rs->cq_armed = 0; + } + + return ret; +} + +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) +{ + int ret = 0; + + fastlock_acquire(&rs->cq_lock); + do { + ds_poll_cqs(rs); + if (test(rs)) { + ret = 0; + break; + } else if (nonblock) { + ret = ERR(EWOULDBLOCK); + } else if (!rs->cq_armed) { + ds_req_notify_cqs(rs); + rs->cq_armed = 1; + } else { + rs_update_credits(rs); + fastlock_acquire(&rs->cq_wait_lock); + fastlock_release(&rs->cq_lock); + + ret = ds_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); + fastlock_acquire(&rs->cq_lock); + } + } while (!ret); + + fastlock_release(&rs->cq_lock); + return ret; +} + +static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) +{ + struct timeval s, e; + uint32_t poll_time = 0; + int ret; + + do { + ret = ds_process_cqs(rs, 1, test); + if (!ret || nonblock || errno != EWOULDBLOCK) + return ret; + + if (!poll_time) + gettimeofday(&s, NULL); + + gettimeofday(&e, NULL); + poll_time = (e.tv_sec - s.tv_sec) * 1000000 + + (e.tv_usec - s.tv_usec) + 1; + } while (poll_time <= polling_time); + + ret = ds_process_cqs(rs, 0, test); + return ret; +} + static int rs_nonblocking(struct rsocket *rs, int flags) { return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); @@ -1721,7 +1959,7 @@ static int rs_can_send(struct rsocket *rs) static int ds_can_send(struct rsocket *rs) { - return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed); + return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT); } static int rs_conn_can_send(struct rsocket *rs) @@ -1961,31 +2199,22 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, { struct ds_udp_header hdr; struct msghdr msg; - struct iovec miov[4]; + struct iovec miov[8]; struct ds_qp *qp; - if (iovcnt > 4) + if (iovcnt > 8) return ERR(ENOTSUP); - qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL; hdr.tag = htonll(DS_UDP_TAG); hdr.version = 1; - if (qp) { - hdr.sl = qp->sl; - hdr.slid = htons(qp->lid); - hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF); - } else { - hdr.sl = 0; - hdr.slid = 0; - hdr.qpn = 0; - } + memset(&hdr->reserved, 0, sizeof hdr->reserved); + hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF); miov[0].iov_base = &hdr; miov[0].iov_len = sizeof hdr; memcpy(&miov[1], iov, sizeof *iov * iovcnt); memset(&msg, 0, sizeof msg); - /* TODO: specify name if needed */ msg.msg_iov = miov; msg.msg_iovlen = iovcnt + 1; return sendmsg(rs->fd, msg, flags); @@ -2004,14 +2233,13 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) struct ibv_sge sge; int ret = 0; - if (!rs->conn_dest || !rs->conn_dest->ah) + if (!rs->conn_dest->ah) return ds_send_udp(rs, buf, len, flags); - rs->sbytes_needed = len; if (!ds_can_send(rs)) { - ret = rs_get_comp(rs, 1, ds_can_send); + ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send); if (ret) - return ds_send_udp(rs, buf, len, flags); + return ret; } if (len <= rs->sq_inline) { @@ -2145,8 +2373,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, } fastlock_acquire(&rs->slock); - ds_connect(rs, dest_addr, addrlen); + if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + if (ret) + goto out; + } ret = dsend(rs, buf, len, flags); +out: fastlock_release(&rs->slock); return ret; } @@ -2629,7 +2862,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); return 0; } else { - return getpeername(rs->fs, addr, addrlen); + return getpeername(rs->udp_sock, addr, addrlen); } } @@ -2642,7 +2875,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); return 0; } else { - return getsockname(rs->fd, addr, addrlen); + return getsockname(rs->udp_sock, addr, addrlen); } } @@ -2656,7 +2889,7 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { - ret = setsockopt(rs->fd, optname, optval, optlen); + ret = setsockopt(rs->udp_sock, optname, optval, optlen); if (ret) return ret; } @@ -2666,13 +2899,15 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->so_opts; switch (optname) { case SO_REUSEADDR: - ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, - RDMA_OPTION_ID_REUSEADDR, - (void *) optval, optlen); - if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) && - rs->cm_id->context && - (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB)))) - ret = 0; + if (rs->type == SOCK_STREAM) { + ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, + RDMA_OPTION_ID_REUSEADDR, + (void *) optval, optlen); + if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) && + rs->cm_id->context && + (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB)))) + ret = 0; + } opt_on = *(int *) optval; break; case SO_RCVBUF: @@ -2722,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname, opts = &rs->ipv6_opts; switch (optname) { case IPV6_V6ONLY: - ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, - RDMA_OPTION_ID_AFONLY, - (void *) optval, optlen); + if (rs->type == SOCK_STREAM) { + ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, + RDMA_OPTION_ID_AFONLY, + (void *) optval, optlen); + } opt_on = *(int *) optval; break; default: @@ -2778,9 +3015,6 @@ int rgetsockopt(int socket, int level, int optname, int ret = 0; rs = idm_at(&idm, socket); - if (rs->type == SOCK_DGRAM && level != SOL_RDMA) - return getsockopt(rs->fd, level, optname, optval, optlen); - switch (level) { case SOL_SOCKET: switch (optname) { @@ -3100,24 +3334,3 @@ out: return (ret && left == count) ? ret : count - left; } - -ssize_t urecvfrom(int socket, void *buf, size_t len, int flags, - struct sockaddr *src_addr, socklen_t *addrlen) -{ - int ret; - - ret = rrecv(socket, buf, len, flags); - if (ret > 0 && src_addr) - rgetpeername(socket, src_addr, addrlen); - - return ret; -} - -ssize_t usendto(int socket, const void *buf, size_t len, int flags, - const struct sockaddr *dest_addr, socklen_t addrlen) -{ - if (dest_addr || addrlen) - return ERR(EISCONN); - - return usend(socket, buf, len, flags); -} -- 2.41.0