From 8d8e192aa985114a4714ea91a9ec1136033bca83 Mon Sep 17 00:00:00 2001
From: Sean Hefty
Date: Thu, 15 Nov 2012 20:55:16 -0800
Subject: [PATCH] Refresh of usocket

---
 docs/rsocket  |   50 +-
 src/rsocket.c |  427 ++++++++--
 src/usocket.c | 2253 ------------------------------------------------
 3 files changed, 425 insertions(+), 2305 deletions(-)
 delete mode 100644 src/usocket.c

diff --git a/docs/rsocket b/docs/rsocket
index 1484f65b..03d49df7 100644
--- a/docs/rsocket
+++ b/docs/rsocket
@@ -1,7 +1,7 @@
-rsocket Protocol and Design Guide 9/10/2012
+rsocket Protocol and Design Guide 11/11/2012
 
-Overview
---------
+Data Streaming (TCP) Overview
+-----------------------------
 Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  For details on the current state of the
 implementation, readers should refer to the rsocket man page.  This
@@ -189,3 +189,47 @@
 registered remote data buffer.  From host A's perspective, the transfer
 appears as a normal send/write operation, with the data stream redirected
 directly into the receiving application's buffer.
+
+
+
+Datagram Overview
+-----------------
+The rsocket API supports datagram sockets.  Datagram support is handled through an
+entirely different protocol and internal implementation.  Unlike connected rsockets,
+datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
+socket may use any number of network (IP) addresses, including those which map to
+different RDMA devices.  As a result, a single datagram rsocket must support
+using multiple RDMA devices and ports, and a datagram rsocket references a single
+UDP socket, plus zero or more UD QPs.
+
+Rsockets uses headers inserted before user data sent over UDP sockets to resolve
+remote UD QP numbers.  When a user first attempts to send a datagram to a remote
+address (IP and UDP port), rsockets will take the following steps.  First, it
+will allocate and store the destination address into a lookup table for future
+use.  Then it will resolve which local network address should be used when sending
+to the specified destination.  It will allocate a UD QP on the RDMA device
+associated with the local address.  Finally, rsockets will send the user's
+datagram to the remote UDP socket, inserting a header before the datagram.
+The header specifies the UD QP number associated with the local network address
+(IP and UDP port) of the send.
+
+Under many circumstances, the rsocket UDP header also provides enough path
+information for the receiver to send the reply using a UD QP.  However, certain
+fabric topologies may require contacting a subnet administrator.  Whenever
+rsockets lacks sufficient information to send data to a remote peer using
+a UD QP, it will automatically fall back to using a standard UDP socket.  This
+helps minimize the latency for performing a given send, while lengthy address
+and route resolution protocols complete.
+
+Currently, rsockets allocates a single UD QP, with an infrastructure provided
+to support more.  Future enhancements may add support for multiple UD QPs, each
+associated with a single network (IP) address.  Multiplexing different network
+addresses over a single UD QP and using shared receive queues are other possible
+enhancements.
+
+Because rsockets may use multiple UD QPs, buffer management can become an issue.
+The total size of receive buffers is specified by the mem_default configuration
+file, but may be overridden with rsetsockopt.  The buffer is divided into
+MTU sized messages and distributed among the allocated QPs.  As new QPs are
+created, the receive buffers may be redistributed by posting loopback sends
+on a QP in order to free the receive buffers for reposting to a different QP.
\ No newline at end of file diff --git a/src/rsocket.c b/src/rsocket.c index 58fcb8e5..99e638c8 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -110,6 +110,12 @@ struct rs_msg { uint32_t data; }; +struct ds_qp; + +struct ds_msg { + struct ds_qp *qp; +}; + struct rs_sge { uint64_t addr; uint32_t key; @@ -169,13 +175,63 @@ enum rs_state { #define RS_OPT_SWAP_SGL 1 -struct rsocket { +union socket_addr { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; +}; + +struct ds_qp { struct rdma_cm_id *cm_id; + int rbuf_cnt; + uint16_t lid; + uint8_t sl; +}; + +struct ds_dest { + union socket_addr addr; + struct ds_qp *qp; + struct ibv_ah *ah; + uint32_t qpn; + atomic_t refcnt; +}; + +struct rsocket { + int type; + int index; fastlock_t slock; fastlock_t rlock; fastlock_t cq_lock; fastlock_t cq_wait_lock; - fastlock_t iomap_lock; + fastlock_t map_lock; /* acquire slock first if needed */ + + union { + /* data stream */ + struct { + struct rdma_cm_id *cm_id; + + int remote_sge; + struct rs_sge remote_sgl; + struct rs_sge remote_iomap; + + struct ibv_mr *target_mr; + int target_sge; + int target_iomap_size; + void *target_buffer_list; + volatile struct rs_sge *target_sgl; + struct rs_iomap *target_iomap; + }; + /* datagram */ + struct { + struct ds_qp *qp_array; + void *dest_map; + struct ds_dest *conn_dest; + struct pollfd *fds; + nfds_t nfds; + int fd; + int sbytes_needed; + }; + }; int opts; long fd_flags; @@ -186,7 +242,7 @@ struct rsocket { int cq_armed; int retries; int err; - int index; + int ctrl_avail; int sqe_avail; int sbuf_bytes_avail; @@ -203,34 +259,37 @@ struct rsocket { int rbuf_offset; int rmsg_head; int rmsg_tail; - struct rs_msg *rmsg; - - int remote_sge; - struct rs_sge remote_sgl; - struct rs_sge remote_iomap; + union { + struct rs_msg *rmsg; + struct ds_msg *dmsg; + }; struct rs_iomap_mr *remote_iomappings; dlist_entry iomap_list; dlist_entry iomap_queue; int iomap_pending; - struct ibv_mr *target_mr; - int target_sge; - int 
target_iomap_size; - void *target_buffer_list; - volatile struct rs_sge *target_sgl; - struct rs_iomap *target_iomap; - uint32_t rbuf_size; - struct ibv_mr *rmr; + struct ibv_mr *rmr; uint8_t *rbuf; uint32_t sbuf_size; - struct ibv_mr *smr; + struct ibv_mr *smr; struct ibv_sge ssgl[2]; uint8_t *sbuf; }; +#define DS_UDP_TAG 0x5555555555555555ULL + +struct ds_udp_header { + uint64_t tag; + uint8_t version; + uint8_t sl; + uint16_t slid; + uint32_t qpn; /* upper 8-bits reserved */ +}; + + static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? @@ -306,10 +365,10 @@ out: pthread_mutex_unlock(&mut); } -static int rs_insert(struct rsocket *rs) +static int rs_insert(struct rsocket *rs, index) { pthread_mutex_lock(&mut); - rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs); + rs->index = idm_set(&idm, index, rs); pthread_mutex_unlock(&mut); return rs->index; } @@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs) pthread_mutex_unlock(&mut); } -static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) { struct rsocket *rs; @@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) if (!rs) return NULL; + rs->type = type; rs->index = -1; + rs->fd = -1; if (inherited_rs) { rs->sbuf_size = inherited_rs->sbuf_size; rs->rbuf_size = inherited_rs->rbuf_size; @@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) fastlock_init(&rs->rlock); fastlock_init(&rs->cq_lock); fastlock_init(&rs->cq_wait_lock); - fastlock_init(&rs->iomap_lock); + fastlock_init(&rs->map_lock); dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); return rs; @@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - fastlock_destroy(&rs->iomap_lock); + if (rs->fd >= 0) + close(rs->fd); + if (rs->fds) + free(rs->fds); + + fastlock_destroy(&rs->map_lock); fastlock_destroy(&rs->cq_wait_lock); 
fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); @@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) rs->sseq_comp = ntohs(conn->credits); } +static int ds_init(struct rsocket *rs, int domain) +{ + rs->fd = socket(domain, SOCK_DGRAM, 0); + if (rs->fd < 0) + return rs->fd; + + rs->fds = calloc(1, sizeof *fds); + if (!rs->fds) + return ERR(ENOMEM); + rs->nfds = 1; + + return 0; +} + int rsocket(int domain, int type, int protocol) { struct rsocket *rs; - int ret; + int index, ret; if ((domain != PF_INET && domain != PF_INET6) || - (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP)) + ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) || + (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) || + (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP)) return ERR(ENOTSUP); rs_configure(); - rs = rs_alloc(NULL); + rs = rs_alloc(NULL, type); if (!rs) return ERR(ENOMEM); - ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); - if (ret) - goto err; + if (type == SOCK_STREAM) { + ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); + if (ret) + goto err; + + rs->cm_id->route.addr.src_addr.sa_family = domain; + index = rs->cm_id->channel->fd; + } else { + ret = ds_init(rs, domain); + if (ret) + goto err; - ret = rs_insert(rs); + index = rs->fd; + } + + ret = rs_insert(rs, index); if (ret < 0) goto err; - rs->cm_id->route.addr.src_addr.sa_family = domain; return rs->index; err: @@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) int ret; rs = idm_at(&idm, socket); - ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); - if (!ret) - rs->state = rs_bound; + if (rs->type == SOCK_STREAM) { + ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); + if (!ret) + rs->state = rs_bound; + } else { + ret = bind(rs->fd, addr, addrlen); + } return ret; } @@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) int ret; rs = 
idm_at(&idm, socket); - new_rs = rs_alloc(rs); + new_rs = rs_alloc(rs, rs->type); if (!new_rs) return ERR(ENOMEM); @@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) if (ret) goto err; - ret = rs_insert(new_rs); + ret = rs_insert(new_rs, rs->cm_id->channel->fd); if (ret < 0) goto err; @@ -854,6 +950,66 @@ connected: return ret; } +static int ds_compare_dest(const void *dst1, const void *dst2) +{ + const struct sockaddr *sa1, *sa2; + size_t len; + + sa1 = (const struct sockaddr *) dst1; + sa2 = (const struct sockaddr *) dst2; + + len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); + return memcmp(dst1, dst2, len); +} + +/* Caller must hold map_lock around accessing source address */ +static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr, + socklen_t dst_len, socklen_t *src_len) +{ + static union socket_addr src_addr; + int sock, ret; + + sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0); + if (sock < 0) + return NULL; + + ret = connect(sock, &dst_addr->sa, dst_len); + if (ret) + goto out; + + *src_len = sizeof src_addr; + ret = getsockname(sock, &src_addr.sa, src_len); + +out: + close(sock); + return ret ? 
NULL : &src_addr; +} + +static struct ds_qp *ds_get_qp(struct rsocket *rs, + const struct sockaddr *src_addr, socklen_t addrlen) +{ + union socket_addr *addr; + socklen_t len; + +} + +static int ds_connect(struct rsocket *rs, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + struct ds_dest **dest; + + fastlock_acquire(&rs->map_lock); + dest = tfind(dest_addr, rs->dest_map, ds_compare_dest); + if (dest) { + rs->conn_dest = *dest; + goto out; + } +out: + fastlock_release(&rs->map_lock); + return 0; +} + int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) { struct rsocket *rs; @@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int ds_post_send(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint64_t wr_id, int flags) +{ + struct ibv_send_wr wr, *bad; + + wr.wr_id = wr_id; + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_SEND; + wr.send_flags = flags; + wr.wr.ud.ah = rs->conn_dest->ah; + wr.wr.ud.remote_qpn = rs->conn_dest->qpn; + wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; + + return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); +} + /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. 
@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs, flags, addr, rkey); } +static int ds_send_data(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint32_t length, int flags) +{ + rs->sqe_avail--; + rs->sbuf_bytes_avail -= length; + return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags); +} + static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, struct ibv_sge *sgl, int nsge, uint32_t length, int flags) { @@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } +static int ds_can_send(struct rsocket *rs) +{ + return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed); +} + static int rs_conn_can_send(struct rsocket *rs) { return rs_can_send(rs) || !(rs->state & rs_connect_wr); @@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) struct rs_iomap iom; int ret; - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); while (!dlist_empty(&rs->iomap_queue)) { if (!rs_can_send(rs)) { ret = rs_get_comp(rs, rs_nonblocking(rs, flags), @@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) } rs->iomap_pending = !dlist_empty(&rs->iomap_queue); - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } +static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, + int iovcnt, int flags) +{ + struct ds_udp_header hdr; + struct msghdr msg; + struct iovec miov[4]; + struct ds_qp *qp; + + if (iovcnt > 4) + return ERR(ENOTSUP); + + qp = (rs->conn_dest) ? 
rs->conn_dest->qp : NULL; + hdr.tag = htonll(DS_UDP_TAG); + hdr.version = 1; + if (qp) { + hdr.sl = qp->sl; + hdr.slid = htons(qp->lid); + hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF); + } else { + hdr.sl = 0; + hdr.slid = 0; + hdr.qpn = 0; + } + + miov[0].iov_base = &hdr; + miov[0].iov_len = sizeof hdr; + memcpy(&miov[1], iov, sizeof *iov * iovcnt); + + memset(&msg, 0, sizeof msg); + /* TODO: specify name if needed */ + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; + return sendmsg(rs->fd, msg, flags); +} + +static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct iovec iov; + iov.iov_base = buf; + iov_iov_len = len; + return ds_sendv_udp(s, &iov, 1, flags); +} + +static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct ibv_sge sge; + int ret = 0; + + if (!rs->conn_dest || !rs->conn_dest->ah) + return ds_send_udp(rs, buf, len, flags); + + rs->sbytes_needed = len; + if (!ds_can_send(rs)) { + ret = rs_get_comp(rs, 1, ds_can_send); + if (ret) + return ds_send_udp(rs, buf, len, flags); + } + + if (len <= rs->sq_inline) { + sge.addr = (uintptr_t) buf; + sge.length = len; + sge.lkey = 0; + ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE); + } else if (len <= rs_sbuf_left(rs)) { + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len); + rs->ssgl[0].length = len; + ret = ds_send_data(rs, rs->ssgl, 1, len, 0); + if (len < rs_sbuf_left(rs)) + rs->ssgl[0].addr += len; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, + rs->ssgl[0].length); + rs->ssgl[1].length = len - rs->ssgl[0].length; + memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); + ret = ds_send_data(rs, rs->ssgl, 2, len, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + + return ret ? 
ret : len; +} + /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. @@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM) { + fastlock_acquire(&rs->slock); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; + } + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { @@ -1537,10 +1817,21 @@ out: ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { - if (dest_addr || addrlen) - return ERR(EISCONN); + struct rsocket *rs; + + rs = idm_at(&idm, socket); + if (rs->type == SOCK_STREAM) { + if (dest_addr || addrlen) + return ERR(EISCONN); + + return rsend(socket, buf, len, flags); + } - return rsend(socket, buf, len, flags); + fastlock_acquire(&rs->slock); + ds_connect(rs, dest_addr, addrlen); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; } static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) @@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) if (msg->msg_control && msg->msg_controllen) return ERR(ENOTSUP); - return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); + return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags); } ssize_t rwrite(int socket, const void *buf, size_t count) @@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); + return 0; + } else { + return getpeername(rs->fs, addr, addrlen); + } } int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) @@ -2026,8 
+2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); + return 0; + } else { + return getsockname(rs->fd, addr, addrlen); + } } int rsetsockopt(int socket, int level, int optname, @@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { + ret = setsockopt(rs->fd, optname, optval, optlen); + if (ret) + return ret; + } + switch (level) { case SOL_SOCKET: opts = &rs->so_opts; @@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname, int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) + return getsockopt(rs->fd, level, optname, optval, optlen); + switch (level) { case SOL_SOCKET: switch (optname) { @@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) return ERR(EINVAL); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); if (prot & PROT_WRITE) { iomr = rs_get_iomap_mr(rs); access |= IBV_ACCESS_REMOTE_WRITE; @@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse dlist_insert_tail(&iomr->entry, &rs->iomap_list); } out: - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return offset; } @@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len) int ret = 0; rs = idm_at(&idm, socket); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); for (entry = rs->iomap_list.next; entry != &rs->iomap_list; entry = entry->next) { @@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len) } ret = ERR(EINVAL); out: - 
fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } @@ -2475,3 +2783,24 @@ out: return (ret && left == count) ? ret : count - left; } + +ssize_t urecvfrom(int socket, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + int ret; + + ret = rrecv(socket, buf, len, flags); + if (ret > 0 && src_addr) + rgetpeername(socket, src_addr, addrlen); + + return ret; +} + +ssize_t usendto(int socket, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + if (dest_addr || addrlen) + return ERR(EISCONN); + + return usend(socket, buf, len, flags); +} diff --git a/src/usocket.c b/src/usocket.c deleted file mode 100644 index 87da990c..00000000 --- a/src/usocket.c +++ /dev/null @@ -1,2253 +0,0 @@ -/* - * Copyright (c) 2012 Intel Corporation. All rights reserved. - * - * This software is available to you under a choice of one of two - * licenses. You may choose to be licensed under the terms of the GNU - * General Public License (GPL) Version 2, available from the file - * COPYING in the main directory of this source tree, or the - * OpenIB.org BSD license below: - * - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * - Redistributions of source code must retain the above - * copyright notice, this list of conditions and the following - * disclaimer. - * - * - Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS - * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN - * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - * - */ - -#if HAVE_CONFIG_H -# include -#endif /* HAVE_CONFIG_H */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include "cma.h" -#include "indexer.h" - -//#define RS_SNDLOWAT 64 -//#define RS_QP_MAX_SIZE 0xFFFE -//#define RS_SGL_SIZE 2 -//static struct index_map idm; -//static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; - -//static uint16_t def_inline = 64; -//static uint16_t def_sqsize = 384; -//static uint16_t def_rqsize = 384; -//static uint32_t def_mem = (1 << 17); -//static uint32_t def_wmem = (1 << 17); -//static uint32_t polling_time = 10; - -//enum { -// RS_OP_DATA, -// RS_OP_RSVD_DATA_MORE, -// RS_OP_WRITE, /* opcode is not transmitted over the network */ -// RS_OP_RSVD_DRA_MORE, -// RS_OP_SGL, -// RS_OP_RSVD, -// RS_OP_IOMAP_SGL, -// RS_OP_CTRL -//}; -//#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) -//#define rs_msg_op(imm_data) (imm_data >> 29) -//#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) - -struct rs_msg { - uint32_t op; - uint32_t data; -}; - -struct rs_sge { - uint64_t addr; - uint32_t key; - uint32_t length; -}; - -struct rs_iomap { - uint64_t offset; - struct rs_sge sge; -}; - -struct rs_iomap_mr { - uint64_t offset; - struct ibv_mr *mr; - dlist_entry entry; - atomic_t refcnt; - int index; /* -1 if mapping is local and not in iomap_list */ -}; - -#define RS_MIN_INLINE (sizeof(struct rs_sge)) -#define rs_host_is_net() (1 == htonl(1)) -#define RS_CONN_FLAG_NET (1 << 0) -#define RS_CONN_FLAG_IOMAP (1 << 1) - -struct rs_conn_data { - uint8_t version; - uint8_t flags; - uint16_t credits; - uint8_t reserved[3]; - uint8_t target_iomap_size; - struct rs_sge 
target_sgl; - struct rs_sge data_buf; -}; - -#define RS_RECV_WR_ID (~((uint64_t) 0)) - -/* - * usocket states are ordered as passive, connecting, connected, disconnected. - */ -enum rs_state { - rs_init, - rs_bound = 0x0001, - rs_listening = 0x0002, - rs_opening = 0x0004, - rs_resolving_addr = rs_opening | 0x0010, - rs_resolving_route = rs_opening | 0x0020, - rs_connecting = rs_opening | 0x0040, - rs_accepting = rs_opening | 0x0080, - rs_connected = 0x0100, - rs_connect_wr = 0x0200, - rs_connect_rd = 0x0400, - rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr, - rs_connect_error = 0x0800, - rs_disconnected = 0x1000, - rs_error = 0x2000, -}; - -#define RS_OPT_SWAP_SGL 1 - -struct usocket { - struct rdma_cm_id *cm_id; - fastlock_t slock; - fastlock_t rlock; - fastlock_t cq_lock; - fastlock_t cq_wait_lock; - fastlock_t iomap_lock; - - int opts; - long fd_flags; - uint64_t so_opts; - uint64_t tcp_opts; - uint64_t ipv6_opts; - int state; - int cq_armed; - int retries; - int err; - int index; - int ctrl_avail; - int sqe_avail; - int sbuf_bytes_avail; - uint16_t sseq_no; - uint16_t sseq_comp; - uint16_t sq_size; - uint16_t sq_inline; - - uint16_t rq_size; - uint16_t rseq_no; - uint16_t rseq_comp; - int rbuf_bytes_avail; - int rbuf_free_offset; - int rbuf_offset; - int rmsg_head; - int rmsg_tail; - struct rs_msg *rmsg; - - int remote_sge; - struct rs_sge remote_sgl; - struct rs_sge remote_iomap; - - struct rs_iomap_mr *remote_iomappings; - dlist_entry iomap_list; - dlist_entry iomap_queue; - int iomap_pending; - - struct ibv_mr *target_mr; - int target_sge; - int target_iomap_size; - void *target_buffer_list; - volatile struct rs_sge *target_sgl; - struct rs_iomap *target_iomap; - - uint32_t rbuf_size; - struct ibv_mr *rmr; - uint8_t *rbuf; - - uint32_t sbuf_size; - struct ibv_mr *smr; - struct ibv_sge ssgl[2]; - uint8_t *sbuf; -}; - -static int rs_value_to_scale(int value, int bits) -{ - return value <= (1 << (bits - 1)) ? 
- value : (1 << (bits - 1)) | (value >> bits); -} - -static int rs_scale_to_value(int value, int bits) -{ - return value <= (1 << (bits - 1)) ? - value : (value & ~(1 << (bits - 1))) << bits; -} - -void rs_configure(void) -{ - FILE *f; - static int init; - - if (init) - return; - - pthread_mutex_lock(&mut); - if (init) - goto out; - - if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) { - (void) fscanf(f, "%u", &polling_time); - fclose(f); - } - - if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) { - (void) fscanf(f, "%hu", &def_inline); - fclose(f); - - if (def_inline < RS_MIN_INLINE) - def_inline = RS_MIN_INLINE; - } - - if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) { - (void) fscanf(f, "%hu", &def_sqsize); - fclose(f); - } - - if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) { - (void) fscanf(f, "%hu", &def_rqsize); - fclose(f); - } - - if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) { - (void) fscanf(f, "%u", &def_mem); - fclose(f); - - if (def_mem < 1) - def_mem = 1; - } - - if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) { - (void) fscanf(f, "%u", &def_wmem); - fclose(f); - if (def_wmem < RS_SNDLOWAT) - def_wmem = RS_SNDLOWAT << 1; - } - - if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) { - (void) fscanf(f, "%hu", &def_iomap_size); - fclose(f); - - /* round to supported values */ - def_iomap_size = (uint8_t) rs_value_to_scale( - (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8); - } - init = 1; -out: - pthread_mutex_unlock(&mut); -} - -static int rs_insert(struct usocket *us) -{ - pthread_mutex_lock(&mut); - us->index = idm_set(&idm, us->cm_id->channel->fd, us); - pthread_mutex_unlock(&mut); - return us->index; -} - -static void rs_remove(struct usocket *us) -{ - pthread_mutex_lock(&mut); - idm_clear(&idm, us->index); - pthread_mutex_unlock(&mut); -} - -static struct usocket *rs_alloc(struct usocket *inherited_rs) -{ - struct usocket *us; - - us = calloc(1, sizeof *us); - if (!us) - return NULL; - - us->index = -1; - if (inherited_rs) { 
- us->sbuf_size = inherited_rs->sbuf_size; - us->rbuf_size = inherited_rs->rbuf_size; - us->sq_inline = inherited_rs->sq_inline; - us->sq_size = inherited_rs->sq_size; - us->rq_size = inherited_rs->rq_size; - us->ctrl_avail = inherited_rs->ctrl_avail; - us->target_iomap_size = inherited_rs->target_iomap_size; - } else { - us->sbuf_size = def_wmem; - us->rbuf_size = def_mem; - us->sq_inline = def_inline; - us->sq_size = def_sqsize; - us->rq_size = def_rqsize; - us->ctrl_avail = RS_QP_CTRL_SIZE; - us->target_iomap_size = def_iomap_size; - } - fastlock_init(&us->slock); - fastlock_init(&us->rlock); - fastlock_init(&us->cq_lock); - fastlock_init(&us->cq_wait_lock); - fastlock_init(&us->iomap_lock); - dlist_init(&us->iomap_list); - dlist_init(&us->iomap_queue); - return us; -} - -static int rs_set_nonblocking(struct usocket *us, long arg) -{ - int ret = 0; - - if (us->cm_id->recv_cq_channel) - ret = fcntl(us->cm_id->recv_cq_channel->fd, F_SETFL, arg); - - if (!ret && us->state < rs_connected) - ret = fcntl(us->cm_id->channel->fd, F_SETFL, arg); - - return ret; -} - -static void rs_set_qp_size(struct usocket *us) -{ - uint16_t max_size; - - max_size = min(ucma_max_qpsize(us->cm_id), RS_QP_MAX_SIZE); - - if (us->sq_size > max_size) - us->sq_size = max_size; - else if (us->sq_size < 2) - us->sq_size = 2; - if (us->sq_size <= (RS_QP_CTRL_SIZE << 2)) - us->ctrl_avail = 1; - - if (us->rq_size > max_size) - us->rq_size = max_size; - else if (us->rq_size < 2) - us->rq_size = 2; -} - -static int rs_init_bufs(struct usocket *us) -{ - size_t len; - - us->rmsg = calloc(us->rq_size + 1, sizeof(*us->rmsg)); - if (!us->rmsg) - return -1; - - us->sbuf = calloc(us->sbuf_size, sizeof(*us->sbuf)); - if (!us->sbuf) - return -1; - - us->smr = rdma_reg_msgs(us->cm_id, us->sbuf, us->sbuf_size); - if (!us->smr) - return -1; - - len = sizeof(*us->target_sgl) * RS_SGL_SIZE + - sizeof(*us->target_iomap) * us->target_iomap_size; - us->target_buffer_list = malloc(len); - if 
(!us->target_buffer_list) - return -1; - - us->target_mr = rdma_reg_write(us->cm_id, us->target_buffer_list, len); - if (!us->target_mr) - return -1; - - memset(us->target_buffer_list, 0, len); - us->target_sgl = us->target_buffer_list; - if (us->target_iomap_size) - us->target_iomap = (struct rs_iomap *) (us->target_sgl + RS_SGL_SIZE); - - us->rbuf = calloc(us->rbuf_size, sizeof(*us->rbuf)); - if (!us->rbuf) - return -1; - - us->rmr = rdma_reg_write(us->cm_id, us->rbuf, us->rbuf_size); - if (!us->rmr) - return -1; - - us->ssgl[0].addr = us->ssgl[1].addr = (uintptr_t) us->sbuf; - us->sbuf_bytes_avail = us->sbuf_size; - us->ssgl[0].lkey = us->ssgl[1].lkey = us->smr->lkey; - - us->rbuf_free_offset = us->rbuf_size >> 1; - us->rbuf_bytes_avail = us->rbuf_size >> 1; - us->sqe_avail = us->sq_size - us->ctrl_avail; - us->rseq_comp = us->rq_size >> 1; - return 0; -} - -static int rs_create_cq(struct usocket *us) -{ - us->cm_id->recv_cq_channel = ibv_create_comp_channel(us->cm_id->verbs); - if (!us->cm_id->recv_cq_channel) - return -1; - - us->cm_id->recv_cq = ibv_create_cq(us->cm_id->verbs, us->sq_size + us->rq_size, - us->cm_id, us->cm_id->recv_cq_channel, 0); - if (!us->cm_id->recv_cq) - goto err1; - - if (us->fd_flags & O_NONBLOCK) { - if (rs_set_nonblocking(us, O_NONBLOCK)) - goto err2; - } - - us->cm_id->send_cq_channel = us->cm_id->recv_cq_channel; - us->cm_id->send_cq = us->cm_id->recv_cq; - return 0; - -err2: - ibv_destroy_cq(us->cm_id->recv_cq); - us->cm_id->recv_cq = NULL; -err1: - ibv_destroy_comp_channel(us->cm_id->recv_cq_channel); - us->cm_id->recv_cq_channel = NULL; - return -1; -} - -static inline int -rs_post_recv(struct usocket *us) -{ - struct ibv_recv_wr wr, *bad; - - wr.wr_id = RS_RECV_WR_ID; - wr.next = NULL; - wr.sg_list = NULL; - wr.num_sge = 0; - - return rdma_seterrno(ibv_post_recv(us->cm_id->qp, &wr, &bad)); -} - -static int rs_create_ep(struct usocket *us) -{ - struct ibv_qp_init_attr qp_attr; - int i, ret; - - rs_set_qp_size(us); - ret = 
rs_init_bufs(us); - if (ret) - return ret; - - ret = rs_create_cq(us); - if (ret) - return ret; - - memset(&qp_attr, 0, sizeof qp_attr); - qp_attr.qp_context = us; - qp_attr.send_cq = us->cm_id->send_cq; - qp_attr.recv_cq = us->cm_id->recv_cq; - qp_attr.qp_type = IBV_QPT_RC; - qp_attr.sq_sig_all = 1; - qp_attr.cap.max_send_wr = us->sq_size; - qp_attr.cap.max_recv_wr = us->rq_size; - qp_attr.cap.max_send_sge = 2; - qp_attr.cap.max_recv_sge = 1; - qp_attr.cap.max_inline_data = us->sq_inline; - - ret = rdma_create_qp(us->cm_id, NULL, &qp_attr); - if (ret) - return ret; - - for (i = 0; i < us->rq_size; i++) { - ret = rs_post_recv(us); - if (ret) - return ret; - } - return 0; -} - -static void rs_release_iomap_mr(struct rs_iomap_mr *iomr) -{ - if (atomic_dec(&iomr->refcnt)) - return; - - dlist_remove(&iomr->entry); - ibv_dereg_mr(iomr->mr); - if (iomr->index >= 0) - iomr->mr = NULL; - else - free(iomr); -} - -static void rs_free_iomappings(struct usocket *us) -{ - struct rs_iomap_mr *iomr; - - while (!dlist_empty(&us->iomap_list)) { - iomr = container_of(us->iomap_list.next, - struct rs_iomap_mr, entry); - riounmap(us->index, iomr->mr->addr, iomr->mr->length); - } - while (!dlist_empty(&us->iomap_queue)) { - iomr = container_of(us->iomap_queue.next, - struct rs_iomap_mr, entry); - riounmap(us->index, iomr->mr->addr, iomr->mr->length); - } -} - -static void rs_free(struct usocket *us) -{ - if (us->index >= 0) - rs_remove(us); - - if (us->rmsg) - free(us->rmsg); - - if (us->sbuf) { - if (us->smr) - rdma_dereg_mr(us->smr); - free(us->sbuf); - } - - if (us->rbuf) { - if (us->rmr) - rdma_dereg_mr(us->rmr); - free(us->rbuf); - } - - if (us->target_buffer_list) { - if (us->target_mr) - rdma_dereg_mr(us->target_mr); - free(us->target_buffer_list); - } - - if (us->cm_id) { - rs_free_iomappings(us); - if (us->cm_id->qp) - rdma_destroy_qp(us->cm_id); - rdma_destroy_id(us->cm_id); - } - - fastlock_destroy(&us->iomap_lock); - fastlock_destroy(&us->cq_wait_lock); - 
fastlock_destroy(&us->cq_lock); - fastlock_destroy(&us->rlock); - fastlock_destroy(&us->slock); - free(us); -} - -static void rs_set_conn_data(struct usocket *us, struct rdma_conn_param *param, - struct rs_conn_data *conn) -{ - conn->version = 1; - conn->flags = RS_CONN_FLAG_IOMAP | - (rs_host_is_net() ? RS_CONN_FLAG_NET : 0); - conn->credits = htons(us->rq_size); - memset(conn->reserved, 0, sizeof conn->reserved); - conn->target_iomap_size = (uint8_t) rs_value_to_scale(us->target_iomap_size, 8); - - conn->target_sgl.addr = htonll((uintptr_t) us->target_sgl); - conn->target_sgl.length = htonl(RS_SGL_SIZE); - conn->target_sgl.key = htonl(us->target_mr->rkey); - - conn->data_buf.addr = htonll((uintptr_t) us->rbuf); - conn->data_buf.length = htonl(us->rbuf_size >> 1); - conn->data_buf.key = htonl(us->rmr->rkey); - - param->private_data = conn; - param->private_data_len = sizeof *conn; -} - -static void rs_save_conn_data(struct usocket *us, struct rs_conn_data *conn) -{ - us->remote_sgl.addr = ntohll(conn->target_sgl.addr); - us->remote_sgl.length = ntohl(conn->target_sgl.length); - us->remote_sgl.key = ntohl(conn->target_sgl.key); - us->remote_sge = 1; - if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) || - (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET))) - us->opts = RS_OPT_SWAP_SGL; - - if (conn->flags & RS_CONN_FLAG_IOMAP) { - us->remote_iomap.addr = us->remote_sgl.addr + - sizeof(us->remote_sgl) * us->remote_sgl.length; - us->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8); - us->remote_iomap.key = us->remote_sgl.key; - } - - us->target_sgl[0].addr = ntohll(conn->data_buf.addr); - us->target_sgl[0].length = ntohl(conn->data_buf.length); - us->target_sgl[0].key = ntohl(conn->data_buf.key); - - us->sseq_comp = ntohs(conn->credits); -} - -int usocket(int domain, int type, int protocol) -{ - struct usocket *us; - int ret; - - if ((domain != PF_INET && domain != PF_INET6) || - (type != SOCK_STREAM) || (protocol && protocol != 
IPPROTO_TCP)) - return ERR(ENOTSUP); - - rs_configure(); - us = rs_alloc(NULL); - if (!us) - return ERR(ENOMEM); - - ret = rdma_create_id(NULL, &us->cm_id, us, RDMA_PS_TCP); - if (ret) - goto err; - - ret = rs_insert(us); - if (ret < 0) - goto err; - - us->cm_id->route.addr.src_addr.sa_family = domain; - return us->index; - -err: - rs_free(us); - return ret; -} - -int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) -{ - struct usocket *us; - int ret; - - us = idm_at(&idm, socket); - ret = rdma_bind_addr(us->cm_id, (struct sockaddr *) addr); - if (!ret) - us->state = rs_bound; - return ret; -} - -int rlisten(int socket, int backlog) -{ - struct usocket *us; - int ret; - - us = idm_at(&idm, socket); - ret = rdma_listen(us->cm_id, backlog); - if (!ret) - us->state = rs_listening; - return ret; -} - -/* - * Nonblocking is usually not inherited between sockets, but we need to - * inherit it here to establish the connection only. This is needed to - * prevent rdma_accept from blocking until the remote side finishes - * establishing the connection. If we were to allow rdma_accept to block, - * then a single thread cannot establish a connection with itself, or - * two threads which try to connect to each other can deadlock trying to - * form a connection. - * - * Data transfers on the new socket remain blocking unless the user - * specifies otherwise through rfcntl. 
- */ -int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) -{ - struct usocket *us, *new_rs; - struct rdma_conn_param param; - struct rs_conn_data *creq, cresp; - int ret; - - us = idm_at(&idm, socket); - new_rs = rs_alloc(us); - if (!new_rs) - return ERR(ENOMEM); - - ret = rdma_get_request(us->cm_id, &new_rs->cm_id); - if (ret) - goto err; - - ret = rs_insert(new_rs); - if (ret < 0) - goto err; - - creq = (struct rs_conn_data *) new_rs->cm_id->event->param.conn.private_data; - if (creq->version != 1) { - ret = ERR(ENOTSUP); - goto err; - } - - if (us->fd_flags & O_NONBLOCK) - rs_set_nonblocking(new_rs, O_NONBLOCK); - - ret = rs_create_ep(new_rs); - if (ret) - goto err; - - rs_save_conn_data(new_rs, creq); - param = new_rs->cm_id->event->param.conn; - rs_set_conn_data(new_rs, ¶m, &cresp); - ret = rdma_accept(new_rs->cm_id, ¶m); - if (!ret) - new_rs->state = rs_connect_rdwr; - else if (errno == EAGAIN || errno == EWOULDBLOCK) - new_rs->state = rs_accepting; - else - goto err; - - if (addr && addrlen) - rgetpeername(new_rs->index, addr, addrlen); - return new_rs->index; - -err: - rs_free(new_rs); - return ret; -} - -static int rs_do_connect(struct usocket *us) -{ - struct rdma_conn_param param; - struct rs_conn_data creq, *cresp; - int to, ret; - - switch (us->state) { - case rs_init: - case rs_bound: -resolve_addr: - to = 1000 << us->retries++; - ret = rdma_resolve_addr(us->cm_id, NULL, - &us->cm_id->route.addr.dst_addr, to); - if (!ret) - goto resolve_route; - if (errno == EAGAIN || errno == EWOULDBLOCK) - us->state = rs_resolving_addr; - break; - case rs_resolving_addr: - ret = ucma_complete(us->cm_id); - if (ret) { - if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES) - goto resolve_addr; - break; - } - - us->retries = 0; -resolve_route: - to = 1000 << us->retries++; - ret = rdma_resolve_route(us->cm_id, to); - if (!ret) - goto do_connect; - if (errno == EAGAIN || errno == EWOULDBLOCK) - us->state = rs_resolving_route; - break; - case 
rs_resolving_route: - ret = ucma_complete(us->cm_id); - if (ret) { - if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES) - goto resolve_route; - break; - } -do_connect: - ret = rs_create_ep(us); - if (ret) - break; - - memset(¶m, 0, sizeof param); - rs_set_conn_data(us, ¶m, &creq); - param.flow_control = 1; - param.retry_count = 7; - param.rnr_retry_count = 7; - us->retries = 0; - - ret = rdma_connect(us->cm_id, ¶m); - if (!ret) - goto connected; - if (errno == EAGAIN || errno == EWOULDBLOCK) - us->state = rs_connecting; - break; - case rs_connecting: - ret = ucma_complete(us->cm_id); - if (ret) - break; -connected: - cresp = (struct rs_conn_data *) us->cm_id->event->param.conn.private_data; - if (cresp->version != 1) { - ret = ERR(ENOTSUP); - break; - } - - rs_save_conn_data(us, cresp); - us->state = rs_connect_rdwr; - break; - case rs_accepting: - if (!(us->fd_flags & O_NONBLOCK)) - rs_set_nonblocking(us, 0); - - ret = ucma_complete(us->cm_id); - if (ret) - break; - - us->state = rs_connect_rdwr; - break; - default: - ret = ERR(EINVAL); - break; - } - - if (ret) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - errno = EINPROGRESS; - } else { - us->state = rs_connect_error; - us->err = errno; - } - } - return ret; -} - -int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) -{ - struct usocket *us; - - us = idm_at(&idm, socket); - memcpy(&us->cm_id->route.addr.dst_addr, addr, addrlen); - return rs_do_connect(us); -} - -static int rs_post_write_msg(struct usocket *us, - struct ibv_sge *sgl, int nsge, - uint32_t imm_data, int flags, - uint64_t addr, uint32_t rkey) -{ - struct ibv_send_wr wr, *bad; - - wr.wr_id = (uint64_t) imm_data; - wr.next = NULL; - wr.sg_list = sgl; - wr.num_sge = nsge; - wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; - wr.send_flags = flags; - wr.imm_data = htonl(imm_data); - wr.wr.rdma.remote_addr = addr; - wr.wr.rdma.rkey = rkey; - - return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad)); -} - -static int 
rs_post_write(struct usocket *us, - struct ibv_sge *sgl, int nsge, - uint64_t wr_id, int flags, - uint64_t addr, uint32_t rkey) -{ - struct ibv_send_wr wr, *bad; - - wr.wr_id = wr_id; - wr.next = NULL; - wr.sg_list = sgl; - wr.num_sge = nsge; - wr.opcode = IBV_WR_RDMA_WRITE; - wr.send_flags = flags; - wr.wr.rdma.remote_addr = addr; - wr.wr.rdma.rkey = rkey; - - return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad)); -} - -/* - * Update target SGE before sending data. Otherwise the remote side may - * update the entry before we do. - */ -static int rs_write_data(struct usocket *us, - struct ibv_sge *sgl, int nsge, - uint32_t length, int flags) -{ - uint64_t addr; - uint32_t rkey; - - us->sseq_no++; - us->sqe_avail--; - us->sbuf_bytes_avail -= length; - - addr = us->target_sgl[us->target_sge].addr; - rkey = us->target_sgl[us->target_sge].key; - - us->target_sgl[us->target_sge].addr += length; - us->target_sgl[us->target_sge].length -= length; - - if (!us->target_sgl[us->target_sge].length) { - if (++us->target_sge == RS_SGL_SIZE) - us->target_sge = 0; - } - - return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_DATA, length), - flags, addr, rkey); -} - -static int rs_write_direct(struct usocket *us, struct rs_iomap *iom, uint64_t offset, - struct ibv_sge *sgl, int nsge, uint32_t length, int flags) -{ - uint64_t addr; - - us->sqe_avail--; - us->sbuf_bytes_avail -= length; - - addr = iom->sge.addr + offset - iom->offset; - return rs_post_write(us, sgl, nsge, rs_msg_set(RS_OP_WRITE, length), - flags, addr, iom->sge.key); -} - -static int rs_write_iomap(struct usocket *us, struct rs_iomap_mr *iomr, - struct ibv_sge *sgl, int nsge, int flags) -{ - uint64_t addr; - - us->sseq_no++; - us->sqe_avail--; - us->sbuf_bytes_avail -= sizeof(struct rs_iomap); - - addr = us->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); - return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), - flags, addr, us->remote_iomap.key); -} - -static 
uint32_t rs_sbuf_left(struct usocket *us) -{ - return (uint32_t) (((uint64_t) (uintptr_t) &us->sbuf[us->sbuf_size]) - - us->ssgl[0].addr); -} - -static void rs_send_credits(struct usocket *us) -{ - struct ibv_sge ibsge; - struct rs_sge sge; - - us->ctrl_avail--; - us->rseq_comp = us->rseq_no + (us->rq_size >> 1); - if (us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) { - if (!(us->opts & RS_OPT_SWAP_SGL)) { - sge.addr = (uintptr_t) &us->rbuf[us->rbuf_free_offset]; - sge.key = us->rmr->rkey; - sge.length = us->rbuf_size >> 1; - } else { - sge.addr = bswap_64((uintptr_t) &us->rbuf[us->rbuf_free_offset]); - sge.key = bswap_32(us->rmr->rkey); - sge.length = bswap_32(us->rbuf_size >> 1); - } - - ibsge.addr = (uintptr_t) &sge; - ibsge.lkey = 0; - ibsge.length = sizeof(sge); - - rs_post_write_msg(us, &ibsge, 1, - rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size), - IBV_SEND_INLINE, - us->remote_sgl.addr + - us->remote_sge * sizeof(struct rs_sge), - us->remote_sgl.key); - - us->rbuf_bytes_avail -= us->rbuf_size >> 1; - us->rbuf_free_offset += us->rbuf_size >> 1; - if (us->rbuf_free_offset >= us->rbuf_size) - us->rbuf_free_offset = 0; - if (++us->remote_sge == us->remote_sgl.length) - us->remote_sge = 0; - } else { - rs_post_write_msg(us, NULL, 0, - rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size), - 0, 0, 0); - } -} - -static int rs_give_credits(struct usocket *us) -{ - return ((us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) || - ((short) ((short) us->rseq_no - (short) us->rseq_comp) >= 0)) && - us->ctrl_avail && (us->state & rs_connected); -} - -static void rs_update_credits(struct usocket *us) -{ - if (rs_give_credits(us)) - rs_send_credits(us); -} - -static int rs_poll_cq(struct usocket *us) -{ - struct ibv_wc wc; - uint32_t imm_data; - int ret, rcnt = 0; - - while ((ret = ibv_poll_cq(us->cm_id->recv_cq, 1, &wc)) > 0) { - if (wc.wr_id == RS_RECV_WR_ID) { - if (wc.status != IBV_WC_SUCCESS) - continue; - rcnt++; - - imm_data = ntohl(wc.imm_data); - switch (rs_msg_op(imm_data)) 
{ - case RS_OP_SGL: - us->sseq_comp = (uint16_t) rs_msg_data(imm_data); - break; - case RS_OP_IOMAP_SGL: - /* The iomap was updated, that's nice to know. */ - break; - case RS_OP_CTRL: - if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { - us->state = rs_disconnected; - return 0; - } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { - us->state &= ~rs_connect_rd; - } - break; - case RS_OP_WRITE: - /* We really shouldn't be here. */ - break; - default: - us->rmsg[us->rmsg_tail].op = rs_msg_op(imm_data); - us->rmsg[us->rmsg_tail].data = rs_msg_data(imm_data); - if (++us->rmsg_tail == us->rq_size + 1) - us->rmsg_tail = 0; - break; - } - } else { - switch (rs_msg_op((uint32_t) wc.wr_id)) { - case RS_OP_SGL: - us->ctrl_avail++; - break; - case RS_OP_CTRL: - us->ctrl_avail++; - if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT) - us->state = rs_disconnected; - break; - case RS_OP_IOMAP_SGL: - us->sqe_avail++; - us->sbuf_bytes_avail += sizeof(struct rs_iomap); - break; - default: - us->sqe_avail++; - us->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id); - break; - } - if (wc.status != IBV_WC_SUCCESS && (us->state & rs_connected)) { - us->state = rs_error; - us->err = EIO; - } - } - } - - if (us->state & rs_connected) { - while (!ret && rcnt--) - ret = rs_post_recv(us); - - if (ret) { - us->state = rs_error; - us->err = errno; - } - } - return ret; -} - -static int rs_get_cq_event(struct usocket *us) -{ - struct ibv_cq *cq; - void *context; - int ret; - - if (!us->cq_armed) - return 0; - - ret = ibv_get_cq_event(us->cm_id->recv_cq_channel, &cq, &context); - if (!ret) { - ibv_ack_cq_events(us->cm_id->recv_cq, 1); - us->cq_armed = 0; - } else if (errno != EAGAIN) { - us->state = rs_error; - } - - return ret; -} - -/* - * Although we serialize rsend and rrecv calls with respect to themselves, - * both calls may run simultaneously and need to poll the CQ for completions. 
- * We need to serialize access to the CQ, but rsend and rrecv need to - * allow each other to make forward progress. - * - * For example, rsend may need to wait for credits from the remote side, - * which could be stalled until the remote process calls rrecv. This should - * not block rrecv from receiving data from the remote side however. - * - * We handle this by using two locks. The cq_lock protects against polling - * the CQ and processing completions. The cq_wait_lock serializes access to - * waiting on the CQ. - */ -static int rs_process_cq(struct usocket *us, int nonblock, int (*test)(struct usocket *us)) -{ - int ret; - - fastlock_acquire(&us->cq_lock); - do { - rs_update_credits(us); - ret = rs_poll_cq(us); - if (test(us)) { - ret = 0; - break; - } else if (ret) { - break; - } else if (nonblock) { - ret = ERR(EWOULDBLOCK); - } else if (!us->cq_armed) { - ibv_req_notify_cq(us->cm_id->recv_cq, 0); - us->cq_armed = 1; - } else { - rs_update_credits(us); - fastlock_acquire(&us->cq_wait_lock); - fastlock_release(&us->cq_lock); - - ret = rs_get_cq_event(us); - fastlock_release(&us->cq_wait_lock); - fastlock_acquire(&us->cq_lock); - } - } while (!ret); - - rs_update_credits(us); - fastlock_release(&us->cq_lock); - return ret; -} - -static int rs_get_comp(struct usocket *us, int nonblock, int (*test)(struct usocket *us)) -{ - struct timeval s, e; - uint32_t poll_time = 0; - int ret; - - do { - ret = rs_process_cq(us, 1, test); - if (!ret || nonblock || errno != EWOULDBLOCK) - return ret; - - if (!poll_time) - gettimeofday(&s, NULL); - - gettimeofday(&e, NULL); - poll_time = (e.tv_sec - s.tv_sec) * 1000000 + - (e.tv_usec - s.tv_usec) + 1; - } while (poll_time <= polling_time); - - ret = rs_process_cq(us, 0, test); - return ret; -} - -static int rs_nonblocking(struct usocket *us, int flags) -{ - return (us->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); -} - -static int rs_is_cq_armed(struct usocket *us) -{ - return us->cq_armed; -} - -static int 
rs_poll_all(struct usocket *us) -{ - return 1; -} - -/* - * We use hardware flow control to prevent over running the remote - * receive queue. However, data transfers still require space in - * the remote rmsg queue, or we risk losing notification that data - * has been transfered. - * - * Be careful with race conditions in the check below. The target SGL - * may be updated by a remote RDMA write. - */ -static int rs_can_send(struct usocket *us) -{ - return us->sqe_avail && (us->sbuf_bytes_avail >= RS_SNDLOWAT) && - (us->sseq_no != us->sseq_comp) && - (us->target_sgl[us->target_sge].length != 0); -} - -static int rs_conn_can_send(struct usocket *us) -{ - return rs_can_send(us) || !(us->state & rs_connect_wr); -} - -static int rs_conn_can_send_ctrl(struct usocket *us) -{ - return us->ctrl_avail || !(us->state & rs_connected); -} - -static int rs_have_rdata(struct usocket *us) -{ - return (us->rmsg_head != us->rmsg_tail); -} - -static int rs_conn_have_rdata(struct usocket *us) -{ - return rs_have_rdata(us) || !(us->state & rs_connect_rd); -} - -static int rs_conn_all_sends_done(struct usocket *us) -{ - return ((us->sqe_avail + us->ctrl_avail) == us->sq_size) || - !(us->state & rs_connected); -} - -static ssize_t rs_peek(struct usocket *us, void *buf, size_t len) -{ - size_t left = len; - uint32_t end_size, rsize; - int rmsg_head, rbuf_offset; - - rmsg_head = us->rmsg_head; - rbuf_offset = us->rbuf_offset; - - for (; left && (rmsg_head != us->rmsg_tail); left -= rsize) { - if (left < us->rmsg[rmsg_head].data) { - rsize = left; - } else { - rsize = us->rmsg[rmsg_head].data; - if (++rmsg_head == us->rq_size + 1) - rmsg_head = 0; - } - - end_size = us->rbuf_size - rbuf_offset; - if (rsize > end_size) { - memcpy(buf, &us->rbuf[rbuf_offset], end_size); - rbuf_offset = 0; - buf += end_size; - rsize -= end_size; - left -= end_size; - } - memcpy(buf, &us->rbuf[rbuf_offset], rsize); - rbuf_offset += rsize; - buf += rsize; - } - - return len - left; -} - -/* - * Continue to 
receive any queued data even if the remote side has disconnected. - */ -ssize_t rrecv(int socket, void *buf, size_t len, int flags) -{ - struct usocket *us; - size_t left = len; - uint32_t end_size, rsize; - int ret; - - us = idm_at(&idm, socket); - if (us->state & rs_opening) { - ret = rs_do_connect(us); - if (ret) { - if (errno == EINPROGRESS) - errno = EAGAIN; - return ret; - } - } - fastlock_acquire(&us->rlock); - do { - if (!rs_have_rdata(us)) { - ret = rs_get_comp(us, rs_nonblocking(us, flags), - rs_conn_have_rdata); - if (ret) - break; - } - - ret = 0; - if (flags & MSG_PEEK) { - left = len - rs_peek(us, buf, left); - break; - } - - for (; left && rs_have_rdata(us); left -= rsize) { - if (left < us->rmsg[us->rmsg_head].data) { - rsize = left; - us->rmsg[us->rmsg_head].data -= left; - } else { - us->rseq_no++; - rsize = us->rmsg[us->rmsg_head].data; - if (++us->rmsg_head == us->rq_size + 1) - us->rmsg_head = 0; - } - - end_size = us->rbuf_size - us->rbuf_offset; - if (rsize > end_size) { - memcpy(buf, &us->rbuf[us->rbuf_offset], end_size); - us->rbuf_offset = 0; - buf += end_size; - rsize -= end_size; - left -= end_size; - us->rbuf_bytes_avail += end_size; - } - memcpy(buf, &us->rbuf[us->rbuf_offset], rsize); - us->rbuf_offset += rsize; - buf += rsize; - us->rbuf_bytes_avail += rsize; - } - - } while (left && (flags & MSG_WAITALL) && (us->state & rs_connect_rd)); - - fastlock_release(&us->rlock); - return ret ? ret : len - left; -} - -ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, - struct sockaddr *src_addr, socklen_t *addrlen) -{ - int ret; - - ret = rrecv(socket, buf, len, flags); - if (ret > 0 && src_addr) - rgetpeername(socket, src_addr, addrlen); - - return ret; -} - -/* - * Simple, straightforward implementation for now that only tries to fill - * in the first vector. 
- */ -static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags) -{ - return rrecv(socket, iov[0].iov_base, iov[0].iov_len, flags); -} - -ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags) -{ - if (msg->msg_control && msg->msg_controllen) - return ERR(ENOTSUP); - - return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); -} - -ssize_t rread(int socket, void *buf, size_t count) -{ - return rrecv(socket, buf, count, 0); -} - -ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt) -{ - return rrecvv(socket, iov, iovcnt, 0); -} - -static int rs_send_iomaps(struct usocket *us, int flags) -{ - struct rs_iomap_mr *iomr; - struct ibv_sge sge; - struct rs_iomap iom; - int ret; - - fastlock_acquire(&us->iomap_lock); - while (!dlist_empty(&us->iomap_queue)) { - if (!rs_can_send(us)) { - ret = rs_get_comp(us, rs_nonblocking(us, flags), - rs_conn_can_send); - if (ret) - break; - if (!(us->state & rs_connect_wr)) { - ret = ERR(ECONNRESET); - break; - } - } - - iomr = container_of(us->iomap_queue.next, struct rs_iomap_mr, entry); - if (!(us->opts & RS_OPT_SWAP_SGL)) { - iom.offset = iomr->offset; - iom.sge.addr = (uintptr_t) iomr->mr->addr; - iom.sge.length = iomr->mr->length; - iom.sge.key = iomr->mr->rkey; - } else { - iom.offset = bswap_64(iomr->offset); - iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr); - iom.sge.length = bswap_32(iomr->mr->length); - iom.sge.key = bswap_32(iomr->mr->rkey); - } - - if (us->sq_inline >= sizeof iom) { - sge.addr = (uintptr_t) &iom; - sge.length = sizeof iom; - sge.lkey = 0; - ret = rs_write_iomap(us, iomr, &sge, 1, IBV_SEND_INLINE); - } else if (rs_sbuf_left(us) >= sizeof iom) { - memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom, sizeof iom); - us->ssgl[0].length = sizeof iom; - ret = rs_write_iomap(us, iomr, us->ssgl, 1, 0); - if (rs_sbuf_left(us) > sizeof iom) - us->ssgl[0].addr += sizeof iom; - else - us->ssgl[0].addr = (uintptr_t) us->sbuf; - } else { - us->ssgl[0].length = 
rs_sbuf_left(us); - memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom, - us->ssgl[0].length); - us->ssgl[1].length = sizeof iom - us->ssgl[0].length; - memcpy(us->sbuf, ((void *) &iom) + us->ssgl[0].length, - us->ssgl[1].length); - ret = rs_write_iomap(us, iomr, us->ssgl, 2, 0); - us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length; - } - dlist_remove(&iomr->entry); - dlist_insert_tail(&iomr->entry, &us->iomap_list); - if (ret) - break; - } - - us->iomap_pending = !dlist_empty(&us->iomap_queue); - fastlock_release(&us->iomap_lock); - return ret; -} - -/* - * We overlap sending the data, by posting a small work request immediately, - * then increasing the size of the send on each iteration. - */ -ssize_t rsend(int socket, const void *buf, size_t len, int flags) -{ - struct usocket *us; - struct ibv_sge sge; - size_t left = len; - uint32_t xfer_size, olen = RS_OLAP_START_SIZE; - int ret = 0; - - us = idm_at(&idm, socket); - if (us->state & rs_opening) { - ret = rs_do_connect(us); - if (ret) { - if (errno == EINPROGRESS) - errno = EAGAIN; - return ret; - } - } - - fastlock_acquire(&us->slock); - if (us->iomap_pending) { - ret = rs_send_iomaps(us, flags); - if (ret) - goto out; - } - for (; left; left -= xfer_size, buf += xfer_size) { - if (!rs_can_send(us)) { - ret = rs_get_comp(us, rs_nonblocking(us, flags), - rs_conn_can_send); - if (ret) - break; - if (!(us->state & rs_connect_wr)) { - ret = ERR(ECONNRESET); - break; - } - } - - if (olen < left) { - xfer_size = olen; - if (olen < RS_MAX_TRANSFER) - olen <<= 1; - } else { - xfer_size = left; - } - - if (xfer_size > us->sbuf_bytes_avail) - xfer_size = us->sbuf_bytes_avail; - if (xfer_size > us->target_sgl[us->target_sge].length) - xfer_size = us->target_sgl[us->target_sge].length; - - if (xfer_size <= us->sq_inline) { - sge.addr = (uintptr_t) buf; - sge.length = xfer_size; - sge.lkey = 0; - ret = rs_write_data(us, &sge, 1, xfer_size, IBV_SEND_INLINE); - } else if (xfer_size <= rs_sbuf_left(us)) { - 
memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf, xfer_size); - us->ssgl[0].length = xfer_size; - ret = rs_write_data(us, us->ssgl, 1, xfer_size, 0); - if (xfer_size < rs_sbuf_left(us)) - us->ssgl[0].addr += xfer_size; - else - us->ssgl[0].addr = (uintptr_t) us->sbuf; - } else { - us->ssgl[0].length = rs_sbuf_left(us); - memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf, - us->ssgl[0].length); - us->ssgl[1].length = xfer_size - us->ssgl[0].length; - memcpy(us->sbuf, buf + us->ssgl[0].length, us->ssgl[1].length); - ret = rs_write_data(us, us->ssgl, 2, xfer_size, 0); - us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length; - } - if (ret) - break; - } -out: - fastlock_release(&us->slock); - - return (ret && left == len) ? ret : len - left; -} - -ssize_t rsendto(int socket, const void *buf, size_t len, int flags, - const struct sockaddr *dest_addr, socklen_t addrlen) -{ - if (dest_addr || addrlen) - return ERR(EISCONN); - - return rsend(socket, buf, len, flags); -} - -static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) -{ - size_t size; - - while (len) { - size = (*iov)->iov_len - *offset; - if (size > len) { - memcpy (dst, (*iov)->iov_base + *offset, len); - *offset += len; - break; - } - - memcpy(dst, (*iov)->iov_base + *offset, size); - len -= size; - dst += size; - (*iov)++; - *offset = 0; - } -} - -static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags) -{ - struct usocket *us; - const struct iovec *cur_iov; - size_t left, len, offset = 0; - uint32_t xfer_size, olen = RS_OLAP_START_SIZE; - int i, ret = 0; - - us = idm_at(&idm, socket); - if (us->state & rs_opening) { - ret = rs_do_connect(us); - if (ret) { - if (errno == EINPROGRESS) - errno = EAGAIN; - return ret; - } - } - - cur_iov = iov; - len = iov[0].iov_len; - for (i = 1; i < iovcnt; i++) - len += iov[i].iov_len; - left = len; - - fastlock_acquire(&us->slock); - if (us->iomap_pending) { - ret = rs_send_iomaps(us, flags); - if (ret) - 
goto out; - } - for (; left; left -= xfer_size) { - if (!rs_can_send(us)) { - ret = rs_get_comp(us, rs_nonblocking(us, flags), - rs_conn_can_send); - if (ret) - break; - if (!(us->state & rs_connect_wr)) { - ret = ERR(ECONNRESET); - break; - } - } - - if (olen < left) { - xfer_size = olen; - if (olen < RS_MAX_TRANSFER) - olen <<= 1; - } else { - xfer_size = left; - } - - if (xfer_size > us->sbuf_bytes_avail) - xfer_size = us->sbuf_bytes_avail; - if (xfer_size > us->target_sgl[us->target_sge].length) - xfer_size = us->target_sgl[us->target_sge].length; - - if (xfer_size <= rs_sbuf_left(us)) { - rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr, - &cur_iov, &offset, xfer_size); - us->ssgl[0].length = xfer_size; - ret = rs_write_data(us, us->ssgl, 1, xfer_size, - xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0); - if (xfer_size < rs_sbuf_left(us)) - us->ssgl[0].addr += xfer_size; - else - us->ssgl[0].addr = (uintptr_t) us->sbuf; - } else { - us->ssgl[0].length = rs_sbuf_left(us); - rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr, &cur_iov, - &offset, us->ssgl[0].length); - us->ssgl[1].length = xfer_size - us->ssgl[0].length; - rs_copy_iov(us->sbuf, &cur_iov, &offset, us->ssgl[1].length); - ret = rs_write_data(us, us->ssgl, 2, xfer_size, - xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0); - us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length; - } - if (ret) - break; - } -out: - fastlock_release(&us->slock); - - return (ret && left == len) ? 
ret : len - left; -} - -ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) -{ - if (msg->msg_control && msg->msg_controllen) - return ERR(ENOTSUP); - - return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); -} - -ssize_t rwrite(int socket, const void *buf, size_t count) -{ - return rsend(socket, buf, count, 0); -} - -ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt) -{ - return rsendv(socket, iov, iovcnt, 0); -} - -static struct pollfd *rs_fds_alloc(nfds_t nfds) -{ - static __thread struct pollfd *rfds; - static __thread nfds_t rnfds; - - if (nfds > rnfds) { - if (rfds) - free(rfds); - - rfds = malloc(sizeof *rfds * nfds); - rnfds = rfds ? nfds : 0; - } - - return rfds; -} - -static int rs_poll_rs(struct usocket *us, int events, - int nonblock, int (*test)(struct usocket *us)) -{ - struct pollfd fds; - short revents; - int ret; - -check_cq: - if ((us->state & rs_connected) || (us->state == rs_disconnected) || - (us->state & rs_error)) { - rs_process_cq(us, nonblock, test); - - revents = 0; - if ((events & POLLIN) && rs_conn_have_rdata(us)) - revents |= POLLIN; - if ((events & POLLOUT) && rs_can_send(us)) - revents |= POLLOUT; - if (!(us->state & rs_connected)) { - if (us->state == rs_disconnected) - revents |= POLLHUP; - else - revents |= POLLERR; - } - - return revents; - } - - if (us->state == rs_listening) { - fds.fd = us->cm_id->channel->fd; - fds.events = events; - fds.revents = 0; - poll(&fds, 1, 0); - return fds.revents; - } - - if (us->state & rs_opening) { - ret = rs_do_connect(us); - if (ret) { - if (errno == EINPROGRESS) { - errno = 0; - return 0; - } else { - return POLLOUT; - } - } - goto check_cq; - } - - if (us->state == rs_connect_error) - return (us->err && events & POLLOUT) ? 
POLLOUT : 0; - - return 0; -} - -static int rs_poll_check(struct pollfd *fds, nfds_t nfds) -{ - struct usocket *us; - int i, cnt = 0; - - for (i = 0; i < nfds; i++) { - us = idm_lookup(&idm, fds[i].fd); - if (us) - fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all); - else - poll(&fds[i], 1, 0); - - if (fds[i].revents) - cnt++; - } - return cnt; -} - -static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) -{ - struct usocket *us; - int i; - - for (i = 0; i < nfds; i++) { - us = idm_lookup(&idm, fds[i].fd); - if (us) { - fds[i].revents = rs_poll_rs(us, fds[i].events, 0, rs_is_cq_armed); - if (fds[i].revents) - return 1; - - if (us->state >= rs_connected) - rfds[i].fd = us->cm_id->recv_cq_channel->fd; - else - rfds[i].fd = us->cm_id->channel->fd; - - rfds[i].events = POLLIN; - } else { - rfds[i].fd = fds[i].fd; - rfds[i].events = fds[i].events; - } - rfds[i].revents = 0; - - } - return 0; -} - -static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) -{ - struct usocket *us; - int i, cnt = 0; - - for (i = 0; i < nfds; i++) { - if (!rfds[i].revents) - continue; - - us = idm_lookup(&idm, fds[i].fd); - if (us) { - rs_get_cq_event(us); - fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all); - } else { - fds[i].revents = rfds[i].revents; - } - if (fds[i].revents) - cnt++; - } - return cnt; -} - -/* - * We need to poll *all* fd's that the user specifies at least once. - * Note that we may receive events on an usocket that may not be reported - * to the user (e.g. connection events or credit updates). Process those - * events, then return to polling until we find ones of interest. 
- */ -int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) -{ - struct timeval s, e; - struct pollfd *rfds; - uint32_t poll_time = 0; - int ret; - - do { - ret = rs_poll_check(fds, nfds); - if (ret || !timeout) - return ret; - - if (!poll_time) - gettimeofday(&s, NULL); - - gettimeofday(&e, NULL); - poll_time = (e.tv_sec - s.tv_sec) * 1000000 + - (e.tv_usec - s.tv_usec) + 1; - } while (poll_time <= polling_time); - - rfds = rs_fds_alloc(nfds); - if (!rfds) - return ERR(ENOMEM); - - do { - ret = rs_poll_arm(rfds, fds, nfds); - if (ret) - break; - - ret = poll(rfds, nfds, timeout); - if (ret <= 0) - break; - - ret = rs_poll_events(rfds, fds, nfds); - } while (!ret); - - return ret; -} - -static struct pollfd * -rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds) -{ - struct pollfd *fds; - int fd, i = 0; - - fds = calloc(*nfds, sizeof *fds); - if (!fds) - return NULL; - - for (fd = 0; fd < *nfds; fd++) { - if (readfds && FD_ISSET(fd, readfds)) { - fds[i].fd = fd; - fds[i].events = POLLIN; - } - - if (writefds && FD_ISSET(fd, writefds)) { - fds[i].fd = fd; - fds[i].events |= POLLOUT; - } - - if (exceptfds && FD_ISSET(fd, exceptfds)) - fds[i].fd = fd; - - if (fds[i].fd) - i++; - } - - *nfds = i; - return fds; -} - -static int -rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds, - fd_set *writefds, fd_set *exceptfds) -{ - int i, cnt = 0; - - for (i = 0; i < nfds; i++) { - if (readfds && (fds[i].revents & (POLLIN | POLLHUP))) { - FD_SET(fds[i].fd, readfds); - cnt++; - } - - if (writefds && (fds[i].revents & POLLOUT)) { - FD_SET(fds[i].fd, writefds); - cnt++; - } - - if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) { - FD_SET(fds[i].fd, exceptfds); - cnt++; - } - } - return cnt; -} - -static int rs_convert_timeout(struct timeval *timeout) -{ - return !timeout ? 
-1 : - timeout->tv_sec * 1000 + timeout->tv_usec / 1000; -} - -int rselect(int nfds, fd_set *readfds, fd_set *writefds, - fd_set *exceptfds, struct timeval *timeout) -{ - struct pollfd *fds; - int ret; - - fds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds); - if (!fds) - return ERR(ENOMEM); - - ret = rpoll(fds, nfds, rs_convert_timeout(timeout)); - - if (readfds) - FD_ZERO(readfds); - if (writefds) - FD_ZERO(writefds); - if (exceptfds) - FD_ZERO(exceptfds); - - if (ret > 0) - ret = rs_poll_to_select(nfds, fds, readfds, writefds, exceptfds); - - free(fds); - return ret; -} - -/* - * For graceful disconnect, notify the remote side that we're - * disconnecting and wait until all outstanding sends complete. - */ -int rshutdown(int socket, int how) -{ - struct usocket *us; - int ctrl, ret = 0; - - us = idm_at(&idm, socket); - if (how == SHUT_RD) { - us->state &= ~rs_connect_rd; - return 0; - } - - if (us->fd_flags & O_NONBLOCK) - rs_set_nonblocking(us, 0); - - if (us->state & rs_connected) { - if (how == SHUT_RDWR) { - ctrl = RS_CTRL_DISCONNECT; - us->state &= ~(rs_connect_rd | rs_connect_wr); - } else { - us->state &= ~rs_connect_wr; - ctrl = (us->state & rs_connect_rd) ? 
- RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; - } - if (!us->ctrl_avail) { - ret = rs_process_cq(us, 0, rs_conn_can_send_ctrl); - if (ret) - return ret; - } - - if ((us->state & rs_connected) && us->ctrl_avail) { - us->ctrl_avail--; - ret = rs_post_write_msg(us, NULL, 0, - rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); - } - } - - if (us->state & rs_connected) - rs_process_cq(us, 0, rs_conn_all_sends_done); - - if ((us->fd_flags & O_NONBLOCK) && (us->state & rs_connected)) - rs_set_nonblocking(us, 1); - - return 0; -} - -int rclose(int socket) -{ - struct usocket *us; - - us = idm_at(&idm, socket); - if (us->state & rs_connected) - rshutdown(socket, SHUT_RDWR); - - rs_free(us); - return 0; -} - -static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len) -{ - socklen_t size; - - if (src->sa_family == AF_INET) { - size = min(*len, sizeof(struct sockaddr_in)); - *len = sizeof(struct sockaddr_in); - } else { - size = min(*len, sizeof(struct sockaddr_in6)); - *len = sizeof(struct sockaddr_in6); - } - memcpy(dst, src, size); -} - -int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) -{ - struct usocket *us; - - us = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_peer_addr(us->cm_id), addrlen); - return 0; -} - -int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) -{ - struct usocket *us; - - us = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_local_addr(us->cm_id), addrlen); - return 0; -} - -int rsetsockopt(int socket, int level, int optname, - const void *optval, socklen_t optlen) -{ - struct usocket *us; - int ret, opt_on = 0; - uint64_t *opts = NULL; - - ret = ERR(ENOTSUP); - us = idm_at(&idm, socket); - switch (level) { - case SOL_SOCKET: - opts = &us->so_opts; - switch (optname) { - case SO_REUSEADDR: - ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID, - RDMA_OPTION_ID_REUSEADDR, - (void *) optval, optlen); - if (ret && ((errno == ENOSYS) || ((us->state != rs_init) && - us->cm_id->context && - 
(us->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB)))) - ret = 0; - opt_on = *(int *) optval; - break; - case SO_RCVBUF: - if (!us->rbuf) - us->rbuf_size = (*(uint32_t *) optval) << 1; - ret = 0; - break; - case SO_SNDBUF: - if (!us->sbuf) - us->sbuf_size = (*(uint32_t *) optval) << 1; - if (us->sbuf_size < RS_SNDLOWAT) - us->sbuf_size = RS_SNDLOWAT << 1; - ret = 0; - break; - case SO_LINGER: - /* Invert value so default so_opt = 0 is on */ - opt_on = !((struct linger *) optval)->l_onoff; - ret = 0; - break; - case SO_KEEPALIVE: - opt_on = *(int *) optval; - ret = 0; - break; - case SO_OOBINLINE: - opt_on = *(int *) optval; - ret = 0; - break; - default: - break; - } - break; - case IPPROTO_TCP: - opts = &us->tcp_opts; - switch (optname) { - case TCP_NODELAY: - opt_on = *(int *) optval; - ret = 0; - break; - case TCP_MAXSEG: - ret = 0; - break; - default: - break; - } - break; - case IPPROTO_IPV6: - opts = &us->ipv6_opts; - switch (optname) { - case IPV6_V6ONLY: - ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID, - RDMA_OPTION_ID_AFONLY, - (void *) optval, optlen); - opt_on = *(int *) optval; - break; - default: - break; - } - break; - case SOL_RDMA: - if (us->state >= rs_opening) { - ret = ERR(EINVAL); - break; - } - - switch (optname) { - case RDMA_SQSIZE: - us->sq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE); - break; - case RDMA_RQSIZE: - us->rq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE); - break; - case RDMA_INLINE: - us->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE); - if (us->sq_inline < RS_MIN_INLINE) - us->sq_inline = RS_MIN_INLINE; - break; - case RDMA_IOMAPSIZE: - us->target_iomap_size = (uint16_t) rs_scale_to_value( - (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8); - break; - default: - break; - } - break; - default: - break; - } - - if (!ret && opts) { - if (opt_on) - *opts |= (1 << optname); - else - *opts &= ~(1 << optname); - } - - return ret; -} - -int rgetsockopt(int socket, int level, int optname, - void 
*optval, socklen_t *optlen) -{ - struct usocket *us; - int ret = 0; - - us = idm_at(&idm, socket); - switch (level) { - case SOL_SOCKET: - switch (optname) { - case SO_REUSEADDR: - case SO_KEEPALIVE: - case SO_OOBINLINE: - *((int *) optval) = !!(us->so_opts & (1 << optname)); - *optlen = sizeof(int); - break; - case SO_RCVBUF: - *((int *) optval) = us->rbuf_size; - *optlen = sizeof(int); - break; - case SO_SNDBUF: - *((int *) optval) = us->sbuf_size; - *optlen = sizeof(int); - break; - case SO_LINGER: - /* Value is inverted so default so_opt = 0 is on */ - ((struct linger *) optval)->l_onoff = - !(us->so_opts & (1 << optname)); - ((struct linger *) optval)->l_linger = 0; - *optlen = sizeof(struct linger); - break; - case SO_ERROR: - *((int *) optval) = us->err; - *optlen = sizeof(int); - us->err = 0; - break; - default: - ret = ENOTSUP; - break; - } - break; - case IPPROTO_TCP: - switch (optname) { - case TCP_NODELAY: - *((int *) optval) = !!(us->tcp_opts & (1 << optname)); - *optlen = sizeof(int); - break; - case TCP_MAXSEG: - *((int *) optval) = (us->cm_id && us->cm_id->route.num_paths) ? 
- 1 << (7 + us->cm_id->route.path_rec->mtu) : - 2048; - *optlen = sizeof(int); - break; - default: - ret = ENOTSUP; - break; - } - break; - case IPPROTO_IPV6: - switch (optname) { - case IPV6_V6ONLY: - *((int *) optval) = !!(us->ipv6_opts & (1 << optname)); - *optlen = sizeof(int); - break; - default: - ret = ENOTSUP; - break; - } - break; - case SOL_RDMA: - switch (optname) { - case RDMA_SQSIZE: - *((int *) optval) = us->sq_size; - *optlen = sizeof(int); - break; - case RDMA_RQSIZE: - *((int *) optval) = us->rq_size; - *optlen = sizeof(int); - break; - case RDMA_INLINE: - *((int *) optval) = us->sq_inline; - *optlen = sizeof(int); - break; - case RDMA_IOMAPSIZE: - *((int *) optval) = us->target_iomap_size; - *optlen = sizeof(int); - break; - default: - ret = ENOTSUP; - break; - } - break; - default: - ret = ENOTSUP; - break; - } - - return rdma_seterrno(ret); -} - -int ufcntl(int socket, int cmd, ... /* arg */ ) -{ - struct usocket *us; - va_list args; - long param; - int ret = 0; - - us = idm_at(&idm, socket); - va_start(args, cmd); - switch (cmd) { - case F_GETFL: - ret = (int) us->fd_flags; - break; - case F_SETFL: - param = va_arg(args, long); - if (param & O_NONBLOCK) - ret = rs_set_nonblocking(us, O_NONBLOCK); - - if (!ret) - us->fd_flags |= param; - break; - default: - ret = ERR(ENOTSUP); - break; - } - va_end(args); - return ret; -} -- 2.41.0