From: Sean Hefty
Date: Fri, 16 Nov 2012 04:55:17 +0000 (-0800)
Subject: refresh (create temporary patch)
X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=3e4e1e0bfe3fff921e721b5bc57aa726e4726197;p=~shefty%2Flibrdmacm.git

refresh (create temporary patch)
---

diff --git a/meta b/meta
index 9baf99f5..f1f4c22a 100644
--- a/meta
+++ b/meta
@@ -1,8 +1,9 @@
 Version: 1
-Previous: 32f1d3133ceb558ad29cd409072d3e9006e2f316
-Head: 38e18d02bb5e7702937670fcc11b4d61e06f86d1
+Previous: 5fb5289e3257a985e42816bbc6c7884787065a2b
+Head: 8d8e192aa985114a4714ea91a9ec1136033bca83
 Applied:
   usocket: 38e18d02bb5e7702937670fcc11b4d61e06f86d1
+  refresh-temp: 8d8e192aa985114a4714ea91a9ec1136033bca83
 Unapplied:
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
   resv-rs-len: 7b6ff5c4894f54b221d877adcd709795dffb2fe9
diff --git a/patches/refresh-temp b/patches/refresh-temp
new file mode 100644
index 00000000..2edf7037
--- /dev/null
+++ b/patches/refresh-temp
@@ -0,0 +1,3018 @@
+Bottom: 10bba9bde633e5c6c120294156e4bfc3d86d57a0
+Top: 97a52629c221cba1033082bbd308ecfc4d4b6082
+Author: Sean Hefty
+Date: 2012-11-15 20:55:16 -0800
+
+Refresh of usocket
+
+---
+
+diff --git a/docs/rsocket b/docs/rsocket
+index 1484f65..03d49df 100644
+--- a/docs/rsocket
++++ b/docs/rsocket
+@@ -1,7 +1,7 @@
+-rsocket Protocol and Design Guide 9/10/2012
++rsocket Protocol and Design Guide 11/11/2012
+ 
+-Overview
+---------
++Data Streaming (TCP) Overview
++-----------------------------
+ Rsockets is a protocol over RDMA that supports a socket-level API
+ for applications. For details on the current state of the
+ implementation, readers should refer to the rsocket man page. This
+@@ -189,3 +189,47 @@ registered remote data buffer.
+ From host A's perspective, the transfer appears as a normal send/write
+ operation, with the data stream redirected directly into the receiving
+ application's buffer.
++
++
++
++Datagram Overview
++-----------------
++The rsocket API supports datagram sockets. Datagram support is handled through an
++entirely different protocol and internal implementation. Unlike connected rsockets,
++datagram rsockets are not necessarily bound to a network (IP) address. A datagram
++socket may use any number of network (IP) addresses, including those which map to
++different RDMA devices. As a result, a single datagram rsocket must support
++using multiple RDMA devices and ports, and a datagram rsocket references a single
++UDP socket, plus zero or more UD QPs.
++
++Rsockets uses headers inserted before user data sent over UDP sockets to resolve
++remote UD QP numbers. When a user first attempts to send a datagram to a remote
++address (IP and UDP port), rsockets will take the following steps. First, it
++will allocate and store the destination address into a lookup table for future
++use. Then it will resolve which local network address should be used when sending
++to the specified destination. It will allocate a UD QP on the RDMA device
++associated with the local address. Finally, rsockets will send the user's
++datagram to the remote UDP socket, but inserts a header before the datagram.
++The header specifies the UD QP number associated with the local network address
++(IP and UDP port) of the send.
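To make the wire format concrete: a receiver can tell rsocket datagrams apart
from plain UDP traffic and recover the sender's UD path from the first 16
bytes of each message. The sketch below is illustrative only and is not part
of the patch; it assumes the natural, unpadded layout of struct ds_udp_header
added to src/rsocket.c later in this diff, and substitutes glibc's htobe64()
for the htonll() used by the library code.

	#include <stdint.h>
	#include <string.h>
	#include <endian.h>	/* htobe64() */
	#include <arpa/inet.h>	/* ntohs(), ntohl() */

	#define DS_UDP_TAG 0x5555555555555555ULL

	/* Validate and unpack the rsockets datagram header.  Returns the
	 * payload offset (16) on success, or -1 if the message is not an
	 * rsockets datagram (too short, wrong tag, or unknown version). */
	static int ds_parse_udp_header(const uint8_t *buf, size_t len,
				       uint8_t *sl, uint16_t *lid, uint32_t *qpn)
	{
		uint64_t tag;
		uint32_t nqpn;
		uint16_t nlid;

		if (len < 16)
			return -1;
		memcpy(&tag, buf, 8);
		if (tag != htobe64(DS_UDP_TAG) || buf[8] != 1 /* version */)
			return -1;

		*sl = buf[9];
		memcpy(&nlid, buf + 10, 2);
		*lid = ntohs(nlid);
		memcpy(&nqpn, buf + 12, 4);
		*qpn = ntohl(nqpn) & 0xFFFFFF;	/* upper 8 bits reserved */
		return 16;
	}

A header carrying a zero SLID and QPN indicates the sender has not yet
resolved a UD path, in which case replies continue over the plain UDP socket.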
++
++Under many circumstances, the rsocket UDP header also provides enough path
++information for the receiver to send the reply using a UD QP. However, certain
++fabric topologies may require contacting a subnet administrator. Whenever
++rsockets lacks sufficient information to send data to a remote peer using
++a UD QP, it will automatically fall back to using a standard UDP socket. This
++helps minimize the latency for performing a given send, while lengthy address
++and route resolution protocols complete.
++
++Currently, rsockets allocates a single UD QP, though infrastructure is provided
++to support more. Future enhancements may add support for multiple UD QPs, each
++associated with a single network (IP) address. Multiplexing different network
++addresses over a single UD QP and using shared receive queues are other possible
++enhancements.
++
++Because rsockets may use multiple UD QPs, buffer management can become an issue.
++The total size of receive buffers is specified by the mem_default configuration
++file, but may be overridden with rsetsockopt. The buffer is divided into
++MTU-sized messages and distributed among the allocated QPs. As new QPs are
++created, the receive buffers may be redistributed by posting loopback sends
++on a QP in order to free the receive buffers for reposting to a different QP.
+\ No newline at end of file
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 58fcb8e..99e638c 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -110,6 +110,12 @@ struct rs_msg {
+ 	uint32_t data;
+ };
+ 
++struct ds_qp;
++
++struct ds_msg {
++	struct ds_qp *qp;
++};
++
+ struct rs_sge {
+ 	uint64_t addr;
+ 	uint32_t key;
+@@ -169,13 +175,63 @@ enum rs_state {
+ 
+ #define RS_OPT_SWAP_SGL 1
+ 
+-struct rsocket {
++union socket_addr {
++	struct sockaddr		sa;
++	struct sockaddr_in	sin;
++	struct sockaddr_in6	sin6;
++};
++
++struct ds_qp {
+ 	struct rdma_cm_id *cm_id;
++	int		  rbuf_cnt;
++	uint16_t	  lid;
++	uint8_t		  sl;
++};
++
++struct ds_dest {
++	union socket_addr addr;
++	struct ds_qp	  *qp;
++	struct ibv_ah	  *ah;
++	uint32_t	  qpn;
++	atomic_t	  refcnt;
++};
++
++struct rsocket {
++	int		  type;
++	int		  index;
+ 	fastlock_t	  slock;
+ 	fastlock_t	  rlock;
+ 	fastlock_t	  cq_lock;
+ 	fastlock_t	  cq_wait_lock;
+-	fastlock_t	  iomap_lock;
++	fastlock_t	  map_lock; /* acquire slock first if needed */
++
++	union {
++		/* data stream */
++		struct {
++			struct rdma_cm_id *cm_id;
++
++			int		  remote_sge;
++			struct rs_sge	  remote_sgl;
++			struct rs_sge	  remote_iomap;
++
++			struct ibv_mr	  *target_mr;
++			int		  target_sge;
++			int		  target_iomap_size;
++			void		  *target_buffer_list;
++			volatile struct rs_sge *target_sgl;
++			struct rs_iomap	  *target_iomap;
++		};
++		/* datagram */
++		struct {
++			struct ds_qp	  *qp_array;
++			void		  *dest_map;
++			struct ds_dest	  *conn_dest;
++			struct pollfd	  *fds;
++			nfds_t		  nfds;
++			int		  fd;
++			int		  sbytes_needed;
++		};
++	};
+ 
+ 	int		  opts;
+ 	long		  fd_flags;
+@@ -186,7 +242,7 @@ struct rsocket {
+ 	int		  cq_armed;
+ 	int		  retries;
+ 	int		  err;
+-	int		  index;
++
+ 	int		  ctrl_avail;
+ 	int		  sqe_avail;
+ 	int		  sbuf_bytes_avail;
+@@ -203,34 +259,37 @@ struct rsocket {
+ 	int		  rbuf_offset;
+ 	int		  rmsg_head;
+ 	int		  rmsg_tail;
+-	struct rs_msg	  *rmsg;
+-
+-	int		  remote_sge;
+-	struct rs_sge	  remote_sgl;
+-	struct rs_sge	  remote_iomap;
++	union {
++		struct rs_msg	  *rmsg;
++		struct ds_msg	  *dmsg;
++	};
+ 
+ 	struct rs_iomap_mr *remote_iomappings;
+ 	dlist_entry	  iomap_list;
+ 	dlist_entry	  iomap_queue;
+ 	int		  iomap_pending;
+ 
+-	struct ibv_mr	  *target_mr;
+-	int		  target_sge;
+-	int		  target_iomap_size;
+-	void		  *target_buffer_list;
+-	volatile struct rs_sge *target_sgl;
+-	struct rs_iomap	  *target_iomap;
+-
+ 	uint32_t	  rbuf_size;
+-	struct ibv_mr	  *rmr;
++	struct ibv_mr	  *rmr;
+ 	uint8_t		  *rbuf;
+ 
+ 	uint32_t	  sbuf_size;
+-	struct ibv_mr	  *smr;
++	struct ibv_mr	  *smr;
+ 	struct ibv_sge	  ssgl[2];
+ 	uint8_t		  *sbuf;
+ };
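The reworked struct rsocket is effectively a tagged union: rs->type
(SOCK_STREAM or SOCK_DGRAM) determines whether the connection-oriented
members (cm_id, remote/target SGLs) or the datagram members (fd, qp_array,
dest_map, conn_dest) are meaningful. The hypothetical helper below is not
part of the patch; it only illustrates the branch-on-type pattern that
rsocket(), rbind(), and rgetpeername() follow later in this diff.

	/* Pick the OS file descriptor backing this rsocket: the RDMA CM
	 * event channel fd for stream sockets, the UDP socket fd for
	 * datagram sockets.  This mirrors the index value that rsocket()
	 * passes to rs_insert(). */
	static int rs_native_fd(struct rsocket *rs)
	{
		return (rs->type == SOCK_STREAM) ?
			rs->cm_id->channel->fd : rs->fd;
	}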
+ 
++#define DS_UDP_TAG 0x5555555555555555ULL
++
++struct ds_udp_header {
++	uint64_t tag;
++	uint8_t	 version;
++	uint8_t	 sl;
++	uint16_t slid;
++	uint32_t qpn;  /* upper 8-bits reserved */
++};
++
++
+ static int rs_value_to_scale(int value, int bits)
+ {
+ 	return value <= (1 << (bits - 1)) ?
+@@ -306,10 +365,10 @@ out:
+ 	pthread_mutex_unlock(&mut);
+ }
+ 
+-static int rs_insert(struct rsocket *rs)
++static int rs_insert(struct rsocket *rs, int index)
+ {
+ 	pthread_mutex_lock(&mut);
+-	rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
++	rs->index = idm_set(&idm, index, rs);
+ 	pthread_mutex_unlock(&mut);
+ 	return rs->index;
+ }
+@@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs)
+ 	pthread_mutex_unlock(&mut);
+ }
+ 
+-static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
++static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+ {
+ 	struct rsocket *rs;
+ 
+@@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+ 	if (!rs)
+ 		return NULL;
+ 
++	rs->type = type;
+ 	rs->index = -1;
++	rs->fd = -1;
+ 	if (inherited_rs) {
+ 		rs->sbuf_size = inherited_rs->sbuf_size;
+ 		rs->rbuf_size = inherited_rs->rbuf_size;
+@@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+ 	fastlock_init(&rs->rlock);
+ 	fastlock_init(&rs->cq_lock);
+ 	fastlock_init(&rs->cq_wait_lock);
+-	fastlock_init(&rs->iomap_lock);
++	fastlock_init(&rs->map_lock);
+ 	dlist_init(&rs->iomap_list);
+ 	dlist_init(&rs->iomap_queue);
+ 	return rs;
+@@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs)
+ 		rdma_destroy_id(rs->cm_id);
+ 	}
+ 
+-	fastlock_destroy(&rs->iomap_lock);
++	if (rs->fd >= 0)
++		close(rs->fd);
++	if (rs->fds)
++		free(rs->fds);
++
++	fastlock_destroy(&rs->map_lock);
+ 	fastlock_destroy(&rs->cq_wait_lock);
+ 	fastlock_destroy(&rs->cq_lock);
+ 	fastlock_destroy(&rs->rlock);
+@@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+ 	rs->sseq_comp = ntohs(conn->credits);
+ }
+ 
++static int ds_init(struct rsocket *rs, int domain)
++{
++	rs->fd = socket(domain, SOCK_DGRAM, 0);
++	if (rs->fd < 0)
++		return rs->fd;
++
++	rs->fds = calloc(1, sizeof *rs->fds);
++	if (!rs->fds)
++		return ERR(ENOMEM);
++	rs->nfds = 1;
++
++	return 0;
++}
++
+ int rsocket(int domain, int type, int protocol)
+ {
+ 	struct rsocket *rs;
+-	int ret;
++	int index, ret;
+ 
+ 	if ((domain != PF_INET && domain != PF_INET6) ||
+-	    (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
++	    ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
++	    (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
++	    (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
+ 		return ERR(ENOTSUP);
+ 
+ 	rs_configure();
+-	rs = rs_alloc(NULL);
++	rs = rs_alloc(NULL, type);
+ 	if (!rs)
+ 		return ERR(ENOMEM);
+ 
+-	ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+-	if (ret)
+-		goto err;
++	if (type == SOCK_STREAM) {
++		ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
++		if (ret)
++			goto err;
++
++		rs->cm_id->route.addr.src_addr.sa_family = domain;
++		index = rs->cm_id->channel->fd;
++	} else {
++		ret = ds_init(rs, domain);
++		if (ret)
++			goto err;
+ 
+-	ret = rs_insert(rs);
++		index = rs->fd;
++	}
++
++	ret = rs_insert(rs, index);
+ 	if (ret < 0)
+ 		goto err;
+ 
+-	rs->cm_id->route.addr.src_addr.sa_family = domain;
+ 	return rs->index;
+ 
+ err:
+@@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ 	int ret;
+ 
+ 	rs = idm_at(&idm, socket);
+-	ret = rdma_bind_addr(rs->cm_id,
(struct sockaddr *) addr); +- if (!ret) +- rs->state = rs_bound; ++ if (rs->type == SOCK_STREAM) { ++ ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); ++ if (!ret) ++ rs->state = rs_bound; ++ } else { ++ ret = bind(rs->fd, addr, addrlen); ++ } + return ret; + } + +@@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) + int ret; + + rs = idm_at(&idm, socket); +- new_rs = rs_alloc(rs); ++ new_rs = rs_alloc(rs, rs->type); + if (!new_rs) + return ERR(ENOMEM); + +@@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) + if (ret) + goto err; + +- ret = rs_insert(new_rs); ++ ret = rs_insert(new_rs, rs->cm_id->channel->fd); + if (ret < 0) + goto err; + +@@ -854,6 +950,66 @@ connected: + return ret; + } + ++static int ds_compare_dest(const void *dst1, const void *dst2) ++{ ++ const struct sockaddr *sa1, *sa2; ++ size_t len; ++ ++ sa1 = (const struct sockaddr *) dst1; ++ sa2 = (const struct sockaddr *) dst2; ++ ++ len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? ++ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); ++ return memcmp(dst1, dst2, len); ++} ++ ++/* Caller must hold map_lock around accessing source address */ ++static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr, ++ socklen_t dst_len, socklen_t *src_len) ++{ ++ static union socket_addr src_addr; ++ int sock, ret; ++ ++ sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0); ++ if (sock < 0) ++ return NULL; ++ ++ ret = connect(sock, &dst_addr->sa, dst_len); ++ if (ret) ++ goto out; ++ ++ *src_len = sizeof src_addr; ++ ret = getsockname(sock, &src_addr.sa, src_len); ++ ++out: ++ close(sock); ++ return ret ? NULL : &src_addr; ++} ++ ++static struct ds_qp *ds_get_qp(struct rsocket *rs, ++ const struct sockaddr *src_addr, socklen_t addrlen) ++{ ++ union socket_addr *addr; ++ socklen_t len; ++ ++} ++ ++static int ds_connect(struct rsocket *rs, ++ const struct sockaddr *dest_addr, socklen_t addrlen) ++{ ++ struct ds_dest **dest; ++ ++ fastlock_acquire(&rs->map_lock); ++ dest = tfind(dest_addr, rs->dest_map, ds_compare_dest); ++ if (dest) { ++ rs->conn_dest = *dest; ++ goto out; ++ } ++out: ++ fastlock_release(&rs->map_lock); ++ return 0; ++} ++ + int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) + { + struct rsocket *rs; +@@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs, + return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); + } + ++static int ds_post_send(struct rsocket *rs, ++ struct ibv_sge *sgl, int nsge, ++ uint64_t wr_id, int flags) ++{ ++ struct ibv_send_wr wr, *bad; ++ ++ wr.wr_id = wr_id; ++ wr.next = NULL; ++ wr.sg_list = sgl; ++ wr.num_sge = nsge; ++ wr.opcode = IBV_WR_SEND; ++ wr.send_flags = flags; ++ wr.wr.ud.ah = rs->conn_dest->ah; ++ wr.wr.ud.remote_qpn = rs->conn_dest->qpn; ++ wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; ++ ++ return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); ++} ++ + /* + * Update target SGE before sending data. Otherwise the remote side may + * update the entry before we do. 
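The ds_get_src_addr() helper above determines which local address would be
used to reach a given destination without sending any packet: connect() on a
UDP socket only pins the route, and getsockname() then reports the kernel's
source-address choice. A standalone sketch of the same technique, illustrative
only and not part of the patch (the destination address and port below are
placeholders):

	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>
	#include <arpa/inet.h>
	#include <sys/socket.h>

	int main(void)
	{
		struct sockaddr_in dst, src;
		socklen_t len = sizeof src;
		char buf[INET_ADDRSTRLEN];
		int sock;

		memset(&dst, 0, sizeof dst);
		dst.sin_family = AF_INET;
		dst.sin_port = htons(7471);	/* any nonzero port works */
		inet_pton(AF_INET, "192.0.2.1", &dst.sin_addr);

		sock = socket(AF_INET, SOCK_DGRAM, 0);
		if (sock < 0 ||
		    connect(sock, (struct sockaddr *) &dst, sizeof dst) ||
		    getsockname(sock, (struct sockaddr *) &src, &len)) {
			perror("resolve src addr");
			return 1;
		}

		/* No datagram was sent; the kernel just did a route lookup. */
		printf("local source address: %s\n",
		       inet_ntop(AF_INET, &src.sin_addr, buf, sizeof buf));
		close(sock);
		return 0;
	}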
+@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs,
+ 				 flags, addr, rkey);
+ }
+ 
++static int ds_send_data(struct rsocket *rs,
++			struct ibv_sge *sgl, int nsge,
++			uint32_t length, int flags)
++{
++	rs->sqe_avail--;
++	rs->sbuf_bytes_avail -= length;
++	return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
++}
++
+ static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+ 			   struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+ {
+@@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs)
+ 	       (rs->target_sgl[rs->target_sge].length != 0);
+ }
+ 
++static int ds_can_send(struct rsocket *rs)
++{
++	return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
++}
++
+ static int rs_conn_can_send(struct rsocket *rs)
+ {
+ 	return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+@@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+ 	struct rs_iomap iom;
+ 	int ret;
+ 
+-	fastlock_acquire(&rs->iomap_lock);
++	fastlock_acquire(&rs->map_lock);
+ 	while (!dlist_empty(&rs->iomap_queue)) {
+ 		if (!rs_can_send(rs)) {
+ 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+ 	}
+ 
+ 	rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+-	fastlock_release(&rs->iomap_lock);
++	fastlock_release(&rs->map_lock);
+ 	return ret;
+ }
+ 
++static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
++			    int iovcnt, int flags)
++{
++	struct ds_udp_header hdr;
++	struct msghdr msg;
++	struct iovec miov[5];
++	struct ds_qp *qp;
++
++	if (iovcnt > 4)
++		return ERR(ENOTSUP);
++
++	qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
++	hdr.tag = htonll(DS_UDP_TAG);
++	hdr.version = 1;
++	if (qp) {
++		hdr.sl = qp->sl;
++		hdr.slid = htons(qp->lid);
++		hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
++	} else {
++		hdr.sl = 0;
++		hdr.slid = 0;
++		hdr.qpn = 0;
++	}
++
++	miov[0].iov_base = &hdr;
++	miov[0].iov_len = sizeof hdr;
++	memcpy(&miov[1], iov, sizeof *iov * iovcnt);
++
++	memset(&msg, 0, sizeof msg);
++	/* TODO: specify name if needed */
++	msg.msg_iov = miov;
++	msg.msg_iovlen = iovcnt + 1;
++	return sendmsg(rs->fd, &msg, flags);
++}
++
++static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
++{
++	struct iovec iov;
++	iov.iov_base = (void *) buf;
++	iov.iov_len = len;
++	return ds_sendv_udp(rs, &iov, 1, flags);
++}
++
++static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
++{
++	struct ibv_sge sge;
++	int ret = 0;
++
++	if (!rs->conn_dest || !rs->conn_dest->ah)
++		return ds_send_udp(rs, buf, len, flags);
++
++	rs->sbytes_needed = len;
++	if (!ds_can_send(rs)) {
++		ret = rs_get_comp(rs, 1, ds_can_send);
++		if (ret)
++			return ds_send_udp(rs, buf, len, flags);
++	}
++
++	if (len <= rs->sq_inline) {
++		sge.addr = (uintptr_t) buf;
++		sge.length = len;
++		sge.lkey = 0;
++		ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
++	} else if (len <= rs_sbuf_left(rs)) {
++		memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
++		rs->ssgl[0].length = len;
++		ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
++		if (len < rs_sbuf_left(rs))
++			rs->ssgl[0].addr += len;
++		else
++			rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
++	} else {
++		rs->ssgl[0].length = rs_sbuf_left(rs);
++		memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
++		       rs->ssgl[0].length);
++		rs->ssgl[1].length = len - rs->ssgl[0].length;
++		memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
++		ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
++		rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
++	}
++
++	return ret ? ret : len;
++}
++
+ /*
+  * We overlap sending the data, by posting a small work request immediately,
+  * then increasing the size of the send on each iteration.
+@@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ 	int ret = 0;
+ 
+ 	rs = idm_at(&idm, socket);
++	if (rs->type == SOCK_DGRAM) {
++		fastlock_acquire(&rs->slock);
++		ret = dsend(rs, buf, len, flags);
++		fastlock_release(&rs->slock);
++		return ret;
++	}
++
+ 	if (rs->state & rs_opening) {
+ 		ret = rs_do_connect(rs);
+ 		if (ret) {
+@@ -1537,10 +1817,21 @@ out:
+ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
+ 		const struct sockaddr *dest_addr, socklen_t addrlen)
+ {
+-	if (dest_addr || addrlen)
+-		return ERR(EISCONN);
++	struct rsocket *rs;
++	int ret;
++	rs = idm_at(&idm, socket);
++	if (rs->type == SOCK_STREAM) {
++		if (dest_addr || addrlen)
++			return ERR(EISCONN);
++
++		return rsend(socket, buf, len, flags);
++	}
+ 
+-	return rsend(socket, buf, len, flags);
++	fastlock_acquire(&rs->slock);
++	ds_connect(rs, dest_addr, addrlen);
++	ret = dsend(rs, buf, len, flags);
++	fastlock_release(&rs->slock);
++	return ret;
+ }
+ 
+ static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
+@@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+ 	if (msg->msg_control && msg->msg_controllen)
+ 		return ERR(ENOTSUP);
+ 
+-	return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
++	return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
+ }
+ 
+ ssize_t rwrite(int socket, const void *buf, size_t count)
+@@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ 	struct rsocket *rs;
+ 
+ 	rs = idm_at(&idm, socket);
+-	rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+-	return 0;
++	if (rs->type == SOCK_STREAM) {
++		rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
++		return 0;
++	} else {
++		return getpeername(rs->fd, addr, addrlen);
++	}
+ }
+ 
+ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+ 	struct rsocket *rs;
+ 
+ 	rs = idm_at(&idm, socket);
+-	rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+-	return 0;
++	if (rs->type == SOCK_STREAM) {
++		rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
++		return 0;
++	} else {
++		return getsockname(rs->fd, addr, addrlen);
++	}
+ }
+ 
+ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname,
+ 
+ 	ret = ERR(ENOTSUP);
+ 	rs = idm_at(&idm, socket);
++	if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
++		ret = setsockopt(rs->fd, level, optname, optval, optlen);
++		if (ret)
++			return ret;
++	}
++
+ 	switch (level) {
+ 	case SOL_SOCKET:
+ 		opts = &rs->so_opts;
+@@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname,
+ 	int ret = 0;
+ 
+ 	rs = idm_at(&idm, socket);
++	if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
++		return getsockopt(rs->fd, level, optname, optval, optlen);
++
+ 	switch (level) {
+ 	case SOL_SOCKET:
+ 		switch (optname) {
+@@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+ 	if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+ 		return ERR(EINVAL);
+ 
+-	fastlock_acquire(&rs->iomap_lock);
++	fastlock_acquire(&rs->map_lock);
+ 	if
(prot & PROT_WRITE) { + iomr = rs_get_iomap_mr(rs); + access |= IBV_ACCESS_REMOTE_WRITE; +@@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse + dlist_insert_tail(&iomr->entry, &rs->iomap_list); + } + out: +- fastlock_release(&rs->iomap_lock); ++ fastlock_release(&rs->map_lock); + return offset; + } + +@@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len) + int ret = 0; + + rs = idm_at(&idm, socket); +- fastlock_acquire(&rs->iomap_lock); ++ fastlock_acquire(&rs->map_lock); + + for (entry = rs->iomap_list.next; entry != &rs->iomap_list; + entry = entry->next) { +@@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len) + } + ret = ERR(EINVAL); + out: +- fastlock_release(&rs->iomap_lock); ++ fastlock_release(&rs->map_lock); + return ret; + } + +@@ -2475,3 +2783,24 @@ out: + + return (ret && left == count) ? ret : count - left; + } ++ ++ssize_t urecvfrom(int socket, void *buf, size_t len, int flags, ++ struct sockaddr *src_addr, socklen_t *addrlen) ++{ ++ int ret; ++ ++ ret = rrecv(socket, buf, len, flags); ++ if (ret > 0 && src_addr) ++ rgetpeername(socket, src_addr, addrlen); ++ ++ return ret; ++} ++ ++ssize_t usendto(int socket, const void *buf, size_t len, int flags, ++ const struct sockaddr *dest_addr, socklen_t addrlen) ++{ ++ if (dest_addr || addrlen) ++ return ERR(EISCONN); ++ ++ return usend(socket, buf, len, flags); ++} +diff --git a/src/usocket.c b/src/usocket.c +deleted file mode 100644 +index 87da990..0000000 +--- a/src/usocket.c ++++ /dev/null +@@ -1,2253 +0,0 @@ +-/* +- * Copyright (c) 2012 Intel Corporation. All rights reserved. +- * +- * This software is available to you under a choice of one of two +- * licenses. You may choose to be licensed under the terms of the GNU +- * General Public License (GPL) Version 2, available from the file +- * COPYING in the main directory of this source tree, or the +- * OpenIB.org BSD license below: +- * +- * Redistribution and use in source and binary forms, with or +- * without modification, are permitted provided that the following +- * conditions are met: +- * +- * - Redistributions of source code must retain the above +- * copyright notice, this list of conditions and the following +- * disclaimer. +- * +- * - Redistributions in binary form must reproduce the above +- * copyright notice, this list of conditions and the following +- * disclaimer in the documentation and/or other materials +- * provided with the distribution. +- * +- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +- * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +- * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +- * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +- * SOFTWARE. 
+- * +- */ +- +-#if HAVE_CONFIG_H +-# include +-#endif /* HAVE_CONFIG_H */ +- +-#include +-#include +-#include +-#include +-#include +-#include +-#include +-#include +-#include +-#include +- +-#include +-#include +-#include +-#include "cma.h" +-#include "indexer.h" +- +-//#define RS_SNDLOWAT 64 +-//#define RS_QP_MAX_SIZE 0xFFFE +-//#define RS_SGL_SIZE 2 +-//static struct index_map idm; +-//static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +- +-//static uint16_t def_inline = 64; +-//static uint16_t def_sqsize = 384; +-//static uint16_t def_rqsize = 384; +-//static uint32_t def_mem = (1 << 17); +-//static uint32_t def_wmem = (1 << 17); +-//static uint32_t polling_time = 10; +- +-//enum { +-// RS_OP_DATA, +-// RS_OP_RSVD_DATA_MORE, +-// RS_OP_WRITE, /* opcode is not transmitted over the network */ +-// RS_OP_RSVD_DRA_MORE, +-// RS_OP_SGL, +-// RS_OP_RSVD, +-// RS_OP_IOMAP_SGL, +-// RS_OP_CTRL +-//}; +-//#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) +-//#define rs_msg_op(imm_data) (imm_data >> 29) +-//#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) +- +-struct rs_msg { +- uint32_t op; +- uint32_t data; +-}; +- +-struct rs_sge { +- uint64_t addr; +- uint32_t key; +- uint32_t length; +-}; +- +-struct rs_iomap { +- uint64_t offset; +- struct rs_sge sge; +-}; +- +-struct rs_iomap_mr { +- uint64_t offset; +- struct ibv_mr *mr; +- dlist_entry entry; +- atomic_t refcnt; +- int index; /* -1 if mapping is local and not in iomap_list */ +-}; +- +-#define RS_MIN_INLINE (sizeof(struct rs_sge)) +-#define rs_host_is_net() (1 == htonl(1)) +-#define RS_CONN_FLAG_NET (1 << 0) +-#define RS_CONN_FLAG_IOMAP (1 << 1) +- +-struct rs_conn_data { +- uint8_t version; +- uint8_t flags; +- uint16_t credits; +- uint8_t reserved[3]; +- uint8_t target_iomap_size; +- struct rs_sge target_sgl; +- struct rs_sge data_buf; +-}; +- +-#define RS_RECV_WR_ID (~((uint64_t) 0)) +- +-/* +- * usocket states are ordered as passive, connecting, connected, disconnected. 
+- */ +-enum rs_state { +- rs_init, +- rs_bound = 0x0001, +- rs_listening = 0x0002, +- rs_opening = 0x0004, +- rs_resolving_addr = rs_opening | 0x0010, +- rs_resolving_route = rs_opening | 0x0020, +- rs_connecting = rs_opening | 0x0040, +- rs_accepting = rs_opening | 0x0080, +- rs_connected = 0x0100, +- rs_connect_wr = 0x0200, +- rs_connect_rd = 0x0400, +- rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr, +- rs_connect_error = 0x0800, +- rs_disconnected = 0x1000, +- rs_error = 0x2000, +-}; +- +-#define RS_OPT_SWAP_SGL 1 +- +-struct usocket { +- struct rdma_cm_id *cm_id; +- fastlock_t slock; +- fastlock_t rlock; +- fastlock_t cq_lock; +- fastlock_t cq_wait_lock; +- fastlock_t iomap_lock; +- +- int opts; +- long fd_flags; +- uint64_t so_opts; +- uint64_t tcp_opts; +- uint64_t ipv6_opts; +- int state; +- int cq_armed; +- int retries; +- int err; +- int index; +- int ctrl_avail; +- int sqe_avail; +- int sbuf_bytes_avail; +- uint16_t sseq_no; +- uint16_t sseq_comp; +- uint16_t sq_size; +- uint16_t sq_inline; +- +- uint16_t rq_size; +- uint16_t rseq_no; +- uint16_t rseq_comp; +- int rbuf_bytes_avail; +- int rbuf_free_offset; +- int rbuf_offset; +- int rmsg_head; +- int rmsg_tail; +- struct rs_msg *rmsg; +- +- int remote_sge; +- struct rs_sge remote_sgl; +- struct rs_sge remote_iomap; +- +- struct rs_iomap_mr *remote_iomappings; +- dlist_entry iomap_list; +- dlist_entry iomap_queue; +- int iomap_pending; +- +- struct ibv_mr *target_mr; +- int target_sge; +- int target_iomap_size; +- void *target_buffer_list; +- volatile struct rs_sge *target_sgl; +- struct rs_iomap *target_iomap; +- +- uint32_t rbuf_size; +- struct ibv_mr *rmr; +- uint8_t *rbuf; +- +- uint32_t sbuf_size; +- struct ibv_mr *smr; +- struct ibv_sge ssgl[2]; +- uint8_t *sbuf; +-}; +- +-static int rs_value_to_scale(int value, int bits) +-{ +- return value <= (1 << (bits - 1)) ? +- value : (1 << (bits - 1)) | (value >> bits); +-} +- +-static int rs_scale_to_value(int value, int bits) +-{ +- return value <= (1 << (bits - 1)) ? 
+- value : (value & ~(1 << (bits - 1))) << bits; +-} +- +-void rs_configure(void) +-{ +- FILE *f; +- static int init; +- +- if (init) +- return; +- +- pthread_mutex_lock(&mut); +- if (init) +- goto out; +- +- if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) { +- (void) fscanf(f, "%u", &polling_time); +- fclose(f); +- } +- +- if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) { +- (void) fscanf(f, "%hu", &def_inline); +- fclose(f); +- +- if (def_inline < RS_MIN_INLINE) +- def_inline = RS_MIN_INLINE; +- } +- +- if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) { +- (void) fscanf(f, "%hu", &def_sqsize); +- fclose(f); +- } +- +- if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) { +- (void) fscanf(f, "%hu", &def_rqsize); +- fclose(f); +- } +- +- if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) { +- (void) fscanf(f, "%u", &def_mem); +- fclose(f); +- +- if (def_mem < 1) +- def_mem = 1; +- } +- +- if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) { +- (void) fscanf(f, "%u", &def_wmem); +- fclose(f); +- if (def_wmem < RS_SNDLOWAT) +- def_wmem = RS_SNDLOWAT << 1; +- } +- +- if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) { +- (void) fscanf(f, "%hu", &def_iomap_size); +- fclose(f); +- +- /* round to supported values */ +- def_iomap_size = (uint8_t) rs_value_to_scale( +- (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8); +- } +- init = 1; +-out: +- pthread_mutex_unlock(&mut); +-} +- +-static int rs_insert(struct usocket *us) +-{ +- pthread_mutex_lock(&mut); +- us->index = idm_set(&idm, us->cm_id->channel->fd, us); +- pthread_mutex_unlock(&mut); +- return us->index; +-} +- +-static void rs_remove(struct usocket *us) +-{ +- pthread_mutex_lock(&mut); +- idm_clear(&idm, us->index); +- pthread_mutex_unlock(&mut); +-} +- +-static struct usocket *rs_alloc(struct usocket *inherited_rs) +-{ +- struct usocket *us; +- +- us = calloc(1, sizeof *us); +- if (!us) +- return NULL; +- +- us->index = -1; +- if (inherited_rs) { +- us->sbuf_size = inherited_rs->sbuf_size; +- us->rbuf_size = inherited_rs->rbuf_size; +- us->sq_inline = inherited_rs->sq_inline; +- us->sq_size = inherited_rs->sq_size; +- us->rq_size = inherited_rs->rq_size; +- us->ctrl_avail = inherited_rs->ctrl_avail; +- us->target_iomap_size = inherited_rs->target_iomap_size; +- } else { +- us->sbuf_size = def_wmem; +- us->rbuf_size = def_mem; +- us->sq_inline = def_inline; +- us->sq_size = def_sqsize; +- us->rq_size = def_rqsize; +- us->ctrl_avail = RS_QP_CTRL_SIZE; +- us->target_iomap_size = def_iomap_size; +- } +- fastlock_init(&us->slock); +- fastlock_init(&us->rlock); +- fastlock_init(&us->cq_lock); +- fastlock_init(&us->cq_wait_lock); +- fastlock_init(&us->iomap_lock); +- dlist_init(&us->iomap_list); +- dlist_init(&us->iomap_queue); +- return us; +-} +- +-static int rs_set_nonblocking(struct usocket *us, long arg) +-{ +- int ret = 0; +- +- if (us->cm_id->recv_cq_channel) +- ret = fcntl(us->cm_id->recv_cq_channel->fd, F_SETFL, arg); +- +- if (!ret && us->state < rs_connected) +- ret = fcntl(us->cm_id->channel->fd, F_SETFL, arg); +- +- return ret; +-} +- +-static void rs_set_qp_size(struct usocket *us) +-{ +- uint16_t max_size; +- +- max_size = min(ucma_max_qpsize(us->cm_id), RS_QP_MAX_SIZE); +- +- if (us->sq_size > max_size) +- us->sq_size = max_size; +- else if (us->sq_size < 2) +- us->sq_size = 2; +- if (us->sq_size <= (RS_QP_CTRL_SIZE << 2)) +- us->ctrl_avail = 1; +- +- if (us->rq_size > max_size) +- us->rq_size = max_size; +- else if (us->rq_size < 2) +- us->rq_size = 2; +-} +- +-static int rs_init_bufs(struct usocket *us) +-{ +- 
size_t len; +- +- us->rmsg = calloc(us->rq_size + 1, sizeof(*us->rmsg)); +- if (!us->rmsg) +- return -1; +- +- us->sbuf = calloc(us->sbuf_size, sizeof(*us->sbuf)); +- if (!us->sbuf) +- return -1; +- +- us->smr = rdma_reg_msgs(us->cm_id, us->sbuf, us->sbuf_size); +- if (!us->smr) +- return -1; +- +- len = sizeof(*us->target_sgl) * RS_SGL_SIZE + +- sizeof(*us->target_iomap) * us->target_iomap_size; +- us->target_buffer_list = malloc(len); +- if (!us->target_buffer_list) +- return -1; +- +- us->target_mr = rdma_reg_write(us->cm_id, us->target_buffer_list, len); +- if (!us->target_mr) +- return -1; +- +- memset(us->target_buffer_list, 0, len); +- us->target_sgl = us->target_buffer_list; +- if (us->target_iomap_size) +- us->target_iomap = (struct rs_iomap *) (us->target_sgl + RS_SGL_SIZE); +- +- us->rbuf = calloc(us->rbuf_size, sizeof(*us->rbuf)); +- if (!us->rbuf) +- return -1; +- +- us->rmr = rdma_reg_write(us->cm_id, us->rbuf, us->rbuf_size); +- if (!us->rmr) +- return -1; +- +- us->ssgl[0].addr = us->ssgl[1].addr = (uintptr_t) us->sbuf; +- us->sbuf_bytes_avail = us->sbuf_size; +- us->ssgl[0].lkey = us->ssgl[1].lkey = us->smr->lkey; +- +- us->rbuf_free_offset = us->rbuf_size >> 1; +- us->rbuf_bytes_avail = us->rbuf_size >> 1; +- us->sqe_avail = us->sq_size - us->ctrl_avail; +- us->rseq_comp = us->rq_size >> 1; +- return 0; +-} +- +-static int rs_create_cq(struct usocket *us) +-{ +- us->cm_id->recv_cq_channel = ibv_create_comp_channel(us->cm_id->verbs); +- if (!us->cm_id->recv_cq_channel) +- return -1; +- +- us->cm_id->recv_cq = ibv_create_cq(us->cm_id->verbs, us->sq_size + us->rq_size, +- us->cm_id, us->cm_id->recv_cq_channel, 0); +- if (!us->cm_id->recv_cq) +- goto err1; +- +- if (us->fd_flags & O_NONBLOCK) { +- if (rs_set_nonblocking(us, O_NONBLOCK)) +- goto err2; +- } +- +- us->cm_id->send_cq_channel = us->cm_id->recv_cq_channel; +- us->cm_id->send_cq = us->cm_id->recv_cq; +- return 0; +- +-err2: +- ibv_destroy_cq(us->cm_id->recv_cq); +- us->cm_id->recv_cq = NULL; +-err1: +- ibv_destroy_comp_channel(us->cm_id->recv_cq_channel); +- us->cm_id->recv_cq_channel = NULL; +- return -1; +-} +- +-static inline int +-rs_post_recv(struct usocket *us) +-{ +- struct ibv_recv_wr wr, *bad; +- +- wr.wr_id = RS_RECV_WR_ID; +- wr.next = NULL; +- wr.sg_list = NULL; +- wr.num_sge = 0; +- +- return rdma_seterrno(ibv_post_recv(us->cm_id->qp, &wr, &bad)); +-} +- +-static int rs_create_ep(struct usocket *us) +-{ +- struct ibv_qp_init_attr qp_attr; +- int i, ret; +- +- rs_set_qp_size(us); +- ret = rs_init_bufs(us); +- if (ret) +- return ret; +- +- ret = rs_create_cq(us); +- if (ret) +- return ret; +- +- memset(&qp_attr, 0, sizeof qp_attr); +- qp_attr.qp_context = us; +- qp_attr.send_cq = us->cm_id->send_cq; +- qp_attr.recv_cq = us->cm_id->recv_cq; +- qp_attr.qp_type = IBV_QPT_RC; +- qp_attr.sq_sig_all = 1; +- qp_attr.cap.max_send_wr = us->sq_size; +- qp_attr.cap.max_recv_wr = us->rq_size; +- qp_attr.cap.max_send_sge = 2; +- qp_attr.cap.max_recv_sge = 1; +- qp_attr.cap.max_inline_data = us->sq_inline; +- +- ret = rdma_create_qp(us->cm_id, NULL, &qp_attr); +- if (ret) +- return ret; +- +- for (i = 0; i < us->rq_size; i++) { +- ret = rs_post_recv(us); +- if (ret) +- return ret; +- } +- return 0; +-} +- +-static void rs_release_iomap_mr(struct rs_iomap_mr *iomr) +-{ +- if (atomic_dec(&iomr->refcnt)) +- return; +- +- dlist_remove(&iomr->entry); +- ibv_dereg_mr(iomr->mr); +- if (iomr->index >= 0) +- iomr->mr = NULL; +- else +- free(iomr); +-} +- +-static void rs_free_iomappings(struct usocket *us) +-{ +- struct 
rs_iomap_mr *iomr; +- +- while (!dlist_empty(&us->iomap_list)) { +- iomr = container_of(us->iomap_list.next, +- struct rs_iomap_mr, entry); +- riounmap(us->index, iomr->mr->addr, iomr->mr->length); +- } +- while (!dlist_empty(&us->iomap_queue)) { +- iomr = container_of(us->iomap_queue.next, +- struct rs_iomap_mr, entry); +- riounmap(us->index, iomr->mr->addr, iomr->mr->length); +- } +-} +- +-static void rs_free(struct usocket *us) +-{ +- if (us->index >= 0) +- rs_remove(us); +- +- if (us->rmsg) +- free(us->rmsg); +- +- if (us->sbuf) { +- if (us->smr) +- rdma_dereg_mr(us->smr); +- free(us->sbuf); +- } +- +- if (us->rbuf) { +- if (us->rmr) +- rdma_dereg_mr(us->rmr); +- free(us->rbuf); +- } +- +- if (us->target_buffer_list) { +- if (us->target_mr) +- rdma_dereg_mr(us->target_mr); +- free(us->target_buffer_list); +- } +- +- if (us->cm_id) { +- rs_free_iomappings(us); +- if (us->cm_id->qp) +- rdma_destroy_qp(us->cm_id); +- rdma_destroy_id(us->cm_id); +- } +- +- fastlock_destroy(&us->iomap_lock); +- fastlock_destroy(&us->cq_wait_lock); +- fastlock_destroy(&us->cq_lock); +- fastlock_destroy(&us->rlock); +- fastlock_destroy(&us->slock); +- free(us); +-} +- +-static void rs_set_conn_data(struct usocket *us, struct rdma_conn_param *param, +- struct rs_conn_data *conn) +-{ +- conn->version = 1; +- conn->flags = RS_CONN_FLAG_IOMAP | +- (rs_host_is_net() ? RS_CONN_FLAG_NET : 0); +- conn->credits = htons(us->rq_size); +- memset(conn->reserved, 0, sizeof conn->reserved); +- conn->target_iomap_size = (uint8_t) rs_value_to_scale(us->target_iomap_size, 8); +- +- conn->target_sgl.addr = htonll((uintptr_t) us->target_sgl); +- conn->target_sgl.length = htonl(RS_SGL_SIZE); +- conn->target_sgl.key = htonl(us->target_mr->rkey); +- +- conn->data_buf.addr = htonll((uintptr_t) us->rbuf); +- conn->data_buf.length = htonl(us->rbuf_size >> 1); +- conn->data_buf.key = htonl(us->rmr->rkey); +- +- param->private_data = conn; +- param->private_data_len = sizeof *conn; +-} +- +-static void rs_save_conn_data(struct usocket *us, struct rs_conn_data *conn) +-{ +- us->remote_sgl.addr = ntohll(conn->target_sgl.addr); +- us->remote_sgl.length = ntohl(conn->target_sgl.length); +- us->remote_sgl.key = ntohl(conn->target_sgl.key); +- us->remote_sge = 1; +- if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) || +- (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET))) +- us->opts = RS_OPT_SWAP_SGL; +- +- if (conn->flags & RS_CONN_FLAG_IOMAP) { +- us->remote_iomap.addr = us->remote_sgl.addr + +- sizeof(us->remote_sgl) * us->remote_sgl.length; +- us->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8); +- us->remote_iomap.key = us->remote_sgl.key; +- } +- +- us->target_sgl[0].addr = ntohll(conn->data_buf.addr); +- us->target_sgl[0].length = ntohl(conn->data_buf.length); +- us->target_sgl[0].key = ntohl(conn->data_buf.key); +- +- us->sseq_comp = ntohs(conn->credits); +-} +- +-int usocket(int domain, int type, int protocol) +-{ +- struct usocket *us; +- int ret; +- +- if ((domain != PF_INET && domain != PF_INET6) || +- (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP)) +- return ERR(ENOTSUP); +- +- rs_configure(); +- us = rs_alloc(NULL); +- if (!us) +- return ERR(ENOMEM); +- +- ret = rdma_create_id(NULL, &us->cm_id, us, RDMA_PS_TCP); +- if (ret) +- goto err; +- +- ret = rs_insert(us); +- if (ret < 0) +- goto err; +- +- us->cm_id->route.addr.src_addr.sa_family = domain; +- return us->index; +- +-err: +- rs_free(us); +- return ret; +-} +- +-int rbind(int socket, const struct sockaddr *addr, socklen_t 
addrlen) +-{ +- struct usocket *us; +- int ret; +- +- us = idm_at(&idm, socket); +- ret = rdma_bind_addr(us->cm_id, (struct sockaddr *) addr); +- if (!ret) +- us->state = rs_bound; +- return ret; +-} +- +-int rlisten(int socket, int backlog) +-{ +- struct usocket *us; +- int ret; +- +- us = idm_at(&idm, socket); +- ret = rdma_listen(us->cm_id, backlog); +- if (!ret) +- us->state = rs_listening; +- return ret; +-} +- +-/* +- * Nonblocking is usually not inherited between sockets, but we need to +- * inherit it here to establish the connection only. This is needed to +- * prevent rdma_accept from blocking until the remote side finishes +- * establishing the connection. If we were to allow rdma_accept to block, +- * then a single thread cannot establish a connection with itself, or +- * two threads which try to connect to each other can deadlock trying to +- * form a connection. +- * +- * Data transfers on the new socket remain blocking unless the user +- * specifies otherwise through rfcntl. +- */ +-int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) +-{ +- struct usocket *us, *new_rs; +- struct rdma_conn_param param; +- struct rs_conn_data *creq, cresp; +- int ret; +- +- us = idm_at(&idm, socket); +- new_rs = rs_alloc(us); +- if (!new_rs) +- return ERR(ENOMEM); +- +- ret = rdma_get_request(us->cm_id, &new_rs->cm_id); +- if (ret) +- goto err; +- +- ret = rs_insert(new_rs); +- if (ret < 0) +- goto err; +- +- creq = (struct rs_conn_data *) new_rs->cm_id->event->param.conn.private_data; +- if (creq->version != 1) { +- ret = ERR(ENOTSUP); +- goto err; +- } +- +- if (us->fd_flags & O_NONBLOCK) +- rs_set_nonblocking(new_rs, O_NONBLOCK); +- +- ret = rs_create_ep(new_rs); +- if (ret) +- goto err; +- +- rs_save_conn_data(new_rs, creq); +- param = new_rs->cm_id->event->param.conn; +- rs_set_conn_data(new_rs, ¶m, &cresp); +- ret = rdma_accept(new_rs->cm_id, ¶m); +- if (!ret) +- new_rs->state = rs_connect_rdwr; +- else if (errno == EAGAIN || errno == EWOULDBLOCK) +- new_rs->state = rs_accepting; +- else +- goto err; +- +- if (addr && addrlen) +- rgetpeername(new_rs->index, addr, addrlen); +- return new_rs->index; +- +-err: +- rs_free(new_rs); +- return ret; +-} +- +-static int rs_do_connect(struct usocket *us) +-{ +- struct rdma_conn_param param; +- struct rs_conn_data creq, *cresp; +- int to, ret; +- +- switch (us->state) { +- case rs_init: +- case rs_bound: +-resolve_addr: +- to = 1000 << us->retries++; +- ret = rdma_resolve_addr(us->cm_id, NULL, +- &us->cm_id->route.addr.dst_addr, to); +- if (!ret) +- goto resolve_route; +- if (errno == EAGAIN || errno == EWOULDBLOCK) +- us->state = rs_resolving_addr; +- break; +- case rs_resolving_addr: +- ret = ucma_complete(us->cm_id); +- if (ret) { +- if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES) +- goto resolve_addr; +- break; +- } +- +- us->retries = 0; +-resolve_route: +- to = 1000 << us->retries++; +- ret = rdma_resolve_route(us->cm_id, to); +- if (!ret) +- goto do_connect; +- if (errno == EAGAIN || errno == EWOULDBLOCK) +- us->state = rs_resolving_route; +- break; +- case rs_resolving_route: +- ret = ucma_complete(us->cm_id); +- if (ret) { +- if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES) +- goto resolve_route; +- break; +- } +-do_connect: +- ret = rs_create_ep(us); +- if (ret) +- break; +- +- memset(¶m, 0, sizeof param); +- rs_set_conn_data(us, ¶m, &creq); +- param.flow_control = 1; +- param.retry_count = 7; +- param.rnr_retry_count = 7; +- us->retries = 0; +- +- ret = rdma_connect(us->cm_id, ¶m); +- if (!ret) +- goto 
connected; +- if (errno == EAGAIN || errno == EWOULDBLOCK) +- us->state = rs_connecting; +- break; +- case rs_connecting: +- ret = ucma_complete(us->cm_id); +- if (ret) +- break; +-connected: +- cresp = (struct rs_conn_data *) us->cm_id->event->param.conn.private_data; +- if (cresp->version != 1) { +- ret = ERR(ENOTSUP); +- break; +- } +- +- rs_save_conn_data(us, cresp); +- us->state = rs_connect_rdwr; +- break; +- case rs_accepting: +- if (!(us->fd_flags & O_NONBLOCK)) +- rs_set_nonblocking(us, 0); +- +- ret = ucma_complete(us->cm_id); +- if (ret) +- break; +- +- us->state = rs_connect_rdwr; +- break; +- default: +- ret = ERR(EINVAL); +- break; +- } +- +- if (ret) { +- if (errno == EAGAIN || errno == EWOULDBLOCK) { +- errno = EINPROGRESS; +- } else { +- us->state = rs_connect_error; +- us->err = errno; +- } +- } +- return ret; +-} +- +-int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) +-{ +- struct usocket *us; +- +- us = idm_at(&idm, socket); +- memcpy(&us->cm_id->route.addr.dst_addr, addr, addrlen); +- return rs_do_connect(us); +-} +- +-static int rs_post_write_msg(struct usocket *us, +- struct ibv_sge *sgl, int nsge, +- uint32_t imm_data, int flags, +- uint64_t addr, uint32_t rkey) +-{ +- struct ibv_send_wr wr, *bad; +- +- wr.wr_id = (uint64_t) imm_data; +- wr.next = NULL; +- wr.sg_list = sgl; +- wr.num_sge = nsge; +- wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; +- wr.send_flags = flags; +- wr.imm_data = htonl(imm_data); +- wr.wr.rdma.remote_addr = addr; +- wr.wr.rdma.rkey = rkey; +- +- return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad)); +-} +- +-static int rs_post_write(struct usocket *us, +- struct ibv_sge *sgl, int nsge, +- uint64_t wr_id, int flags, +- uint64_t addr, uint32_t rkey) +-{ +- struct ibv_send_wr wr, *bad; +- +- wr.wr_id = wr_id; +- wr.next = NULL; +- wr.sg_list = sgl; +- wr.num_sge = nsge; +- wr.opcode = IBV_WR_RDMA_WRITE; +- wr.send_flags = flags; +- wr.wr.rdma.remote_addr = addr; +- wr.wr.rdma.rkey = rkey; +- +- return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad)); +-} +- +-/* +- * Update target SGE before sending data. Otherwise the remote side may +- * update the entry before we do. 
+- */ +-static int rs_write_data(struct usocket *us, +- struct ibv_sge *sgl, int nsge, +- uint32_t length, int flags) +-{ +- uint64_t addr; +- uint32_t rkey; +- +- us->sseq_no++; +- us->sqe_avail--; +- us->sbuf_bytes_avail -= length; +- +- addr = us->target_sgl[us->target_sge].addr; +- rkey = us->target_sgl[us->target_sge].key; +- +- us->target_sgl[us->target_sge].addr += length; +- us->target_sgl[us->target_sge].length -= length; +- +- if (!us->target_sgl[us->target_sge].length) { +- if (++us->target_sge == RS_SGL_SIZE) +- us->target_sge = 0; +- } +- +- return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_DATA, length), +- flags, addr, rkey); +-} +- +-static int rs_write_direct(struct usocket *us, struct rs_iomap *iom, uint64_t offset, +- struct ibv_sge *sgl, int nsge, uint32_t length, int flags) +-{ +- uint64_t addr; +- +- us->sqe_avail--; +- us->sbuf_bytes_avail -= length; +- +- addr = iom->sge.addr + offset - iom->offset; +- return rs_post_write(us, sgl, nsge, rs_msg_set(RS_OP_WRITE, length), +- flags, addr, iom->sge.key); +-} +- +-static int rs_write_iomap(struct usocket *us, struct rs_iomap_mr *iomr, +- struct ibv_sge *sgl, int nsge, int flags) +-{ +- uint64_t addr; +- +- us->sseq_no++; +- us->sqe_avail--; +- us->sbuf_bytes_avail -= sizeof(struct rs_iomap); +- +- addr = us->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); +- return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), +- flags, addr, us->remote_iomap.key); +-} +- +-static uint32_t rs_sbuf_left(struct usocket *us) +-{ +- return (uint32_t) (((uint64_t) (uintptr_t) &us->sbuf[us->sbuf_size]) - +- us->ssgl[0].addr); +-} +- +-static void rs_send_credits(struct usocket *us) +-{ +- struct ibv_sge ibsge; +- struct rs_sge sge; +- +- us->ctrl_avail--; +- us->rseq_comp = us->rseq_no + (us->rq_size >> 1); +- if (us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) { +- if (!(us->opts & RS_OPT_SWAP_SGL)) { +- sge.addr = (uintptr_t) &us->rbuf[us->rbuf_free_offset]; +- sge.key = us->rmr->rkey; +- sge.length = us->rbuf_size >> 1; +- } else { +- sge.addr = bswap_64((uintptr_t) &us->rbuf[us->rbuf_free_offset]); +- sge.key = bswap_32(us->rmr->rkey); +- sge.length = bswap_32(us->rbuf_size >> 1); +- } +- +- ibsge.addr = (uintptr_t) &sge; +- ibsge.lkey = 0; +- ibsge.length = sizeof(sge); +- +- rs_post_write_msg(us, &ibsge, 1, +- rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size), +- IBV_SEND_INLINE, +- us->remote_sgl.addr + +- us->remote_sge * sizeof(struct rs_sge), +- us->remote_sgl.key); +- +- us->rbuf_bytes_avail -= us->rbuf_size >> 1; +- us->rbuf_free_offset += us->rbuf_size >> 1; +- if (us->rbuf_free_offset >= us->rbuf_size) +- us->rbuf_free_offset = 0; +- if (++us->remote_sge == us->remote_sgl.length) +- us->remote_sge = 0; +- } else { +- rs_post_write_msg(us, NULL, 0, +- rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size), +- 0, 0, 0); +- } +-} +- +-static int rs_give_credits(struct usocket *us) +-{ +- return ((us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) || +- ((short) ((short) us->rseq_no - (short) us->rseq_comp) >= 0)) && +- us->ctrl_avail && (us->state & rs_connected); +-} +- +-static void rs_update_credits(struct usocket *us) +-{ +- if (rs_give_credits(us)) +- rs_send_credits(us); +-} +- +-static int rs_poll_cq(struct usocket *us) +-{ +- struct ibv_wc wc; +- uint32_t imm_data; +- int ret, rcnt = 0; +- +- while ((ret = ibv_poll_cq(us->cm_id->recv_cq, 1, &wc)) > 0) { +- if (wc.wr_id == RS_RECV_WR_ID) { +- if (wc.status != IBV_WC_SUCCESS) +- continue; +- rcnt++; +- +- imm_data = ntohl(wc.imm_data); +- 
switch (rs_msg_op(imm_data)) { +- case RS_OP_SGL: +- us->sseq_comp = (uint16_t) rs_msg_data(imm_data); +- break; +- case RS_OP_IOMAP_SGL: +- /* The iomap was updated, that's nice to know. */ +- break; +- case RS_OP_CTRL: +- if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { +- us->state = rs_disconnected; +- return 0; +- } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { +- us->state &= ~rs_connect_rd; +- } +- break; +- case RS_OP_WRITE: +- /* We really shouldn't be here. */ +- break; +- default: +- us->rmsg[us->rmsg_tail].op = rs_msg_op(imm_data); +- us->rmsg[us->rmsg_tail].data = rs_msg_data(imm_data); +- if (++us->rmsg_tail == us->rq_size + 1) +- us->rmsg_tail = 0; +- break; +- } +- } else { +- switch (rs_msg_op((uint32_t) wc.wr_id)) { +- case RS_OP_SGL: +- us->ctrl_avail++; +- break; +- case RS_OP_CTRL: +- us->ctrl_avail++; +- if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT) +- us->state = rs_disconnected; +- break; +- case RS_OP_IOMAP_SGL: +- us->sqe_avail++; +- us->sbuf_bytes_avail += sizeof(struct rs_iomap); +- break; +- default: +- us->sqe_avail++; +- us->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id); +- break; +- } +- if (wc.status != IBV_WC_SUCCESS && (us->state & rs_connected)) { +- us->state = rs_error; +- us->err = EIO; +- } +- } +- } +- +- if (us->state & rs_connected) { +- while (!ret && rcnt--) +- ret = rs_post_recv(us); +- +- if (ret) { +- us->state = rs_error; +- us->err = errno; +- } +- } +- return ret; +-} +- +-static int rs_get_cq_event(struct usocket *us) +-{ +- struct ibv_cq *cq; +- void *context; +- int ret; +- +- if (!us->cq_armed) +- return 0; +- +- ret = ibv_get_cq_event(us->cm_id->recv_cq_channel, &cq, &context); +- if (!ret) { +- ibv_ack_cq_events(us->cm_id->recv_cq, 1); +- us->cq_armed = 0; +- } else if (errno != EAGAIN) { +- us->state = rs_error; +- } +- +- return ret; +-} +- +-/* +- * Although we serialize rsend and rrecv calls with respect to themselves, +- * both calls may run simultaneously and need to poll the CQ for completions. +- * We need to serialize access to the CQ, but rsend and rrecv need to +- * allow each other to make forward progress. +- * +- * For example, rsend may need to wait for credits from the remote side, +- * which could be stalled until the remote process calls rrecv. This should +- * not block rrecv from receiving data from the remote side however. +- * +- * We handle this by using two locks. The cq_lock protects against polling +- * the CQ and processing completions. The cq_wait_lock serializes access to +- * waiting on the CQ. 
+- */ +-static int rs_process_cq(struct usocket *us, int nonblock, int (*test)(struct usocket *us)) +-{ +- int ret; +- +- fastlock_acquire(&us->cq_lock); +- do { +- rs_update_credits(us); +- ret = rs_poll_cq(us); +- if (test(us)) { +- ret = 0; +- break; +- } else if (ret) { +- break; +- } else if (nonblock) { +- ret = ERR(EWOULDBLOCK); +- } else if (!us->cq_armed) { +- ibv_req_notify_cq(us->cm_id->recv_cq, 0); +- us->cq_armed = 1; +- } else { +- rs_update_credits(us); +- fastlock_acquire(&us->cq_wait_lock); +- fastlock_release(&us->cq_lock); +- +- ret = rs_get_cq_event(us); +- fastlock_release(&us->cq_wait_lock); +- fastlock_acquire(&us->cq_lock); +- } +- } while (!ret); +- +- rs_update_credits(us); +- fastlock_release(&us->cq_lock); +- return ret; +-} +- +-static int rs_get_comp(struct usocket *us, int nonblock, int (*test)(struct usocket *us)) +-{ +- struct timeval s, e; +- uint32_t poll_time = 0; +- int ret; +- +- do { +- ret = rs_process_cq(us, 1, test); +- if (!ret || nonblock || errno != EWOULDBLOCK) +- return ret; +- +- if (!poll_time) +- gettimeofday(&s, NULL); +- +- gettimeofday(&e, NULL); +- poll_time = (e.tv_sec - s.tv_sec) * 1000000 + +- (e.tv_usec - s.tv_usec) + 1; +- } while (poll_time <= polling_time); +- +- ret = rs_process_cq(us, 0, test); +- return ret; +-} +- +-static int rs_nonblocking(struct usocket *us, int flags) +-{ +- return (us->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); +-} +- +-static int rs_is_cq_armed(struct usocket *us) +-{ +- return us->cq_armed; +-} +- +-static int rs_poll_all(struct usocket *us) +-{ +- return 1; +-} +- +-/* +- * We use hardware flow control to prevent over running the remote +- * receive queue. However, data transfers still require space in +- * the remote rmsg queue, or we risk losing notification that data +- * has been transfered. +- * +- * Be careful with race conditions in the check below. The target SGL +- * may be updated by a remote RDMA write. 
+- */ +-static int rs_can_send(struct usocket *us) +-{ +- return us->sqe_avail && (us->sbuf_bytes_avail >= RS_SNDLOWAT) && +- (us->sseq_no != us->sseq_comp) && +- (us->target_sgl[us->target_sge].length != 0); +-} +- +-static int rs_conn_can_send(struct usocket *us) +-{ +- return rs_can_send(us) || !(us->state & rs_connect_wr); +-} +- +-static int rs_conn_can_send_ctrl(struct usocket *us) +-{ +- return us->ctrl_avail || !(us->state & rs_connected); +-} +- +-static int rs_have_rdata(struct usocket *us) +-{ +- return (us->rmsg_head != us->rmsg_tail); +-} +- +-static int rs_conn_have_rdata(struct usocket *us) +-{ +- return rs_have_rdata(us) || !(us->state & rs_connect_rd); +-} +- +-static int rs_conn_all_sends_done(struct usocket *us) +-{ +- return ((us->sqe_avail + us->ctrl_avail) == us->sq_size) || +- !(us->state & rs_connected); +-} +- +-static ssize_t rs_peek(struct usocket *us, void *buf, size_t len) +-{ +- size_t left = len; +- uint32_t end_size, rsize; +- int rmsg_head, rbuf_offset; +- +- rmsg_head = us->rmsg_head; +- rbuf_offset = us->rbuf_offset; +- +- for (; left && (rmsg_head != us->rmsg_tail); left -= rsize) { +- if (left < us->rmsg[rmsg_head].data) { +- rsize = left; +- } else { +- rsize = us->rmsg[rmsg_head].data; +- if (++rmsg_head == us->rq_size + 1) +- rmsg_head = 0; +- } +- +- end_size = us->rbuf_size - rbuf_offset; +- if (rsize > end_size) { +- memcpy(buf, &us->rbuf[rbuf_offset], end_size); +- rbuf_offset = 0; +- buf += end_size; +- rsize -= end_size; +- left -= end_size; +- } +- memcpy(buf, &us->rbuf[rbuf_offset], rsize); +- rbuf_offset += rsize; +- buf += rsize; +- } +- +- return len - left; +-} +- +-/* +- * Continue to receive any queued data even if the remote side has disconnected. +- */ +-ssize_t rrecv(int socket, void *buf, size_t len, int flags) +-{ +- struct usocket *us; +- size_t left = len; +- uint32_t end_size, rsize; +- int ret; +- +- us = idm_at(&idm, socket); +- if (us->state & rs_opening) { +- ret = rs_do_connect(us); +- if (ret) { +- if (errno == EINPROGRESS) +- errno = EAGAIN; +- return ret; +- } +- } +- fastlock_acquire(&us->rlock); +- do { +- if (!rs_have_rdata(us)) { +- ret = rs_get_comp(us, rs_nonblocking(us, flags), +- rs_conn_have_rdata); +- if (ret) +- break; +- } +- +- ret = 0; +- if (flags & MSG_PEEK) { +- left = len - rs_peek(us, buf, left); +- break; +- } +- +- for (; left && rs_have_rdata(us); left -= rsize) { +- if (left < us->rmsg[us->rmsg_head].data) { +- rsize = left; +- us->rmsg[us->rmsg_head].data -= left; +- } else { +- us->rseq_no++; +- rsize = us->rmsg[us->rmsg_head].data; +- if (++us->rmsg_head == us->rq_size + 1) +- us->rmsg_head = 0; +- } +- +- end_size = us->rbuf_size - us->rbuf_offset; +- if (rsize > end_size) { +- memcpy(buf, &us->rbuf[us->rbuf_offset], end_size); +- us->rbuf_offset = 0; +- buf += end_size; +- rsize -= end_size; +- left -= end_size; +- us->rbuf_bytes_avail += end_size; +- } +- memcpy(buf, &us->rbuf[us->rbuf_offset], rsize); +- us->rbuf_offset += rsize; +- buf += rsize; +- us->rbuf_bytes_avail += rsize; +- } +- +- } while (left && (flags & MSG_WAITALL) && (us->state & rs_connect_rd)); +- +- fastlock_release(&us->rlock); +- return ret ? 
ret : len - left; +-} +- +-ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, +- struct sockaddr *src_addr, socklen_t *addrlen) +-{ +- int ret; +- +- ret = rrecv(socket, buf, len, flags); +- if (ret > 0 && src_addr) +- rgetpeername(socket, src_addr, addrlen); +- +- return ret; +-} +- +-/* +- * Simple, straightforward implementation for now that only tries to fill +- * in the first vector. +- */ +-static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags) +-{ +- return rrecv(socket, iov[0].iov_base, iov[0].iov_len, flags); +-} +- +-ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags) +-{ +- if (msg->msg_control && msg->msg_controllen) +- return ERR(ENOTSUP); +- +- return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); +-} +- +-ssize_t rread(int socket, void *buf, size_t count) +-{ +- return rrecv(socket, buf, count, 0); +-} +- +-ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt) +-{ +- return rrecvv(socket, iov, iovcnt, 0); +-} +- +-static int rs_send_iomaps(struct usocket *us, int flags) +-{ +- struct rs_iomap_mr *iomr; +- struct ibv_sge sge; +- struct rs_iomap iom; +- int ret; +- +- fastlock_acquire(&us->iomap_lock); +- while (!dlist_empty(&us->iomap_queue)) { +- if (!rs_can_send(us)) { +- ret = rs_get_comp(us, rs_nonblocking(us, flags), +- rs_conn_can_send); +- if (ret) +- break; +- if (!(us->state & rs_connect_wr)) { +- ret = ERR(ECONNRESET); +- break; +- } +- } +- +- iomr = container_of(us->iomap_queue.next, struct rs_iomap_mr, entry); +- if (!(us->opts & RS_OPT_SWAP_SGL)) { +- iom.offset = iomr->offset; +- iom.sge.addr = (uintptr_t) iomr->mr->addr; +- iom.sge.length = iomr->mr->length; +- iom.sge.key = iomr->mr->rkey; +- } else { +- iom.offset = bswap_64(iomr->offset); +- iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr); +- iom.sge.length = bswap_32(iomr->mr->length); +- iom.sge.key = bswap_32(iomr->mr->rkey); +- } +- +- if (us->sq_inline >= sizeof iom) { +- sge.addr = (uintptr_t) &iom; +- sge.length = sizeof iom; +- sge.lkey = 0; +- ret = rs_write_iomap(us, iomr, &sge, 1, IBV_SEND_INLINE); +- } else if (rs_sbuf_left(us) >= sizeof iom) { +- memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom, sizeof iom); +- us->ssgl[0].length = sizeof iom; +- ret = rs_write_iomap(us, iomr, us->ssgl, 1, 0); +- if (rs_sbuf_left(us) > sizeof iom) +- us->ssgl[0].addr += sizeof iom; +- else +- us->ssgl[0].addr = (uintptr_t) us->sbuf; +- } else { +- us->ssgl[0].length = rs_sbuf_left(us); +- memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom, +- us->ssgl[0].length); +- us->ssgl[1].length = sizeof iom - us->ssgl[0].length; +- memcpy(us->sbuf, ((void *) &iom) + us->ssgl[0].length, +- us->ssgl[1].length); +- ret = rs_write_iomap(us, iomr, us->ssgl, 2, 0); +- us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length; +- } +- dlist_remove(&iomr->entry); +- dlist_insert_tail(&iomr->entry, &us->iomap_list); +- if (ret) +- break; +- } +- +- us->iomap_pending = !dlist_empty(&us->iomap_queue); +- fastlock_release(&us->iomap_lock); +- return ret; +-} +- +-/* +- * We overlap sending the data, by posting a small work request immediately, +- * then increasing the size of the send on each iteration. 
+-ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+-{
+-	struct usocket *us;
+-	struct ibv_sge sge;
+-	size_t left = len;
+-	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+-	int ret = 0;
+-
+-	us = idm_at(&idm, socket);
+-	if (us->state & rs_opening) {
+-		ret = rs_do_connect(us);
+-		if (ret) {
+-			if (errno == EINPROGRESS)
+-				errno = EAGAIN;
+-			return ret;
+-		}
+-	}
+-
+-	fastlock_acquire(&us->slock);
+-	if (us->iomap_pending) {
+-		ret = rs_send_iomaps(us, flags);
+-		if (ret)
+-			goto out;
+-	}
+-	for (; left; left -= xfer_size, buf += xfer_size) {
+-		if (!rs_can_send(us)) {
+-			ret = rs_get_comp(us, rs_nonblocking(us, flags),
+-					  rs_conn_can_send);
+-			if (ret)
+-				break;
+-			if (!(us->state & rs_connect_wr)) {
+-				ret = ERR(ECONNRESET);
+-				break;
+-			}
+-		}
+-
+-		if (olen < left) {
+-			xfer_size = olen;
+-			if (olen < RS_MAX_TRANSFER)
+-				olen <<= 1;
+-		} else {
+-			xfer_size = left;
+-		}
+-
+-		if (xfer_size > us->sbuf_bytes_avail)
+-			xfer_size = us->sbuf_bytes_avail;
+-		if (xfer_size > us->target_sgl[us->target_sge].length)
+-			xfer_size = us->target_sgl[us->target_sge].length;
+-
+-		if (xfer_size <= us->sq_inline) {
+-			sge.addr = (uintptr_t) buf;
+-			sge.length = xfer_size;
+-			sge.lkey = 0;
+-			ret = rs_write_data(us, &sge, 1, xfer_size, IBV_SEND_INLINE);
+-		} else if (xfer_size <= rs_sbuf_left(us)) {
+-			memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf, xfer_size);
+-			us->ssgl[0].length = xfer_size;
+-			ret = rs_write_data(us, us->ssgl, 1, xfer_size, 0);
+-			if (xfer_size < rs_sbuf_left(us))
+-				us->ssgl[0].addr += xfer_size;
+-			else
+-				us->ssgl[0].addr = (uintptr_t) us->sbuf;
+-		} else {
+-			us->ssgl[0].length = rs_sbuf_left(us);
+-			memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf,
+-			       us->ssgl[0].length);
+-			us->ssgl[1].length = xfer_size - us->ssgl[0].length;
+-			memcpy(us->sbuf, buf + us->ssgl[0].length, us->ssgl[1].length);
+-			ret = rs_write_data(us, us->ssgl, 2, xfer_size, 0);
+-			us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length;
+-		}
+-		if (ret)
+-			break;
+-	}
+-out:
+-	fastlock_release(&us->slock);
+-
+-	return (ret && left == len) ? ret : len - left;
+-}
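rsend() ramps the size of successive work requests up from RS_OLAP_START_SIZE,
doubling until RS_MAX_TRANSFER, so the first bytes hit the wire with minimal
latency while the larger copies that follow overlap with them. A sketch of
just that ramp; the two constants here are stand-ins for the RS_* values
defined elsewhere in rsocket.c:

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define OLAP_START_SIZE	2048	/* stand-in for RS_OLAP_START_SIZE */
#define MAX_TRANSFER	65536	/* stand-in for RS_MAX_TRANSFER */

int main(void)
{
	size_t left = 200000;	/* bytes the caller asked to send */
	uint32_t xfer_size, olen = OLAP_START_SIZE;

	while (left) {
		/* same size selection as the loop in rsend() */
		if (olen < left) {
			xfer_size = olen;
			if (olen < MAX_TRANSFER)
				olen <<= 1;
		} else {
			xfer_size = left;
		}
		printf("post %u bytes\n", xfer_size);
		left -= xfer_size;
	}
	return 0;
}

With these stand-in values the posted sizes come out as 2048, 4096, 8192,
..., capped at 65536, with one short final request for the remainder.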
+-
+-ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
+-		const struct sockaddr *dest_addr, socklen_t addrlen)
+-{
+-	if (dest_addr || addrlen)
+-		return ERR(EISCONN);
+-
+-	return rsend(socket, buf, len, flags);
+-}
+-
+-static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
+-{
+-	size_t size;
+-
+-	while (len) {
+-		size = (*iov)->iov_len - *offset;
+-		if (size > len) {
+-			memcpy(dst, (*iov)->iov_base + *offset, len);
+-			*offset += len;
+-			break;
+-		}
+-
+-		memcpy(dst, (*iov)->iov_base + *offset, size);
+-		len -= size;
+-		dst += size;
+-		(*iov)++;
+-		*offset = 0;
+-	}
+-}
+-
+-static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
+-{
+-	struct usocket *us;
+-	const struct iovec *cur_iov;
+-	size_t left, len, offset = 0;
+-	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+-	int i, ret = 0;
+-
+-	us = idm_at(&idm, socket);
+-	if (us->state & rs_opening) {
+-		ret = rs_do_connect(us);
+-		if (ret) {
+-			if (errno == EINPROGRESS)
+-				errno = EAGAIN;
+-			return ret;
+-		}
+-	}
+-
+-	cur_iov = iov;
+-	len = iov[0].iov_len;
+-	for (i = 1; i < iovcnt; i++)
+-		len += iov[i].iov_len;
+-	left = len;
+-
+-	fastlock_acquire(&us->slock);
+-	if (us->iomap_pending) {
+-		ret = rs_send_iomaps(us, flags);
+-		if (ret)
+-			goto out;
+-	}
+-	for (; left; left -= xfer_size) {
+-		if (!rs_can_send(us)) {
+-			ret = rs_get_comp(us, rs_nonblocking(us, flags),
+-					  rs_conn_can_send);
+-			if (ret)
+-				break;
+-			if (!(us->state & rs_connect_wr)) {
+-				ret = ERR(ECONNRESET);
+-				break;
+-			}
+-		}
+-
+-		if (olen < left) {
+-			xfer_size = olen;
+-			if (olen < RS_MAX_TRANSFER)
+-				olen <<= 1;
+-		} else {
+-			xfer_size = left;
+-		}
+-
+-		if (xfer_size > us->sbuf_bytes_avail)
+-			xfer_size = us->sbuf_bytes_avail;
+-		if (xfer_size > us->target_sgl[us->target_sge].length)
+-			xfer_size = us->target_sgl[us->target_sge].length;
+-
+-		if (xfer_size <= rs_sbuf_left(us)) {
+-			rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr,
+-				    &cur_iov, &offset, xfer_size);
+-			us->ssgl[0].length = xfer_size;
+-			ret = rs_write_data(us, us->ssgl, 1, xfer_size,
+-					    xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0);
+-			if (xfer_size < rs_sbuf_left(us))
+-				us->ssgl[0].addr += xfer_size;
+-			else
+-				us->ssgl[0].addr = (uintptr_t) us->sbuf;
+-		} else {
+-			us->ssgl[0].length = rs_sbuf_left(us);
+-			rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr, &cur_iov,
+-				    &offset, us->ssgl[0].length);
+-			us->ssgl[1].length = xfer_size - us->ssgl[0].length;
+-			rs_copy_iov(us->sbuf, &cur_iov, &offset, us->ssgl[1].length);
+-			ret = rs_write_data(us, us->ssgl, 2, xfer_size,
+-					    xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0);
+-			us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length;
+-		}
+-		if (ret)
+-			break;
+-	}
+-out:
+-	fastlock_release(&us->slock);
+-
+-	return (ret && left == len) ? ret : len - left;
+-}
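rs_copy_iov() keeps a cursor (*iov plus *offset) that persists across calls,
letting rsendv() gather from the user's vector in whatever chunk sizes the
send path picks. A small standalone driver showing the cursor semantics;
copy_iov here is a local copy of the same walk, and the data is made up:

#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

static void copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
{
	size_t size;

	while (len) {
		size = (*iov)->iov_len - *offset;
		if (size > len) {
			memcpy(dst, (char *) (*iov)->iov_base + *offset, len);
			*offset += len;
			break;
		}
		memcpy(dst, (char *) (*iov)->iov_base + *offset, size);
		len -= size;
		dst = (char *) dst + size;
		(*iov)++;		/* vector exhausted, move to the next */
		*offset = 0;
	}
}

int main(void)
{
	struct iovec v[2] = {
		{ .iov_base = (void *) "hello ", .iov_len = 6 },
		{ .iov_base = (void *) "world", .iov_len = 5 },
	};
	const struct iovec *cur = v;
	size_t off = 0;
	char out[12] = { 0 };

	copy_iov(out, &cur, &off, 4);		/* copies "hell" */
	copy_iov(out + 4, &cur, &off, 7);	/* continues with "o world" */
	printf("%s\n", out);			/* prints "hello world" */
	return 0;
}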
+-
+-ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+-{
+-	if (msg->msg_control && msg->msg_controllen)
+-		return ERR(ENOTSUP);
+-
+-	return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
+-}
+-
+-ssize_t rwrite(int socket, const void *buf, size_t count)
+-{
+-	return rsend(socket, buf, count, 0);
+-}
+-
+-ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
+-{
+-	return rsendv(socket, iov, iovcnt, 0);
+-}
+-
+-static struct pollfd *rs_fds_alloc(nfds_t nfds)
+-{
+-	static __thread struct pollfd *rfds;
+-	static __thread nfds_t rnfds;
+-
+-	if (nfds > rnfds) {
+-		if (rfds)
+-			free(rfds);
+-
+-		rfds = malloc(sizeof *rfds * nfds);
+-		rnfds = rfds ? nfds : 0;
+-	}
+-
+-	return rfds;
+-}
+-
+-static int rs_poll_rs(struct usocket *us, int events,
+-		      int nonblock, int (*test)(struct usocket *us))
+-{
+-	struct pollfd fds;
+-	short revents;
+-	int ret;
+-
+-check_cq:
+-	if ((us->state & rs_connected) || (us->state == rs_disconnected) ||
+-	    (us->state & rs_error)) {
+-		rs_process_cq(us, nonblock, test);
+-
+-		revents = 0;
+-		if ((events & POLLIN) && rs_conn_have_rdata(us))
+-			revents |= POLLIN;
+-		if ((events & POLLOUT) && rs_can_send(us))
+-			revents |= POLLOUT;
+-		if (!(us->state & rs_connected)) {
+-			if (us->state == rs_disconnected)
+-				revents |= POLLHUP;
+-			else
+-				revents |= POLLERR;
+-		}
+-
+-		return revents;
+-	}
+-
+-	if (us->state == rs_listening) {
+-		fds.fd = us->cm_id->channel->fd;
+-		fds.events = events;
+-		fds.revents = 0;
+-		poll(&fds, 1, 0);
+-		return fds.revents;
+-	}
+-
+-	if (us->state & rs_opening) {
+-		ret = rs_do_connect(us);
+-		if (ret) {
+-			if (errno == EINPROGRESS) {
+-				errno = 0;
+-				return 0;
+-			} else {
+-				return POLLOUT;
+-			}
+-		}
+-		goto check_cq;
+-	}
+-
+-	if (us->state == rs_connect_error)
+-		return (us->err && events & POLLOUT) ? POLLOUT : 0;
+-
+-	return 0;
+-}
+-
+-static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
+-{
+-	struct usocket *us;
+-	int i, cnt = 0;
+-
+-	for (i = 0; i < nfds; i++) {
+-		us = idm_lookup(&idm, fds[i].fd);
+-		if (us)
+-			fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all);
+-		else
+-			poll(&fds[i], 1, 0);
+-
+-		if (fds[i].revents)
+-			cnt++;
+-	}
+-	return cnt;
+-}
+-
+-static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+-{
+-	struct usocket *us;
+-	int i;
+-
+-	for (i = 0; i < nfds; i++) {
+-		us = idm_lookup(&idm, fds[i].fd);
+-		if (us) {
+-			fds[i].revents = rs_poll_rs(us, fds[i].events, 0, rs_is_cq_armed);
+-			if (fds[i].revents)
+-				return 1;
+-
+-			if (us->state >= rs_connected)
+-				rfds[i].fd = us->cm_id->recv_cq_channel->fd;
+-			else
+-				rfds[i].fd = us->cm_id->channel->fd;
+-
+-			rfds[i].events = POLLIN;
+-		} else {
+-			rfds[i].fd = fds[i].fd;
+-			rfds[i].events = fds[i].events;
+-		}
+-		rfds[i].revents = 0;
+-	}
+-	return 0;
+-}
+-
+-static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+-{
+-	struct usocket *us;
+-	int i, cnt = 0;
+-
+-	for (i = 0; i < nfds; i++) {
+-		if (!rfds[i].revents)
+-			continue;
+-
+-		us = idm_lookup(&idm, fds[i].fd);
+-		if (us) {
+-			rs_get_cq_event(us);
+-			fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all);
+-		} else {
+-			fds[i].revents = rfds[i].revents;
+-		}
+-		if (fds[i].revents)
+-			cnt++;
+-	}
+-	return cnt;
+-}
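The three helpers above split poll handling into phases: rs_poll_check()
busy-polls each rsocket's completion queue, rs_poll_arm() re-points the
pollfd array at the CQ (or CM) event channels, and rs_poll_events() turns
channel wakeups back into user-visible revents. The overall shape, reduced
to a skeleton with hypothetical check/arm/wait callbacks standing in for
those helpers:

/* Hypothetical skeleton of the spin-then-block pattern used by rpoll():
 * busy-poll for a bounded time, then arm event channels and sleep.
 */
static int spin_then_block(int (*check)(void), int (*arm)(void),
			   int (*wait)(void), unsigned int spin_limit)
{
	unsigned int spins = 0;
	int ret;

	do {			/* phase 1: busy poll for immediate events */
		ret = check();
		if (ret)
			return ret;
	} while (++spins <= spin_limit);

	do {			/* phase 2: arm wakeups, then block */
		ret = arm();	/* may catch events that raced with arming */
		if (ret)
			return ret;
		ret = wait();	/* block until an event channel fires */
	} while (!ret);

	return ret;
}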
+-
+-/*
+- * We need to poll *all* fd's that the user specifies at least once.
+- * Note that we may receive events on a usocket that may not be reported
+- * to the user (e.g. connection events or credit updates).  Process those
+- * events, then return to polling until we find ones of interest.
+- */
+-int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
+-{
+-	struct timeval s, e;
+-	struct pollfd *rfds;
+-	uint32_t poll_time = 0;
+-	int ret;
+-
+-	do {
+-		ret = rs_poll_check(fds, nfds);
+-		if (ret || !timeout)
+-			return ret;
+-
+-		if (!poll_time)
+-			gettimeofday(&s, NULL);
+-
+-		gettimeofday(&e, NULL);
+-		poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
+-			    (e.tv_usec - s.tv_usec) + 1;
+-	} while (poll_time <= polling_time);
+-
+-	rfds = rs_fds_alloc(nfds);
+-	if (!rfds)
+-		return ERR(ENOMEM);
+-
+-	do {
+-		ret = rs_poll_arm(rfds, fds, nfds);
+-		if (ret)
+-			break;
+-
+-		ret = poll(rfds, nfds, timeout);
+-		if (ret <= 0)
+-			break;
+-
+-		ret = rs_poll_events(rfds, fds, nfds);
+-	} while (!ret);
+-
+-	return ret;
+-}
+-
+-static struct pollfd *
+-rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
+-{
+-	struct pollfd *fds;
+-	int fd, i = 0;
+-
+-	fds = calloc(*nfds, sizeof *fds);
+-	if (!fds)
+-		return NULL;
+-
+-	for (fd = 0; fd < *nfds; fd++) {
+-		if (readfds && FD_ISSET(fd, readfds)) {
+-			fds[i].fd = fd;
+-			fds[i].events = POLLIN;
+-		}
+-
+-		if (writefds && FD_ISSET(fd, writefds)) {
+-			fds[i].fd = fd;
+-			fds[i].events |= POLLOUT;
+-		}
+-
+-		if (exceptfds && FD_ISSET(fd, exceptfds))
+-			fds[i].fd = fd;
+-
+-		if (fds[i].fd)
+-			i++;
+-	}
+-
+-	*nfds = i;
+-	return fds;
+-}
+-
+-static int
+-rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds,
+-		  fd_set *writefds, fd_set *exceptfds)
+-{
+-	int i, cnt = 0;
+-
+-	for (i = 0; i < nfds; i++) {
+-		if (readfds && (fds[i].revents & (POLLIN | POLLHUP))) {
+-			FD_SET(fds[i].fd, readfds);
+-			cnt++;
+-		}
+-
+-		if (writefds && (fds[i].revents & POLLOUT)) {
+-			FD_SET(fds[i].fd, writefds);
+-			cnt++;
+-		}
+-
+-		if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) {
+-			FD_SET(fds[i].fd, exceptfds);
+-			cnt++;
+-		}
+-	}
+-	return cnt;
+-}
+-
+-static int rs_convert_timeout(struct timeval *timeout)
+-{
+-	return !timeout ? -1 :
+-	       timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
+-}
+-
+-int rselect(int nfds, fd_set *readfds, fd_set *writefds,
+-	    fd_set *exceptfds, struct timeval *timeout)
+-{
+-	struct pollfd *fds;
+-	int ret;
+-
+-	fds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds);
+-	if (!fds)
+-		return ERR(ENOMEM);
+-
+-	ret = rpoll(fds, nfds, rs_convert_timeout(timeout));
+-
+-	if (readfds)
+-		FD_ZERO(readfds);
+-	if (writefds)
+-		FD_ZERO(writefds);
+-	if (exceptfds)
+-		FD_ZERO(exceptfds);
+-
+-	if (ret > 0)
+-		ret = rs_poll_to_select(nfds, fds, readfds, writefds, exceptfds);
+-
+-	free(fds);
+-	return ret;
+-}
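rselect() emulates select() on top of rpoll(): the fd_sets are converted to
a pollfd array, rpoll() runs, and the results are folded back into the
(zeroed) fd_sets. The timeout conversion is the simple part; a quick check
of rs_convert_timeout()'s arithmetic, written as a standalone test:

#include <assert.h>
#include <stdio.h>
#include <sys/time.h>

/* Same expression as rs_convert_timeout() above: NULL means block
 * forever (-1 for poll), otherwise round down to milliseconds.
 */
static int convert_timeout(struct timeval *timeout)
{
	return !timeout ? -1 :
	       timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
}

int main(void)
{
	struct timeval tv = { .tv_sec = 2, .tv_usec = 500000 };

	assert(convert_timeout(&tv) == 2500);	/* 2.5s -> 2500 ms */
	assert(convert_timeout(NULL) == -1);	/* block indefinitely */
	printf("ok\n");
	return 0;
}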
+-
+-/*
+- * For graceful disconnect, notify the remote side that we're
+- * disconnecting and wait until all outstanding sends complete.
+- */
+-int rshutdown(int socket, int how)
+-{
+-	struct usocket *us;
+-	int ctrl, ret = 0;
+-
+-	us = idm_at(&idm, socket);
+-	if (how == SHUT_RD) {
+-		us->state &= ~rs_connect_rd;
+-		return 0;
+-	}
+-
+-	if (us->fd_flags & O_NONBLOCK)
+-		rs_set_nonblocking(us, 0);
+-
+-	if (us->state & rs_connected) {
+-		if (how == SHUT_RDWR) {
+-			ctrl = RS_CTRL_DISCONNECT;
+-			us->state &= ~(rs_connect_rd | rs_connect_wr);
+-		} else {
+-			us->state &= ~rs_connect_wr;
+-			ctrl = (us->state & rs_connect_rd) ?
+-				RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+-		}
+-		if (!us->ctrl_avail) {
+-			ret = rs_process_cq(us, 0, rs_conn_can_send_ctrl);
+-			if (ret)
+-				return ret;
+-		}
+-
+-		if ((us->state & rs_connected) && us->ctrl_avail) {
+-			us->ctrl_avail--;
+-			ret = rs_post_write_msg(us, NULL, 0,
+-						rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+-		}
+-	}
+-
+-	if (us->state & rs_connected)
+-		rs_process_cq(us, 0, rs_conn_all_sends_done);
+-
+-	if ((us->fd_flags & O_NONBLOCK) && (us->state & rs_connected))
+-		rs_set_nonblocking(us, 1);
+-
+-	return 0;
+-}
+-
+-int rclose(int socket)
+-{
+-	struct usocket *us;
+-
+-	us = idm_at(&idm, socket);
+-	if (us->state & rs_connected)
+-		rshutdown(socket, SHUT_RDWR);
+-
+-	rs_free(us);
+-	return 0;
+-}
+-
+-static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len)
+-{
+-	socklen_t size;
+-
+-	if (src->sa_family == AF_INET) {
+-		size = min(*len, sizeof(struct sockaddr_in));
+-		*len = sizeof(struct sockaddr_in);
+-	} else {
+-		size = min(*len, sizeof(struct sockaddr_in6));
+-		*len = sizeof(struct sockaddr_in6);
+-	}
+-	memcpy(dst, src, size);
+-}
+-
+-int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+-{
+-	struct usocket *us;
+-
+-	us = idm_at(&idm, socket);
+-	rs_copy_addr(addr, rdma_get_peer_addr(us->cm_id), addrlen);
+-	return 0;
+-}
+-
+-int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+-{
+-	struct usocket *us;
+-
+-	us = idm_at(&idm, socket);
+-	rs_copy_addr(addr, rdma_get_local_addr(us->cm_id), addrlen);
+-	return 0;
+-}
+-
+-int rsetsockopt(int socket, int level, int optname,
+-		const void *optval, socklen_t optlen)
+-{
+-	struct usocket *us;
+-	int ret, opt_on = 0;
+-	uint64_t *opts = NULL;
+-
+-	ret = ERR(ENOTSUP);
+-	us = idm_at(&idm, socket);
+-	switch (level) {
+-	case SOL_SOCKET:
+-		opts = &us->so_opts;
+-		switch (optname) {
+-		case SO_REUSEADDR:
+-			ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID,
+-					      RDMA_OPTION_ID_REUSEADDR,
+-					      (void *) optval, optlen);
+-			if (ret && ((errno == ENOSYS) || ((us->state != rs_init) &&
+-			    us->cm_id->context &&
+-			    (us->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
+-				ret = 0;
+-			opt_on = *(int *) optval;
+-			break;
+-		case SO_RCVBUF:
+-			if (!us->rbuf)
+-				us->rbuf_size = (*(uint32_t *) optval) << 1;
+-			ret = 0;
+-			break;
+-		case SO_SNDBUF:
+-			if (!us->sbuf)
+-				us->sbuf_size = (*(uint32_t *) optval) << 1;
+-			if (us->sbuf_size < RS_SNDLOWAT)
+-				us->sbuf_size = RS_SNDLOWAT << 1;
+-			ret = 0;
+-			break;
+-		case SO_LINGER:
+-			/* Invert value so default so_opt = 0 is on */
+-			opt_on = !((struct linger *) optval)->l_onoff;
+-			ret = 0;
+-			break;
+-		case SO_KEEPALIVE:
+-			opt_on = *(int *) optval;
+-			ret = 0;
+-			break;
+-		case SO_OOBINLINE:
+-			opt_on = *(int *) optval;
+-			ret = 0;
+-			break;
+-		default:
+-			break;
+-		}
+-		break;
+-	case IPPROTO_TCP:
+-		opts = &us->tcp_opts;
+-		switch (optname) {
+-		case TCP_NODELAY:
+-			opt_on = *(int *) optval;
+-			ret = 0;
+-			break;
+-		case TCP_MAXSEG:
+-			ret = 0;
+-			break;
+-		default:
+-			break;
+-		}
+-		break;
+-	case IPPROTO_IPV6:
+-		opts = &us->ipv6_opts;
+-		switch (optname) {
+-		case IPV6_V6ONLY:
+-			ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID,
+-					      RDMA_OPTION_ID_AFONLY,
+-					      (void *) optval, optlen);
+-			opt_on = *(int *) optval;
+-			break;
+-		default:
+-			break;
+-		}
+-		break;
+-	case SOL_RDMA:
+-		if (us->state >= rs_opening) {
+-			ret = ERR(EINVAL);
+-			break;
+-		}
+-
+-		switch (optname) {
+-		case RDMA_SQSIZE:
+-			us->sq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
+-			break;
+-		case RDMA_RQSIZE:
+-			us->rq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
+-			break;
+-		case RDMA_INLINE:
+-			us->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
+-			if (us->sq_inline < RS_MIN_INLINE)
+-				us->sq_inline = RS_MIN_INLINE;
+-			break;
+-		case RDMA_IOMAPSIZE:
+-			us->target_iomap_size = (uint16_t) rs_scale_to_value(
+-				(uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+-			break;
+-		default:
+-			break;
+-		}
+-		break;
+-	default:
+-		break;
+-	}
+-
+-	if (!ret && opts) {
+-		if (opt_on)
+-			*opts |= (1 << optname);
+-		else
+-			*opts &= ~(1 << optname);
+-	}
+-
+-	return ret;
+-}
+-
+-int rgetsockopt(int socket, int level, int optname,
+-		void *optval, socklen_t *optlen)
+-{
+-	struct usocket *us;
+-	int ret = 0;
+-
+-	us = idm_at(&idm, socket);
+-	switch (level) {
+-	case SOL_SOCKET:
+-		switch (optname) {
+-		case SO_REUSEADDR:
+-		case SO_KEEPALIVE:
+-		case SO_OOBINLINE:
+-			*((int *) optval) = !!(us->so_opts & (1 << optname));
+-			*optlen = sizeof(int);
+-			break;
+-		case SO_RCVBUF:
+-			*((int *) optval) = us->rbuf_size;
+-			*optlen = sizeof(int);
+-			break;
+-		case SO_SNDBUF:
+-			*((int *) optval) = us->sbuf_size;
+-			*optlen = sizeof(int);
+-			break;
+-		case SO_LINGER:
+-			/* Value is inverted so default so_opt = 0 is on */
+-			((struct linger *) optval)->l_onoff =
+-				!(us->so_opts & (1 << optname));
+-			((struct linger *) optval)->l_linger = 0;
+-			*optlen = sizeof(struct linger);
+-			break;
+-		case SO_ERROR:
+-			*((int *) optval) = us->err;
+-			*optlen = sizeof(int);
+-			us->err = 0;
+-			break;
+-		default:
+-			ret = ENOTSUP;
+-			break;
+-		}
+-		break;
+-	case IPPROTO_TCP:
+-		switch (optname) {
+-		case TCP_NODELAY:
+-			*((int *) optval) = !!(us->tcp_opts & (1 << optname));
+-			*optlen = sizeof(int);
+-			break;
+-		case TCP_MAXSEG:
+-			*((int *) optval) = (us->cm_id && us->cm_id->route.num_paths) ?
+-					    1 << (7 + us->cm_id->route.path_rec->mtu) :
+-					    2048;
+-			*optlen = sizeof(int);
+-			break;
+-		default:
+-			ret = ENOTSUP;
+-			break;
+-		}
+-		break;
+-	case IPPROTO_IPV6:
+-		switch (optname) {
+-		case IPV6_V6ONLY:
+-			*((int *) optval) = !!(us->ipv6_opts & (1 << optname));
+-			*optlen = sizeof(int);
+-			break;
+-		default:
+-			ret = ENOTSUP;
+-			break;
+-		}
+-		break;
+-	case SOL_RDMA:
+-		switch (optname) {
+-		case RDMA_SQSIZE:
+-			*((int *) optval) = us->sq_size;
+-			*optlen = sizeof(int);
+-			break;
+-		case RDMA_RQSIZE:
+-			*((int *) optval) = us->rq_size;
+-			*optlen = sizeof(int);
+-			break;
+-		case RDMA_INLINE:
+-			*((int *) optval) = us->sq_inline;
+-			*optlen = sizeof(int);
+-			break;
+-		case RDMA_IOMAPSIZE:
+-			*((int *) optval) = us->target_iomap_size;
+-			*optlen = sizeof(int);
+-			break;
+-		default:
+-			ret = ENOTSUP;
+-			break;
+-		}
+-		break;
+-	default:
+-		ret = ENOTSUP;
+-		break;
+-	}
+-
+-	return rdma_seterrno(ret);
+-}
+-
+-int ufcntl(int socket, int cmd, ... /* arg */ )
+-{
+-	struct usocket *us;
+-	va_list args;
+-	long param;
+-	int ret = 0;
+-
+-	us = idm_at(&idm, socket);
+-	va_start(args, cmd);
+-	switch (cmd) {
+-	case F_GETFL:
+-		ret = (int) us->fd_flags;
+-		break;
+-	case F_SETFL:
+-		param = va_arg(args, long);
+-		if (param & O_NONBLOCK)
+-			ret = rs_set_nonblocking(us, O_NONBLOCK);
+-
+-		if (!ret)
+-			us->fd_flags |= param;
+-		break;
+-	default:
+-		ret = ERR(ENOTSUP);
+-		break;
+-	}
+-	va_end(args);
+-	return ret;
+-}
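rsetsockopt() and rgetsockopt() track boolean options as single bits in
per-level uint64_t masks (so_opts, tcp_opts, ipv6_opts), indexed by the
option number itself. A tiny self-contained model of that scheme; it uses
1ULL rather than the code's plain 1 so the shift stays defined for option
numbers above 31:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <netinet/tcp.h>

static uint64_t tcp_opts;	/* model of one per-level bitmask */

static void set_opt(uint64_t *opts, int optname, int on)
{
	if (on)
		*opts |= (1ULL << optname);	/* option enabled */
	else
		*opts &= ~(1ULL << optname);	/* option disabled */
}

static int get_opt(uint64_t opts, int optname)
{
	return !!(opts & (1ULL << optname));
}

int main(void)
{
	set_opt(&tcp_opts, TCP_NODELAY, 1);
	assert(get_opt(tcp_opts, TCP_NODELAY) == 1);
	set_opt(&tcp_opts, TCP_NODELAY, 0);
	assert(get_opt(tcp_opts, TCP_NODELAY) == 0);
	printf("ok\n");
	return 0;
}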