]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of usocket
authorSean Hefty <sean.hefty@intel.com>
Fri, 16 Nov 2012 04:55:16 +0000 (20:55 -0800)
committerSean Hefty <sean.hefty@intel.com>
Fri, 16 Nov 2012 04:55:16 +0000 (20:55 -0800)
docs/rsocket
src/rsocket.c
src/usocket.c [deleted file]

index 1484f65b7b5a8ee67e8a7d3b551363d924d64f95..03d49df7824fca62f5a670f5df5870df25e298a4 100644 (file)
@@ -1,7 +1,7 @@
-rsocket Protocol and Design Guide               9/10/2012
+rsocket Protocol and Design Guide               11/11/2012
 
-Overview
---------
+Data Streaming (TCP) Overview
+-----------------------------
 Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  For details on the current state of the
 implementation, readers should refer to the rsocket man page.  This
@@ -189,3 +189,47 @@ registered remote data buffer.
 From host A's perspective, the transfer appears as a normal send/write
 operation, with the data stream redirected directly into the receiving
 application's buffer.
+
+
+
+Datagram Overview
+-----------------
+THe rsocket API supports datagram sockets.  Datagram support is handled through an
+entirely different protocol and internal implementation.  Unlike connected rsockets,
+datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
+socket may use any number of network (IP) addresses, including those which map to
+different RDMA devices.  As a result, a single datagram rsocket must support
+using multiple RDMA devices and ports, and a datagram rsocket references a single
+UDP socket, plus zero or more UD QPs.
+
+Rsockets uses headers inserted before user data sent over UDP sockets to resolve
+remote UD QP numbers.  When a user first attempts to send a datagram to a remote
+address (IP and UDP port), rsockets will take the following steps.  First, it
+will allocate and store the destination address into a lookup table for future
+use.  Then it will resolve which local network address should be used when sending
+to the specified destination.  It will allocate a UD QP on the RDMA device
+associated with the local address.  Finally, rsockets will send the user's
+datagram to the remote UDP socket, but inserting a header before the datagram.
+The header specifies the UD QP number associated with the local network address
+(IP and UDP port) of the send.
+
+Under many circumstances, the rsocket UDP header also provides enough path
+information for the receiver to send the reply using a UD QP.  However, certain
+fabric topologies may require contacting a subnet administrator.  Whenever
+rsockets lacks sufficient information to send data to a remote peer using
+a UD QP, it will automatically fall back to using a standard UDP socket.  This
+helps minimize the latency for performing a given send, while lenghty address
+and route resolution protocols complete.
+
+Currently, rsockets allocates a single UD QP, with an infrastructure is provided
+to support more.  Future enhancements may add support for multiple UD QPs, each
+associated with a single network (IP) address.  Multiplexing different network
+addresses over a single UD QP and using shared receive queues are other possible
+enhancements.
+
+Because rsockets may use multiple UD QPs, buffer management can become an issue.
+The total size of receive buffers is specified by the mem_default configuration
+files, but may be overridden with rsetsockopt.  The buffer is divided into
+MTU sized messages and distributed among the allocated QPs.  As new QPs are
+created, the receive buffers may be redistributed by posting loopback sends
+on a QP in order to free the receive buffers for reposting to a different QP.  
\ No newline at end of file
index 58fcb8e583234b87e3e992375bee14bb4595a557..99e638c88a563343c0d157c2607f1ca9855e4034 100644 (file)
@@ -110,6 +110,12 @@ struct rs_msg {
        uint32_t data;
 };
 
+struct ds_qp;
+
+struct ds_msg {
+       struct ds_qp *qp;
+};
+
 struct rs_sge {
        uint64_t addr;
        uint32_t key;
@@ -169,13 +175,63 @@ enum rs_state {
 
 #define RS_OPT_SWAP_SGL 1
 
-struct rsocket {
+union socket_addr {
+       struct sockaddr         sa;
+       struct sockaddr_in      sin;
+       struct sockaddr_in6     sin6;
+};
+
+struct ds_qp {
        struct rdma_cm_id *cm_id;
+       int               rbuf_cnt;
+       uint16_t          lid;
+       uint8_t           sl;
+};
+
+struct ds_dest {
+       union socket_addr addr;
+       struct ds_qp      *qp;
+       struct ibv_ah     *ah;
+       uint32_t           qpn;
+       atomic_t           refcnt;
+};
+
+struct rsocket {
+       int               type;
+       int               index;
        fastlock_t        slock;
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
-       fastlock_t        iomap_lock;
+       fastlock_t        map_lock; /* acquire slock first if needed */
+
+       union {
+               /* data stream */
+               struct {
+                       struct rdma_cm_id *cm_id;
+
+                       int               remote_sge;
+                       struct rs_sge     remote_sgl;
+                       struct rs_sge     remote_iomap;
+
+                       struct ibv_mr     *target_mr;
+                       int               target_sge;
+                       int               target_iomap_size;
+                       void              *target_buffer_list;
+                       volatile struct rs_sge    *target_sgl;
+                       struct rs_iomap   *target_iomap;
+               };
+               /* datagram */
+               struct {
+                       struct ds_qp      *qp_array;
+                       void              *dest_map;
+                       struct ds_dest    *conn_dest;
+                       struct pollfd     *fds;
+                       nfds_t            nfds;
+                       int               fd;
+                       int               sbytes_needed;
+               };
+       };
 
        int               opts;
        long              fd_flags;
@@ -186,7 +242,7 @@ struct rsocket {
        int               cq_armed;
        int               retries;
        int               err;
-       int               index;
+
        int               ctrl_avail;
        int               sqe_avail;
        int               sbuf_bytes_avail;
@@ -203,34 +259,37 @@ struct rsocket {
        int               rbuf_offset;
        int               rmsg_head;
        int               rmsg_tail;
-       struct rs_msg     *rmsg;
-
-       int               remote_sge;
-       struct rs_sge     remote_sgl;
-       struct rs_sge     remote_iomap;
+       union {
+               struct rs_msg     *rmsg;
+               struct ds_msg     *dmsg;
+       };
 
        struct rs_iomap_mr *remote_iomappings;
        dlist_entry       iomap_list;
        dlist_entry       iomap_queue;
        int               iomap_pending;
 
-       struct ibv_mr    *target_mr;
-       int               target_sge;
-       int               target_iomap_size;
-       void             *target_buffer_list;
-       volatile struct rs_sge    *target_sgl;
-       struct rs_iomap  *target_iomap;
-
        uint32_t          rbuf_size;
-       struct ibv_mr    *rmr;
+       struct ibv_mr     *rmr;
        uint8_t           *rbuf;
 
        uint32_t          sbuf_size;
-       struct ibv_mr    *smr;
+       struct ibv_mr     *smr;
        struct ibv_sge    ssgl[2];
        uint8_t           *sbuf;
 };
 
+#define DS_UDP_TAG 0x5555555555555555ULL
+
+struct ds_udp_header {
+       uint64_t          tag;
+       uint8_t           version;
+       uint8_t           sl;
+       uint16_t          slid;
+       uint32_t          qpn;  /* upper 8-bits reserved */
+};
+
+
 static int rs_value_to_scale(int value, int bits)
 {
        return value <= (1 << (bits - 1)) ?
@@ -306,10 +365,10 @@ out:
        pthread_mutex_unlock(&mut);
 }
 
-static int rs_insert(struct rsocket *rs)
+static int rs_insert(struct rsocket *rs, index)
 {
        pthread_mutex_lock(&mut);
-       rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
+       rs->index = idm_set(&idm, index, rs);
        pthread_mutex_unlock(&mut);
        return rs->index;
 }
@@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
 }
 
-static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
 {
        struct rsocket *rs;
 
@@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
 
+       rs->type = type;
        rs->index = -1;
+       rs->fd = -1;
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
@@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
-       fastlock_init(&rs->iomap_lock);
+       fastlock_init(&rs->map_lock);
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
@@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
 
-       fastlock_destroy(&rs->iomap_lock);
+       if (rs->fd >= 0)
+               close(rs->fd);
+       if (rs->fds)
+               free(rs->fds);
+
+       fastlock_destroy(&rs->map_lock);
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
@@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
 }
 
+static int ds_init(struct rsocket *rs, int domain)
+{
+       rs->fd = socket(domain, SOCK_DGRAM, 0);
+       if (rs->fd < 0)
+               return rs->fd;
+
+       rs->fds = calloc(1, sizeof *fds);
+       if (!rs->fds)
+               return ERR(ENOMEM);
+       rs->nfds = 1;
+
+       return 0;
+}
+
 int rsocket(int domain, int type, int protocol)
 {
        struct rsocket *rs;
-       int ret;
+       int index, ret;
 
        if ((domain != PF_INET && domain != PF_INET6) ||
-           (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
+           ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
+           (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
+           (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
                return ERR(ENOTSUP);
 
        rs_configure();
-       rs = rs_alloc(NULL);
+       rs = rs_alloc(NULL, type);
        if (!rs)
                return ERR(ENOMEM);
 
-       ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
-       if (ret)
-               goto err;
+       if (type == SOCK_STREAM) {
+               ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+               if (ret)
+                       goto err;
+
+               rs->cm_id->route.addr.src_addr.sa_family = domain;
+               index = rs->cm_id->channel->fd;
+       } else {
+               ret = ds_init(rs, domain);
+               if (ret)
+                       goto err;
 
-       ret = rs_insert(rs);
+               index = rs->fd;
+       }
+
+       ret = rs_insert(rs, index);
        if (ret < 0)
                goto err;
 
-       rs->cm_id->route.addr.src_addr.sa_family = domain;
        return rs->index;
 
 err:
@@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
 
        rs = idm_at(&idm, socket);
-       ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
-       if (!ret)
-               rs->state = rs_bound;
+       if (rs->type == SOCK_STREAM) {
+               ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
+               if (!ret)
+                       rs->state = rs_bound;
+       } else {
+               ret = bind(rs->fd, addr, addrlen);
+       }
        return ret;
 }
 
@@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
 
        rs = idm_at(&idm, socket);
-       new_rs = rs_alloc(rs);
+       new_rs = rs_alloc(rs, rs->type);
        if (!new_rs)
                return ERR(ENOMEM);
 
@@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
 
-       ret = rs_insert(new_rs);
+       ret = rs_insert(new_rs, rs->cm_id->channel->fd);
        if (ret < 0)
                goto err;
 
@@ -854,6 +950,66 @@ connected:
        return ret;
 }
 
+static int ds_compare_dest(const void *dst1, const void *dst2)
+{
+       const struct sockaddr *sa1, *sa2;
+       size_t len;
+
+       sa1 = (const struct sockaddr *) dst1;
+       sa2 = (const struct sockaddr *) dst2;
+
+       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
+       return memcmp(dst1, dst2, len);
+}
+
+/* Caller must hold map_lock around accessing source address */
+static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
+                                         socklen_t dst_len, socklen_t *src_len)
+{
+       static union socket_addr src_addr;
+       int sock, ret;
+
+       sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
+       if (sock < 0)
+               return NULL;
+
+       ret = connect(sock, &dst_addr->sa, dst_len);
+       if (ret)
+               goto out;
+
+       *src_len = sizeof src_addr;
+       ret = getsockname(sock, &src_addr.sa, src_len);
+
+out:
+       close(sock);
+       return ret ? NULL : &src_addr;
+}
+
+static struct ds_qp *ds_get_qp(struct rsocket *rs,
+                              const struct sockaddr *src_addr, socklen_t addrlen)
+{
+       union socket_addr *addr;
+       socklen_t len;
+
+}
+
+static int ds_connect(struct rsocket *rs,
+                     const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+       struct ds_dest **dest;
+
+       fastlock_acquire(&rs->map_lock);
+       dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
+       if (dest) {
+               rs->conn_dest = *dest;
+               goto out;
+       }
+out:
+       fastlock_release(&rs->map_lock);
+       return 0;
+}
+
 int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 {
        struct rsocket *rs;
@@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int ds_post_send(struct rsocket *rs,
+                       struct ibv_sge *sgl, int nsge,
+                       uint64_t wr_id, int flags)
+{
+       struct ibv_send_wr wr, *bad;
+
+       wr.wr_id = wr_id;
+       wr.next = NULL;
+       wr.sg_list = sgl;
+       wr.num_sge = nsge;
+       wr.opcode = IBV_WR_SEND;
+       wr.send_flags = flags;
+       wr.wr.ud.ah = rs->conn_dest->ah;
+       wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+       wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
+
+       return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs,
                                 flags, addr, rkey);
 }
 
+static int ds_send_data(struct rsocket *rs,
+                       struct ibv_sge *sgl, int nsge,
+                       uint32_t length, int flags)
+{
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= length;
+       return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
+}
+
 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
                           struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
 {
@@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
 }
 
+static int ds_can_send(struct rsocket *rs)
+{
+       return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
+}
+
 static int rs_conn_can_send(struct rsocket *rs)
 {
        return rs_can_send(rs) || !(rs->state & rs_connect_wr);
@@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
 
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
        while (!dlist_empty(&rs->iomap_queue)) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
@@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
 
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return ret;
 }
 
+static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+                           int iovcnt, int flags)
+{
+       struct ds_udp_header hdr;
+       struct msghdr msg;
+       struct iovec miov[4];
+       struct ds_qp *qp;
+
+       if (iovcnt > 4)
+               return ERR(ENOTSUP);
+
+       qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
+       hdr.tag = htonll(DS_UDP_TAG);
+       hdr.version = 1;
+       if (qp) {
+               hdr.sl = qp->sl;
+               hdr.slid = htons(qp->lid);
+               hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
+       } else {
+               hdr.sl = 0;
+               hdr.slid = 0;
+               hdr.qpn = 0;
+       }
+
+       miov[0].iov_base = &hdr;
+       miov[0].iov_len = sizeof hdr;
+       memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+       memset(&msg, 0, sizeof msg);
+       /* TODO: specify name if needed */
+       msg.msg_iov = miov;
+       msg.msg_iovlen = iovcnt + 1;
+       return sendmsg(rs->fd, msg, flags);
+}
+
+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+       struct iovec iov;
+       iov.iov_base = buf;
+       iov_iov_len = len;
+       return ds_sendv_udp(s, &iov, 1, flags);
+}
+
+static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+       struct ibv_sge sge;
+       int ret = 0;
+
+       if (!rs->conn_dest || !rs->conn_dest->ah)
+               return ds_send_udp(rs, buf, len, flags);
+
+       rs->sbytes_needed = len;
+       if (!ds_can_send(rs)) {
+               ret = rs_get_comp(rs, 1, ds_can_send);
+               if (ret)
+                       return ds_send_udp(rs, buf, len, flags);
+       }
+
+       if (len <= rs->sq_inline) {
+               sge.addr = (uintptr_t) buf;
+               sge.length = len;
+               sge.lkey = 0;
+               ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
+       } else if (len <= rs_sbuf_left(rs)) {
+               memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
+               rs->ssgl[0].length = len;
+               ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
+               if (len < rs_sbuf_left(rs))
+                       rs->ssgl[0].addr += len;
+               else
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+       } else {
+               rs->ssgl[0].length = rs_sbuf_left(rs);
+               memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+                       rs->ssgl[0].length);
+               rs->ssgl[1].length = len - rs->ssgl[0].length;
+               memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+               ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
+               rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+       }
+
+       return ret ? ret : len;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
 
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM) {
+               fastlock_acquire(&rs->slock);
+               ret = dsend(rs, buf, len, flags);
+               fastlock_release(&rs->slock);
+               return ret;
+       }
+
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
@@ -1537,10 +1817,21 @@ out:
 ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
 {
-       if (dest_addr || addrlen)
-               return ERR(EISCONN);
+       struct rsocket *rs;
+
+       rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_STREAM) {
+               if (dest_addr || addrlen)
+                       return ERR(EISCONN);
+
+               return rsend(socket, buf, len, flags);
+       }
 
-       return rsend(socket, buf, len, flags);
+       fastlock_acquire(&rs->slock);
+       ds_connect(rs, dest_addr, addrlen);
+       ret = dsend(rs, buf, len, flags);
+       fastlock_release(&rs->slock);
+       return ret;
 }
 
 static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
@@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
 
-       return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
+       return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
 }
 
 ssize_t rwrite(int socket, const void *buf, size_t count)
@@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
 
        rs = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
-       return 0;
+       if (rs->type == SOCK_STREAM) {
+               rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+               return 0;
+       } else {
+               return getpeername(rs->fs, addr, addrlen);
+       }
 }
 
 int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
@@ -2026,8 +2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
 
        rs = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
-       return 0;
+       if (rs->type == SOCK_STREAM) {
+               rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+               return 0;
+       } else {
+               return getsockname(rs->fd, addr, addrlen);
+       }
 }
 
 int rsetsockopt(int socket, int level, int optname,
@@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname,
 
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
+               ret = setsockopt(rs->fd, optname, optval, optlen);
+               if (ret)
+                       return ret;
+       }
+
        switch (level) {
        case SOL_SOCKET:
                opts = &rs->so_opts;
@@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname,
        int ret = 0;
 
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
+               return getsockopt(rs->fd, level, optname, optval, optlen);
+
        switch (level) {
        case SOL_SOCKET:
                switch (optname) {
@@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
 
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
@@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
 out:
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return offset;
 }
 
@@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
 
        rs = idm_at(&idm, socket);
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
 
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
@@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
 out:
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return ret;
 }
 
@@ -2475,3 +2783,24 @@ out:
 
        return (ret && left == count) ? ret : count - left;
 }
+
+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
+                 struct sockaddr *src_addr, socklen_t *addrlen)
+{
+       int ret;
+
+       ret = rrecv(socket, buf, len, flags);
+       if (ret > 0 && src_addr)
+               rgetpeername(socket, src_addr, addrlen);
+
+       return ret;
+}
+
+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
+               const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+       if (dest_addr || addrlen)
+               return ERR(EISCONN);
+
+       return usend(socket, buf, len, flags);
+}
diff --git a/src/usocket.c b/src/usocket.c
deleted file mode 100644 (file)
index 87da990..0000000
+++ /dev/null
@@ -1,2253 +0,0 @@
-/*
- * Copyright (c) 2012 Intel Corporation.  All rights reserved.
- *
- * This software is available to you under a choice of one of two
- * licenses.  You may choose to be licensed under the terms of the GNU
- * General Public License (GPL) Version 2, available from the file
- * COPYING in the main directory of this source tree, or the
- * OpenIB.org BSD license below:
- *
- *     Redistribution and use in source and binary forms, with or
- *     without modification, are permitted provided that the following
- *     conditions are met:
- *
- *      - Redistributions of source code must retain the above
- *        copyright notice, this list of conditions and the following
- *        disclaimer.
- *
- *      - Redistributions in binary form must reproduce the above
- *        copyright notice, this list of conditions and the following
- *        disclaimer in the documentation and/or other materials
- *        provided with the distribution.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
- * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
- * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- *
- */
-
-#if HAVE_CONFIG_H
-#  include <config.h>
-#endif /* HAVE_CONFIG_H */
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/time.h>
-#include <stdarg.h>
-#include <netdb.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <string.h>
-#include <netinet/in.h>
-
-#include <rdma/rdma_cma.h>
-#include <rdma/rdma_verbs.h>
-#include <rdma/usocket.h>
-#include "cma.h"
-#include "indexer.h"
-
-//#define RS_SNDLOWAT 64
-//#define RS_QP_MAX_SIZE 0xFFFE
-//#define RS_SGL_SIZE 2
-//static struct index_map idm;
-//static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-
-//static uint16_t def_inline = 64;
-//static uint16_t def_sqsize = 384;
-//static uint16_t def_rqsize = 384;
-//static uint32_t def_mem = (1 << 17);
-//static uint32_t def_wmem = (1 << 17);
-//static uint32_t polling_time = 10;
-
-//enum {
-//     RS_OP_DATA,
-//     RS_OP_RSVD_DATA_MORE,
-//     RS_OP_WRITE, /* opcode is not transmitted over the network */
-//     RS_OP_RSVD_DRA_MORE,
-//     RS_OP_SGL,
-//     RS_OP_RSVD,
-//     RS_OP_IOMAP_SGL,
-//     RS_OP_CTRL
-//};
-//#define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
-//#define rs_msg_op(imm_data)   (imm_data >> 29)
-//#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-
-struct rs_msg {
-       uint32_t op;
-       uint32_t data;
-};
-
-struct rs_sge {
-       uint64_t addr;
-       uint32_t key;
-       uint32_t length;
-};
-
-struct rs_iomap {
-       uint64_t offset;
-       struct rs_sge sge;
-};
-
-struct rs_iomap_mr {
-       uint64_t offset;
-       struct ibv_mr *mr;
-       dlist_entry entry;
-       atomic_t refcnt;
-       int index;      /* -1 if mapping is local and not in iomap_list */
-};
-
-#define RS_MIN_INLINE      (sizeof(struct rs_sge))
-#define rs_host_is_net()   (1 == htonl(1))
-#define RS_CONN_FLAG_NET   (1 << 0)
-#define RS_CONN_FLAG_IOMAP (1 << 1)
-
-struct rs_conn_data {
-       uint8_t           version;
-       uint8_t           flags;
-       uint16_t          credits;
-       uint8_t           reserved[3];
-       uint8_t           target_iomap_size;
-       struct rs_sge     target_sgl;
-       struct rs_sge     data_buf;
-};
-
-#define RS_RECV_WR_ID (~((uint64_t) 0))
-
-/*
- * usocket states are ordered as passive, connecting, connected, disconnected.
- */
-enum rs_state {
-       rs_init,
-       rs_bound           =                0x0001,
-       rs_listening       =                0x0002,
-       rs_opening         =                0x0004,
-       rs_resolving_addr  = rs_opening |   0x0010,
-       rs_resolving_route = rs_opening |   0x0020,
-       rs_connecting      = rs_opening |   0x0040,
-       rs_accepting       = rs_opening |   0x0080,
-       rs_connected       =                0x0100,
-       rs_connect_wr      =                0x0200,
-       rs_connect_rd      =                0x0400,
-       rs_connect_rdwr    = rs_connected | rs_connect_rd | rs_connect_wr,
-       rs_connect_error   =                0x0800,
-       rs_disconnected    =                0x1000,
-       rs_error           =                0x2000,
-};
-
-#define RS_OPT_SWAP_SGL 1
-
-struct usocket {
-       struct rdma_cm_id *cm_id;
-       fastlock_t        slock;
-       fastlock_t        rlock;
-       fastlock_t        cq_lock;
-       fastlock_t        cq_wait_lock;
-       fastlock_t        iomap_lock;
-
-       int               opts;
-       long              fd_flags;
-       uint64_t          so_opts;
-       uint64_t          tcp_opts;
-       uint64_t          ipv6_opts;
-       int               state;
-       int               cq_armed;
-       int               retries;
-       int               err;
-       int               index;
-       int               ctrl_avail;
-       int               sqe_avail;
-       int               sbuf_bytes_avail;
-       uint16_t          sseq_no;
-       uint16_t          sseq_comp;
-       uint16_t          sq_size;
-       uint16_t          sq_inline;
-
-       uint16_t          rq_size;
-       uint16_t          rseq_no;
-       uint16_t          rseq_comp;
-       int               rbuf_bytes_avail;
-       int               rbuf_free_offset;
-       int               rbuf_offset;
-       int               rmsg_head;
-       int               rmsg_tail;
-       struct rs_msg     *rmsg;
-
-       int               remote_sge;
-       struct rs_sge     remote_sgl;
-       struct rs_sge     remote_iomap;
-
-       struct rs_iomap_mr *remote_iomappings;
-       dlist_entry       iomap_list;
-       dlist_entry       iomap_queue;
-       int               iomap_pending;
-
-       struct ibv_mr    *target_mr;
-       int               target_sge;
-       int               target_iomap_size;
-       void             *target_buffer_list;
-       volatile struct rs_sge    *target_sgl;
-       struct rs_iomap  *target_iomap;
-
-       uint32_t          rbuf_size;
-       struct ibv_mr    *rmr;
-       uint8_t           *rbuf;
-
-       uint32_t          sbuf_size;
-       struct ibv_mr    *smr;
-       struct ibv_sge    ssgl[2];
-       uint8_t           *sbuf;
-};
-
-static int rs_value_to_scale(int value, int bits)
-{
-       return value <= (1 << (bits - 1)) ?
-              value : (1 << (bits - 1)) | (value >> bits);
-}
-
-static int rs_scale_to_value(int value, int bits)
-{
-       return value <= (1 << (bits - 1)) ?
-              value : (value & ~(1 << (bits - 1))) << bits;
-}
-
-void rs_configure(void)
-{
-       FILE *f;
-       static int init;
-
-       if (init)
-               return;
-
-       pthread_mutex_lock(&mut);
-       if (init)
-               goto out;
-
-       if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
-               (void) fscanf(f, "%u", &polling_time);
-               fclose(f);
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
-               (void) fscanf(f, "%hu", &def_inline);
-               fclose(f);
-
-               if (def_inline < RS_MIN_INLINE)
-                       def_inline = RS_MIN_INLINE;
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
-               (void) fscanf(f, "%hu", &def_sqsize);
-               fclose(f);
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) {
-               (void) fscanf(f, "%hu", &def_rqsize);
-               fclose(f);
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) {
-               (void) fscanf(f, "%u", &def_mem);
-               fclose(f);
-
-               if (def_mem < 1)
-                       def_mem = 1;
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
-               (void) fscanf(f, "%u", &def_wmem);
-               fclose(f);
-               if (def_wmem < RS_SNDLOWAT)
-                       def_wmem = RS_SNDLOWAT << 1;
-       }
-
-       if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
-               (void) fscanf(f, "%hu", &def_iomap_size);
-               fclose(f);
-
-               /* round to supported values */
-               def_iomap_size = (uint8_t) rs_value_to_scale(
-                       (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
-       }
-       init = 1;
-out:
-       pthread_mutex_unlock(&mut);
-}
-
-static int rs_insert(struct usocket *us)
-{
-       pthread_mutex_lock(&mut);
-       us->index = idm_set(&idm, us->cm_id->channel->fd, us);
-       pthread_mutex_unlock(&mut);
-       return us->index;
-}
-
-static void rs_remove(struct usocket *us)
-{
-       pthread_mutex_lock(&mut);
-       idm_clear(&idm, us->index);
-       pthread_mutex_unlock(&mut);
-}
-
-static struct usocket *rs_alloc(struct usocket *inherited_rs)
-{
-       struct usocket *us;
-
-       us = calloc(1, sizeof *us);
-       if (!us)
-               return NULL;
-
-       us->index = -1;
-       if (inherited_rs) {
-               us->sbuf_size = inherited_rs->sbuf_size;
-               us->rbuf_size = inherited_rs->rbuf_size;
-               us->sq_inline = inherited_rs->sq_inline;
-               us->sq_size = inherited_rs->sq_size;
-               us->rq_size = inherited_rs->rq_size;
-               us->ctrl_avail = inherited_rs->ctrl_avail;
-               us->target_iomap_size = inherited_rs->target_iomap_size;
-       } else {
-               us->sbuf_size = def_wmem;
-               us->rbuf_size = def_mem;
-               us->sq_inline = def_inline;
-               us->sq_size = def_sqsize;
-               us->rq_size = def_rqsize;
-               us->ctrl_avail = RS_QP_CTRL_SIZE;
-               us->target_iomap_size = def_iomap_size;
-       }
-       fastlock_init(&us->slock);
-       fastlock_init(&us->rlock);
-       fastlock_init(&us->cq_lock);
-       fastlock_init(&us->cq_wait_lock);
-       fastlock_init(&us->iomap_lock);
-       dlist_init(&us->iomap_list);
-       dlist_init(&us->iomap_queue);
-       return us;
-}
-
-static int rs_set_nonblocking(struct usocket *us, long arg)
-{
-       int ret = 0;
-
-       if (us->cm_id->recv_cq_channel)
-               ret = fcntl(us->cm_id->recv_cq_channel->fd, F_SETFL, arg);
-
-       if (!ret && us->state < rs_connected)
-               ret = fcntl(us->cm_id->channel->fd, F_SETFL, arg);
-
-       return ret;
-}
-
-static void rs_set_qp_size(struct usocket *us)
-{
-       uint16_t max_size;
-
-       max_size = min(ucma_max_qpsize(us->cm_id), RS_QP_MAX_SIZE);
-
-       if (us->sq_size > max_size)
-               us->sq_size = max_size;
-       else if (us->sq_size < 2)
-               us->sq_size = 2;
-       if (us->sq_size <= (RS_QP_CTRL_SIZE << 2))
-               us->ctrl_avail = 1;
-
-       if (us->rq_size > max_size)
-               us->rq_size = max_size;
-       else if (us->rq_size < 2)
-               us->rq_size = 2;
-}
-
-static int rs_init_bufs(struct usocket *us)
-{
-       size_t len;
-
-       us->rmsg = calloc(us->rq_size + 1, sizeof(*us->rmsg));
-       if (!us->rmsg)
-               return -1;
-
-       us->sbuf = calloc(us->sbuf_size, sizeof(*us->sbuf));
-       if (!us->sbuf)
-               return -1;
-
-       us->smr = rdma_reg_msgs(us->cm_id, us->sbuf, us->sbuf_size);
-       if (!us->smr)
-               return -1;
-
-       len = sizeof(*us->target_sgl) * RS_SGL_SIZE +
-             sizeof(*us->target_iomap) * us->target_iomap_size;
-       us->target_buffer_list = malloc(len);
-       if (!us->target_buffer_list)
-               return -1;
-
-       us->target_mr = rdma_reg_write(us->cm_id, us->target_buffer_list, len);
-       if (!us->target_mr)
-               return -1;
-
-       memset(us->target_buffer_list, 0, len);
-       us->target_sgl = us->target_buffer_list;
-       if (us->target_iomap_size)
-               us->target_iomap = (struct rs_iomap *) (us->target_sgl + RS_SGL_SIZE);
-
-       us->rbuf = calloc(us->rbuf_size, sizeof(*us->rbuf));
-       if (!us->rbuf)
-               return -1;
-
-       us->rmr = rdma_reg_write(us->cm_id, us->rbuf, us->rbuf_size);
-       if (!us->rmr)
-               return -1;
-
-       us->ssgl[0].addr = us->ssgl[1].addr = (uintptr_t) us->sbuf;
-       us->sbuf_bytes_avail = us->sbuf_size;
-       us->ssgl[0].lkey = us->ssgl[1].lkey = us->smr->lkey;
-
-       us->rbuf_free_offset = us->rbuf_size >> 1;
-       us->rbuf_bytes_avail = us->rbuf_size >> 1;
-       us->sqe_avail = us->sq_size - us->ctrl_avail;
-       us->rseq_comp = us->rq_size >> 1;
-       return 0;
-}
-
-static int rs_create_cq(struct usocket *us)
-{
-       us->cm_id->recv_cq_channel = ibv_create_comp_channel(us->cm_id->verbs);
-       if (!us->cm_id->recv_cq_channel)
-               return -1;
-
-       us->cm_id->recv_cq = ibv_create_cq(us->cm_id->verbs, us->sq_size + us->rq_size,
-                                          us->cm_id, us->cm_id->recv_cq_channel, 0);
-       if (!us->cm_id->recv_cq)
-               goto err1;
-
-       if (us->fd_flags & O_NONBLOCK) {
-               if (rs_set_nonblocking(us, O_NONBLOCK))
-                       goto err2;
-       }
-
-       us->cm_id->send_cq_channel = us->cm_id->recv_cq_channel;
-       us->cm_id->send_cq = us->cm_id->recv_cq;
-       return 0;
-
-err2:
-       ibv_destroy_cq(us->cm_id->recv_cq);
-       us->cm_id->recv_cq = NULL;
-err1:
-       ibv_destroy_comp_channel(us->cm_id->recv_cq_channel);
-       us->cm_id->recv_cq_channel = NULL;
-       return -1;
-}
-
-static inline int
-rs_post_recv(struct usocket *us)
-{
-       struct ibv_recv_wr wr, *bad;
-
-       wr.wr_id = RS_RECV_WR_ID;
-       wr.next = NULL;
-       wr.sg_list = NULL;
-       wr.num_sge = 0;
-
-       return rdma_seterrno(ibv_post_recv(us->cm_id->qp, &wr, &bad));
-}
-
-static int rs_create_ep(struct usocket *us)
-{
-       struct ibv_qp_init_attr qp_attr;
-       int i, ret;
-
-       rs_set_qp_size(us);
-       ret = rs_init_bufs(us);
-       if (ret)
-               return ret;
-
-       ret = rs_create_cq(us);
-       if (ret)
-               return ret;
-
-       memset(&qp_attr, 0, sizeof qp_attr);
-       qp_attr.qp_context = us;
-       qp_attr.send_cq = us->cm_id->send_cq;
-       qp_attr.recv_cq = us->cm_id->recv_cq;
-       qp_attr.qp_type = IBV_QPT_RC;
-       qp_attr.sq_sig_all = 1;
-       qp_attr.cap.max_send_wr = us->sq_size;
-       qp_attr.cap.max_recv_wr = us->rq_size;
-       qp_attr.cap.max_send_sge = 2;
-       qp_attr.cap.max_recv_sge = 1;
-       qp_attr.cap.max_inline_data = us->sq_inline;
-
-       ret = rdma_create_qp(us->cm_id, NULL, &qp_attr);
-       if (ret)
-               return ret;
-
-       for (i = 0; i < us->rq_size; i++) {
-               ret = rs_post_recv(us);
-               if (ret)
-                       return ret;
-       }
-       return 0;
-}
-
-static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
-{
-       if (atomic_dec(&iomr->refcnt))
-               return;
-
-       dlist_remove(&iomr->entry);
-       ibv_dereg_mr(iomr->mr);
-       if (iomr->index >= 0)
-               iomr->mr = NULL;
-       else
-               free(iomr);
-}
-
-static void rs_free_iomappings(struct usocket *us)
-{
-       struct rs_iomap_mr *iomr;
-
-       while (!dlist_empty(&us->iomap_list)) {
-               iomr = container_of(us->iomap_list.next,
-                                   struct rs_iomap_mr, entry);
-               riounmap(us->index, iomr->mr->addr, iomr->mr->length);
-       }
-       while (!dlist_empty(&us->iomap_queue)) {
-               iomr = container_of(us->iomap_queue.next,
-                                   struct rs_iomap_mr, entry);
-               riounmap(us->index, iomr->mr->addr, iomr->mr->length);
-       }
-}
-
-static void rs_free(struct usocket *us)
-{
-       if (us->index >= 0)
-               rs_remove(us);
-
-       if (us->rmsg)
-               free(us->rmsg);
-
-       if (us->sbuf) {
-               if (us->smr)
-                       rdma_dereg_mr(us->smr);
-               free(us->sbuf);
-       }
-
-       if (us->rbuf) {
-               if (us->rmr)
-                       rdma_dereg_mr(us->rmr);
-               free(us->rbuf);
-       }
-
-       if (us->target_buffer_list) {
-               if (us->target_mr)
-                       rdma_dereg_mr(us->target_mr);
-               free(us->target_buffer_list);
-       }
-
-       if (us->cm_id) {
-               rs_free_iomappings(us);
-               if (us->cm_id->qp)
-                       rdma_destroy_qp(us->cm_id);
-               rdma_destroy_id(us->cm_id);
-       }
-
-       fastlock_destroy(&us->iomap_lock);
-       fastlock_destroy(&us->cq_wait_lock);
-       fastlock_destroy(&us->cq_lock);
-       fastlock_destroy(&us->rlock);
-       fastlock_destroy(&us->slock);
-       free(us);
-}
-
-static void rs_set_conn_data(struct usocket *us, struct rdma_conn_param *param,
-                            struct rs_conn_data *conn)
-{
-       conn->version = 1;
-       conn->flags = RS_CONN_FLAG_IOMAP |
-                     (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
-       conn->credits = htons(us->rq_size);
-       memset(conn->reserved, 0, sizeof conn->reserved);
-       conn->target_iomap_size = (uint8_t) rs_value_to_scale(us->target_iomap_size, 8);
-
-       conn->target_sgl.addr = htonll((uintptr_t) us->target_sgl);
-       conn->target_sgl.length = htonl(RS_SGL_SIZE);
-       conn->target_sgl.key = htonl(us->target_mr->rkey);
-
-       conn->data_buf.addr = htonll((uintptr_t) us->rbuf);
-       conn->data_buf.length = htonl(us->rbuf_size >> 1);
-       conn->data_buf.key = htonl(us->rmr->rkey);
-
-       param->private_data = conn;
-       param->private_data_len = sizeof *conn;
-}
-
-static void rs_save_conn_data(struct usocket *us, struct rs_conn_data *conn)
-{
-       us->remote_sgl.addr = ntohll(conn->target_sgl.addr);
-       us->remote_sgl.length = ntohl(conn->target_sgl.length);
-       us->remote_sgl.key = ntohl(conn->target_sgl.key);
-       us->remote_sge = 1;
-       if ((rs_host_is_net() && !(conn->flags & RS_CONN_FLAG_NET)) ||
-           (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
-               us->opts = RS_OPT_SWAP_SGL;
-
-       if (conn->flags & RS_CONN_FLAG_IOMAP) {
-               us->remote_iomap.addr = us->remote_sgl.addr +
-                                       sizeof(us->remote_sgl) * us->remote_sgl.length;
-               us->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
-               us->remote_iomap.key = us->remote_sgl.key;
-       }
-
-       us->target_sgl[0].addr = ntohll(conn->data_buf.addr);
-       us->target_sgl[0].length = ntohl(conn->data_buf.length);
-       us->target_sgl[0].key = ntohl(conn->data_buf.key);
-
-       us->sseq_comp = ntohs(conn->credits);
-}
-
-int usocket(int domain, int type, int protocol)
-{
-       struct usocket *us;
-       int ret;
-
-       if ((domain != PF_INET && domain != PF_INET6) ||
-           (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
-               return ERR(ENOTSUP);
-
-       rs_configure();
-       us = rs_alloc(NULL);
-       if (!us)
-               return ERR(ENOMEM);
-
-       ret = rdma_create_id(NULL, &us->cm_id, us, RDMA_PS_TCP);
-       if (ret)
-               goto err;
-
-       ret = rs_insert(us);
-       if (ret < 0)
-               goto err;
-
-       us->cm_id->route.addr.src_addr.sa_family = domain;
-       return us->index;
-
-err:
-       rs_free(us);
-       return ret;
-}
-
-int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
-{
-       struct usocket *us;
-       int ret;
-
-       us = idm_at(&idm, socket);
-       ret = rdma_bind_addr(us->cm_id, (struct sockaddr *) addr);
-       if (!ret)
-               us->state = rs_bound;
-       return ret;
-}
-
-int rlisten(int socket, int backlog)
-{
-       struct usocket *us;
-       int ret;
-
-       us = idm_at(&idm, socket);
-       ret = rdma_listen(us->cm_id, backlog);
-       if (!ret)
-               us->state = rs_listening;
-       return ret;
-}
-
-/*
- * Nonblocking is usually not inherited between sockets, but we need to
- * inherit it here to establish the connection only.  This is needed to
- * prevent rdma_accept from blocking until the remote side finishes
- * establishing the connection.  If we were to allow rdma_accept to block,
- * then a single thread cannot establish a connection with itself, or
- * two threads which try to connect to each other can deadlock trying to
- * form a connection.
- *
- * Data transfers on the new socket remain blocking unless the user
- * specifies otherwise through rfcntl.
- */
-int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
-{
-       struct usocket *us, *new_rs;
-       struct rdma_conn_param param;
-       struct rs_conn_data *creq, cresp;
-       int ret;
-
-       us = idm_at(&idm, socket);
-       new_rs = rs_alloc(us);
-       if (!new_rs)
-               return ERR(ENOMEM);
-
-       ret = rdma_get_request(us->cm_id, &new_rs->cm_id);
-       if (ret)
-               goto err;
-
-       ret = rs_insert(new_rs);
-       if (ret < 0)
-               goto err;
-
-       creq = (struct rs_conn_data *) new_rs->cm_id->event->param.conn.private_data;
-       if (creq->version != 1) {
-               ret = ERR(ENOTSUP);
-               goto err;
-       }
-
-       if (us->fd_flags & O_NONBLOCK)
-               rs_set_nonblocking(new_rs, O_NONBLOCK);
-
-       ret = rs_create_ep(new_rs);
-       if (ret)
-               goto err;
-
-       rs_save_conn_data(new_rs, creq);
-       param = new_rs->cm_id->event->param.conn;
-       rs_set_conn_data(new_rs, &param, &cresp);
-       ret = rdma_accept(new_rs->cm_id, &param);
-       if (!ret)
-               new_rs->state = rs_connect_rdwr;
-       else if (errno == EAGAIN || errno == EWOULDBLOCK)
-               new_rs->state = rs_accepting;
-       else
-               goto err;
-
-       if (addr && addrlen)
-               rgetpeername(new_rs->index, addr, addrlen);
-       return new_rs->index;
-
-err:
-       rs_free(new_rs);
-       return ret;
-}
-
-static int rs_do_connect(struct usocket *us)
-{
-       struct rdma_conn_param param;
-       struct rs_conn_data creq, *cresp;
-       int to, ret;
-
-       switch (us->state) {
-       case rs_init:
-       case rs_bound:
-resolve_addr:
-               to = 1000 << us->retries++;
-               ret = rdma_resolve_addr(us->cm_id, NULL,
-                                       &us->cm_id->route.addr.dst_addr, to);
-               if (!ret)
-                       goto resolve_route;
-               if (errno == EAGAIN || errno == EWOULDBLOCK)
-                       us->state = rs_resolving_addr;
-               break;
-       case rs_resolving_addr:
-               ret = ucma_complete(us->cm_id);
-               if (ret) {
-                       if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES)
-                               goto resolve_addr;
-                       break;
-               }
-
-               us->retries = 0;
-resolve_route:
-               to = 1000 << us->retries++;
-               ret = rdma_resolve_route(us->cm_id, to);
-               if (!ret)
-                       goto do_connect;
-               if (errno == EAGAIN || errno == EWOULDBLOCK)
-                       us->state = rs_resolving_route;
-               break;
-       case rs_resolving_route:
-               ret = ucma_complete(us->cm_id);
-               if (ret) {
-                       if (errno == ETIMEDOUT && us->retries <= RS_CONN_RETRIES)
-                               goto resolve_route;
-                       break;
-               }
-do_connect:
-               ret = rs_create_ep(us);
-               if (ret)
-                       break;
-
-               memset(&param, 0, sizeof param);
-               rs_set_conn_data(us, &param, &creq);
-               param.flow_control = 1;
-               param.retry_count = 7;
-               param.rnr_retry_count = 7;
-               us->retries = 0;
-
-               ret = rdma_connect(us->cm_id, &param);
-               if (!ret)
-                       goto connected;
-               if (errno == EAGAIN || errno == EWOULDBLOCK)
-                       us->state = rs_connecting;
-               break;
-       case rs_connecting:
-               ret = ucma_complete(us->cm_id);
-               if (ret)
-                       break;
-connected:
-               cresp = (struct rs_conn_data *) us->cm_id->event->param.conn.private_data;
-               if (cresp->version != 1) {
-                       ret = ERR(ENOTSUP);
-                       break;
-               }
-
-               rs_save_conn_data(us, cresp);
-               us->state = rs_connect_rdwr;
-               break;
-       case rs_accepting:
-               if (!(us->fd_flags & O_NONBLOCK))
-                       rs_set_nonblocking(us, 0);
-
-               ret = ucma_complete(us->cm_id);
-               if (ret)
-                       break;
-
-               us->state = rs_connect_rdwr;
-               break;
-       default:
-               ret = ERR(EINVAL);
-               break;
-       }
-
-       if (ret) {
-               if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                       errno = EINPROGRESS;
-               } else {
-                       us->state = rs_connect_error;
-                       us->err = errno;
-               }
-       }
-       return ret;
-}
-
-int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-{
-       struct usocket *us;
-
-       us = idm_at(&idm, socket);
-       memcpy(&us->cm_id->route.addr.dst_addr, addr, addrlen);
-       return rs_do_connect(us);
-}
-
-static int rs_post_write_msg(struct usocket *us,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags,
-                        uint64_t addr, uint32_t rkey)
-{
-       struct ibv_send_wr wr, *bad;
-
-       wr.wr_id = (uint64_t) imm_data;
-       wr.next = NULL;
-       wr.sg_list = sgl;
-       wr.num_sge = nsge;
-       wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
-       wr.send_flags = flags;
-       wr.imm_data = htonl(imm_data);
-       wr.wr.rdma.remote_addr = addr;
-       wr.wr.rdma.rkey = rkey;
-
-       return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad));
-}
-
-static int rs_post_write(struct usocket *us,
-                        struct ibv_sge *sgl, int nsge,
-                        uint64_t wr_id, int flags,
-                        uint64_t addr, uint32_t rkey)
-{
-       struct ibv_send_wr wr, *bad;
-
-       wr.wr_id = wr_id;
-       wr.next = NULL;
-       wr.sg_list = sgl;
-       wr.num_sge = nsge;
-       wr.opcode = IBV_WR_RDMA_WRITE;
-       wr.send_flags = flags;
-       wr.wr.rdma.remote_addr = addr;
-       wr.wr.rdma.rkey = rkey;
-
-       return rdma_seterrno(ibv_post_send(us->cm_id->qp, &wr, &bad));
-}
-
-/*
- * Update target SGE before sending data.  Otherwise the remote side may
- * update the entry before we do.
- */
-static int rs_write_data(struct usocket *us,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t length, int flags)
-{
-       uint64_t addr;
-       uint32_t rkey;
-
-       us->sseq_no++;
-       us->sqe_avail--;
-       us->sbuf_bytes_avail -= length;
-
-       addr = us->target_sgl[us->target_sge].addr;
-       rkey = us->target_sgl[us->target_sge].key;
-
-       us->target_sgl[us->target_sge].addr += length;
-       us->target_sgl[us->target_sge].length -= length;
-
-       if (!us->target_sgl[us->target_sge].length) {
-               if (++us->target_sge == RS_SGL_SIZE)
-                       us->target_sge = 0;
-       }
-
-       return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-                                flags, addr, rkey);
-}
-
-static int rs_write_direct(struct usocket *us, struct rs_iomap *iom, uint64_t offset,
-                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
-{
-       uint64_t addr;
-
-       us->sqe_avail--;
-       us->sbuf_bytes_avail -= length;
-
-       addr = iom->sge.addr + offset - iom->offset;
-       return rs_post_write(us, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
-                            flags, addr, iom->sge.key);
-}
-
-static int rs_write_iomap(struct usocket *us, struct rs_iomap_mr *iomr,
-                         struct ibv_sge *sgl, int nsge, int flags)
-{
-       uint64_t addr;
-
-       us->sseq_no++;
-       us->sqe_avail--;
-       us->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-
-       addr = us->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-       return rs_post_write_msg(us, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-                                flags, addr, us->remote_iomap.key);
-}
-
-static uint32_t rs_sbuf_left(struct usocket *us)
-{
-       return (uint32_t) (((uint64_t) (uintptr_t) &us->sbuf[us->sbuf_size]) -
-                          us->ssgl[0].addr);
-}
-
-static void rs_send_credits(struct usocket *us)
-{
-       struct ibv_sge ibsge;
-       struct rs_sge sge;
-
-       us->ctrl_avail--;
-       us->rseq_comp = us->rseq_no + (us->rq_size >> 1);
-       if (us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) {
-               if (!(us->opts & RS_OPT_SWAP_SGL)) {
-                       sge.addr = (uintptr_t) &us->rbuf[us->rbuf_free_offset];
-                       sge.key = us->rmr->rkey;
-                       sge.length = us->rbuf_size >> 1;
-               } else {
-                       sge.addr = bswap_64((uintptr_t) &us->rbuf[us->rbuf_free_offset]);
-                       sge.key = bswap_32(us->rmr->rkey);
-                       sge.length = bswap_32(us->rbuf_size >> 1);
-               }
-
-               ibsge.addr = (uintptr_t) &sge;
-               ibsge.lkey = 0;
-               ibsge.length = sizeof(sge);
-
-               rs_post_write_msg(us, &ibsge, 1,
-                                 rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size),
-                                 IBV_SEND_INLINE,
-                                 us->remote_sgl.addr +
-                                 us->remote_sge * sizeof(struct rs_sge),
-                                 us->remote_sgl.key);
-
-               us->rbuf_bytes_avail -= us->rbuf_size >> 1;
-               us->rbuf_free_offset += us->rbuf_size >> 1;
-               if (us->rbuf_free_offset >= us->rbuf_size)
-                       us->rbuf_free_offset = 0;
-               if (++us->remote_sge == us->remote_sgl.length)
-                       us->remote_sge = 0;
-       } else {
-               rs_post_write_msg(us, NULL, 0,
-                                 rs_msg_set(RS_OP_SGL, us->rseq_no + us->rq_size),
-                                 0, 0, 0);
-       }
-}
-
-static int rs_give_credits(struct usocket *us)
-{
-       return ((us->rbuf_bytes_avail >= (us->rbuf_size >> 1)) ||
-               ((short) ((short) us->rseq_no - (short) us->rseq_comp) >= 0)) &&
-              us->ctrl_avail && (us->state & rs_connected);
-}
-
-static void rs_update_credits(struct usocket *us)
-{
-       if (rs_give_credits(us))
-               rs_send_credits(us);
-}
-
-static int rs_poll_cq(struct usocket *us)
-{
-       struct ibv_wc wc;
-       uint32_t imm_data;
-       int ret, rcnt = 0;
-
-       while ((ret = ibv_poll_cq(us->cm_id->recv_cq, 1, &wc)) > 0) {
-               if (wc.wr_id == RS_RECV_WR_ID) {
-                       if (wc.status != IBV_WC_SUCCESS)
-                               continue;
-                       rcnt++;
-
-                       imm_data = ntohl(wc.imm_data);
-                       switch (rs_msg_op(imm_data)) {
-                       case RS_OP_SGL:
-                               us->sseq_comp = (uint16_t) rs_msg_data(imm_data);
-                               break;
-                       case RS_OP_IOMAP_SGL:
-                               /* The iomap was updated, that's nice to know. */
-                               break;
-                       case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
-                                       us->state = rs_disconnected;
-                                       return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-                                       us->state &= ~rs_connect_rd;
-                               }
-                               break;
-                       case RS_OP_WRITE:
-                               /* We really shouldn't be here. */
-                               break;
-                       default:
-                               us->rmsg[us->rmsg_tail].op = rs_msg_op(imm_data);
-                               us->rmsg[us->rmsg_tail].data = rs_msg_data(imm_data);
-                               if (++us->rmsg_tail == us->rq_size + 1)
-                                       us->rmsg_tail = 0;
-                               break;
-                       }
-               } else {
-                       switch  (rs_msg_op((uint32_t) wc.wr_id)) {
-                       case RS_OP_SGL:
-                               us->ctrl_avail++;
-                               break;
-                       case RS_OP_CTRL:
-                               us->ctrl_avail++;
-                               if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
-                                       us->state = rs_disconnected;
-                               break;
-                       case RS_OP_IOMAP_SGL:
-                               us->sqe_avail++;
-                               us->sbuf_bytes_avail += sizeof(struct rs_iomap);
-                               break;
-                       default:
-                               us->sqe_avail++;
-                               us->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
-                               break;
-                       }
-                       if (wc.status != IBV_WC_SUCCESS && (us->state & rs_connected)) {
-                               us->state = rs_error;
-                               us->err = EIO;
-                       }
-               }
-       }
-
-       if (us->state & rs_connected) {
-               while (!ret && rcnt--)
-                       ret = rs_post_recv(us);
-
-               if (ret) {
-                       us->state = rs_error;
-                       us->err = errno;
-               }
-       }
-       return ret;
-}
-
-static int rs_get_cq_event(struct usocket *us)
-{
-       struct ibv_cq *cq;
-       void *context;
-       int ret;
-
-       if (!us->cq_armed)
-               return 0;
-
-       ret = ibv_get_cq_event(us->cm_id->recv_cq_channel, &cq, &context);
-       if (!ret) {
-               ibv_ack_cq_events(us->cm_id->recv_cq, 1);
-               us->cq_armed = 0;
-       } else if (errno != EAGAIN) {
-               us->state = rs_error;
-       }
-
-       return ret;
-}
-
-/*
- * Although we serialize rsend and rrecv calls with respect to themselves,
- * both calls may run simultaneously and need to poll the CQ for completions.
- * We need to serialize access to the CQ, but rsend and rrecv need to
- * allow each other to make forward progress.
- *
- * For example, rsend may need to wait for credits from the remote side,
- * which could be stalled until the remote process calls rrecv.  This should
- * not block rrecv from receiving data from the remote side however.
- *
- * We handle this by using two locks.  The cq_lock protects against polling
- * the CQ and processing completions.  The cq_wait_lock serializes access to
- * waiting on the CQ.
- */
-static int rs_process_cq(struct usocket *us, int nonblock, int (*test)(struct usocket *us))
-{
-       int ret;
-
-       fastlock_acquire(&us->cq_lock);
-       do {
-               rs_update_credits(us);
-               ret = rs_poll_cq(us);
-               if (test(us)) {
-                       ret = 0;
-                       break;
-               } else if (ret) {
-                       break;
-               } else if (nonblock) {
-                       ret = ERR(EWOULDBLOCK);
-               } else if (!us->cq_armed) {
-                       ibv_req_notify_cq(us->cm_id->recv_cq, 0);
-                       us->cq_armed = 1;
-               } else {
-                       rs_update_credits(us);
-                       fastlock_acquire(&us->cq_wait_lock);
-                       fastlock_release(&us->cq_lock);
-
-                       ret = rs_get_cq_event(us);
-                       fastlock_release(&us->cq_wait_lock);
-                       fastlock_acquire(&us->cq_lock);
-               }
-       } while (!ret);
-
-       rs_update_credits(us);
-       fastlock_release(&us->cq_lock);
-       return ret;
-}
-
-static int rs_get_comp(struct usocket *us, int nonblock, int (*test)(struct usocket *us))
-{
-       struct timeval s, e;
-       uint32_t poll_time = 0;
-       int ret;
-
-       do {
-               ret = rs_process_cq(us, 1, test);
-               if (!ret || nonblock || errno != EWOULDBLOCK)
-                       return ret;
-
-               if (!poll_time)
-                       gettimeofday(&s, NULL);
-
-               gettimeofday(&e, NULL);
-               poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
-                           (e.tv_usec - s.tv_usec) + 1;
-       } while (poll_time <= polling_time);
-
-       ret = rs_process_cq(us, 0, test);
-       return ret;
-}
-
-static int rs_nonblocking(struct usocket *us, int flags)
-{
-       return (us->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-}
-
-static int rs_is_cq_armed(struct usocket *us)
-{
-       return us->cq_armed;
-}
-
-static int rs_poll_all(struct usocket *us)
-{
-       return 1;
-}
-
-/*
- * We use hardware flow control to prevent over running the remote
- * receive queue.  However, data transfers still require space in
- * the remote rmsg queue, or we risk losing notification that data
- * has been transfered.
- *
- * Be careful with race conditions in the check below.  The target SGL
- * may be updated by a remote RDMA write.
- */
-static int rs_can_send(struct usocket *us)
-{
-       return us->sqe_avail && (us->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-              (us->sseq_no != us->sseq_comp) &&
-              (us->target_sgl[us->target_sge].length != 0);
-}
-
-static int rs_conn_can_send(struct usocket *us)
-{
-       return rs_can_send(us) || !(us->state & rs_connect_wr);
-}
-
-static int rs_conn_can_send_ctrl(struct usocket *us)
-{
-       return us->ctrl_avail || !(us->state & rs_connected);
-}
-
-static int rs_have_rdata(struct usocket *us)
-{
-       return (us->rmsg_head != us->rmsg_tail);
-}
-
-static int rs_conn_have_rdata(struct usocket *us)
-{
-       return rs_have_rdata(us) || !(us->state & rs_connect_rd);
-}
-
-static int rs_conn_all_sends_done(struct usocket *us)
-{
-       return ((us->sqe_avail + us->ctrl_avail) == us->sq_size) ||
-              !(us->state & rs_connected);
-}
-
-static ssize_t rs_peek(struct usocket *us, void *buf, size_t len)
-{
-       size_t left = len;
-       uint32_t end_size, rsize;
-       int rmsg_head, rbuf_offset;
-
-       rmsg_head = us->rmsg_head;
-       rbuf_offset = us->rbuf_offset;
-
-       for (; left && (rmsg_head != us->rmsg_tail); left -= rsize) {
-               if (left < us->rmsg[rmsg_head].data) {
-                       rsize = left;
-               } else {
-                       rsize = us->rmsg[rmsg_head].data;
-                       if (++rmsg_head == us->rq_size + 1)
-                               rmsg_head = 0;
-               }
-
-               end_size = us->rbuf_size - rbuf_offset;
-               if (rsize > end_size) {
-                       memcpy(buf, &us->rbuf[rbuf_offset], end_size);
-                       rbuf_offset = 0;
-                       buf += end_size;
-                       rsize -= end_size;
-                       left -= end_size;
-               }
-               memcpy(buf, &us->rbuf[rbuf_offset], rsize);
-               rbuf_offset += rsize;
-               buf += rsize;
-       }
-
-       return len - left;
-}
-
-/*
- * Continue to receive any queued data even if the remote side has disconnected.
- */
-ssize_t rrecv(int socket, void *buf, size_t len, int flags)
-{
-       struct usocket *us;
-       size_t left = len;
-       uint32_t end_size, rsize;
-       int ret;
-
-       us = idm_at(&idm, socket);
-       if (us->state & rs_opening) {
-               ret = rs_do_connect(us);
-               if (ret) {
-                       if (errno == EINPROGRESS)
-                               errno = EAGAIN;
-                       return ret;
-               }
-       }
-       fastlock_acquire(&us->rlock);
-       do {
-               if (!rs_have_rdata(us)) {
-                       ret = rs_get_comp(us, rs_nonblocking(us, flags),
-                                         rs_conn_have_rdata);
-                       if (ret)
-                               break;
-               }
-
-               ret = 0;
-               if (flags & MSG_PEEK) {
-                       left = len - rs_peek(us, buf, left);
-                       break;
-               }
-
-               for (; left && rs_have_rdata(us); left -= rsize) {
-                       if (left < us->rmsg[us->rmsg_head].data) {
-                               rsize = left;
-                               us->rmsg[us->rmsg_head].data -= left;
-                       } else {
-                               us->rseq_no++;
-                               rsize = us->rmsg[us->rmsg_head].data;
-                               if (++us->rmsg_head == us->rq_size + 1)
-                                       us->rmsg_head = 0;
-                       }
-
-                       end_size = us->rbuf_size - us->rbuf_offset;
-                       if (rsize > end_size) {
-                               memcpy(buf, &us->rbuf[us->rbuf_offset], end_size);
-                               us->rbuf_offset = 0;
-                               buf += end_size;
-                               rsize -= end_size;
-                               left -= end_size;
-                               us->rbuf_bytes_avail += end_size;
-                       }
-                       memcpy(buf, &us->rbuf[us->rbuf_offset], rsize);
-                       us->rbuf_offset += rsize;
-                       buf += rsize;
-                       us->rbuf_bytes_avail += rsize;
-               }
-
-       } while (left && (flags & MSG_WAITALL) && (us->state & rs_connect_rd));
-
-       fastlock_release(&us->rlock);
-       return ret ? ret : len - left;
-}
-
-ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
-                 struct sockaddr *src_addr, socklen_t *addrlen)
-{
-       int ret;
-
-       ret = rrecv(socket, buf, len, flags);
-       if (ret > 0 && src_addr)
-               rgetpeername(socket, src_addr, addrlen);
-
-       return ret;
-}
-
-/*
- * Simple, straightforward implementation for now that only tries to fill
- * in the first vector.
- */
-static ssize_t rrecvv(int socket, const struct iovec *iov, int iovcnt, int flags)
-{
-       return rrecv(socket, iov[0].iov_base, iov[0].iov_len, flags);
-}
-
-ssize_t rrecvmsg(int socket, struct msghdr *msg, int flags)
-{
-       if (msg->msg_control && msg->msg_controllen)
-               return ERR(ENOTSUP);
-
-       return rrecvv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
-}
-
-ssize_t rread(int socket, void *buf, size_t count)
-{
-       return rrecv(socket, buf, count, 0);
-}
-
-ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
-{
-       return rrecvv(socket, iov, iovcnt, 0);
-}
-
-static int rs_send_iomaps(struct usocket *us, int flags)
-{
-       struct rs_iomap_mr *iomr;
-       struct ibv_sge sge;
-       struct rs_iomap iom;
-       int ret;
-
-       fastlock_acquire(&us->iomap_lock);
-       while (!dlist_empty(&us->iomap_queue)) {
-               if (!rs_can_send(us)) {
-                       ret = rs_get_comp(us, rs_nonblocking(us, flags),
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
-                       if (!(us->state & rs_connect_wr)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-               }
-
-               iomr = container_of(us->iomap_queue.next, struct rs_iomap_mr, entry);
-               if (!(us->opts & RS_OPT_SWAP_SGL)) {
-                       iom.offset = iomr->offset;
-                       iom.sge.addr = (uintptr_t) iomr->mr->addr;
-                       iom.sge.length = iomr->mr->length;
-                       iom.sge.key = iomr->mr->rkey;
-               } else {
-                       iom.offset = bswap_64(iomr->offset);
-                       iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
-                       iom.sge.length = bswap_32(iomr->mr->length);
-                       iom.sge.key = bswap_32(iomr->mr->rkey);
-               }
-
-               if (us->sq_inline >= sizeof iom) {
-                       sge.addr = (uintptr_t) &iom;
-                       sge.length = sizeof iom;
-                       sge.lkey = 0;
-                       ret = rs_write_iomap(us, iomr, &sge, 1, IBV_SEND_INLINE);
-               } else if (rs_sbuf_left(us) >= sizeof iom) {
-                       memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom, sizeof iom);
-                       us->ssgl[0].length = sizeof iom;
-                       ret = rs_write_iomap(us, iomr, us->ssgl, 1, 0);
-                       if (rs_sbuf_left(us) > sizeof iom)
-                               us->ssgl[0].addr += sizeof iom;
-                       else
-                               us->ssgl[0].addr = (uintptr_t) us->sbuf;
-               } else {
-                       us->ssgl[0].length = rs_sbuf_left(us);
-                       memcpy((void *) (uintptr_t) us->ssgl[0].addr, &iom,
-                               us->ssgl[0].length);
-                       us->ssgl[1].length = sizeof iom - us->ssgl[0].length;
-                       memcpy(us->sbuf, ((void *) &iom) + us->ssgl[0].length,
-                              us->ssgl[1].length);
-                       ret = rs_write_iomap(us, iomr, us->ssgl, 2, 0);
-                       us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length;
-               }
-               dlist_remove(&iomr->entry);
-               dlist_insert_tail(&iomr->entry, &us->iomap_list);
-               if (ret)
-                       break;
-       }
-
-       us->iomap_pending = !dlist_empty(&us->iomap_queue);
-       fastlock_release(&us->iomap_lock);
-       return ret;
-}
-
-/*
- * We overlap sending the data, by posting a small work request immediately,
- * then increasing the size of the send on each iteration.
- */
-ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-{
-       struct usocket *us;
-       struct ibv_sge sge;
-       size_t left = len;
-       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
-       int ret = 0;
-
-       us = idm_at(&idm, socket);
-       if (us->state & rs_opening) {
-               ret = rs_do_connect(us);
-               if (ret) {
-                       if (errno == EINPROGRESS)
-                               errno = EAGAIN;
-                       return ret;
-               }
-       }
-
-       fastlock_acquire(&us->slock);
-       if (us->iomap_pending) {
-               ret = rs_send_iomaps(us, flags);
-               if (ret)
-                       goto out;
-       }
-       for (; left; left -= xfer_size, buf += xfer_size) {
-               if (!rs_can_send(us)) {
-                       ret = rs_get_comp(us, rs_nonblocking(us, flags),
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
-                       if (!(us->state & rs_connect_wr)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-               }
-
-               if (olen < left) {
-                       xfer_size = olen;
-                       if (olen < RS_MAX_TRANSFER)
-                               olen <<= 1;
-               } else {
-                       xfer_size = left;
-               }
-
-               if (xfer_size > us->sbuf_bytes_avail)
-                       xfer_size = us->sbuf_bytes_avail;
-               if (xfer_size > us->target_sgl[us->target_sge].length)
-                       xfer_size = us->target_sgl[us->target_sge].length;
-
-               if (xfer_size <= us->sq_inline) {
-                       sge.addr = (uintptr_t) buf;
-                       sge.length = xfer_size;
-                       sge.lkey = 0;
-                       ret = rs_write_data(us, &sge, 1, xfer_size, IBV_SEND_INLINE);
-               } else if (xfer_size <= rs_sbuf_left(us)) {
-                       memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf, xfer_size);
-                       us->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(us, us->ssgl, 1, xfer_size, 0);
-                       if (xfer_size < rs_sbuf_left(us))
-                               us->ssgl[0].addr += xfer_size;
-                       else
-                               us->ssgl[0].addr = (uintptr_t) us->sbuf;
-               } else {
-                       us->ssgl[0].length = rs_sbuf_left(us);
-                       memcpy((void *) (uintptr_t) us->ssgl[0].addr, buf,
-                               us->ssgl[0].length);
-                       us->ssgl[1].length = xfer_size - us->ssgl[0].length;
-                       memcpy(us->sbuf, buf + us->ssgl[0].length, us->ssgl[1].length);
-                       ret = rs_write_data(us, us->ssgl, 2, xfer_size, 0);
-                       us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length;
-               }
-               if (ret)
-                       break;
-       }
-out:
-       fastlock_release(&us->slock);
-
-       return (ret && left == len) ? ret : len - left;
-}
-
-ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-               const struct sockaddr *dest_addr, socklen_t addrlen)
-{
-       if (dest_addr || addrlen)
-               return ERR(EISCONN);
-
-       return rsend(socket, buf, len, flags);
-}
-
-static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-{
-       size_t size;
-
-       while (len) {
-               size = (*iov)->iov_len - *offset;
-               if (size > len) {
-                       memcpy (dst, (*iov)->iov_base + *offset, len);
-                       *offset += len;
-                       break;
-               }
-
-               memcpy(dst, (*iov)->iov_base + *offset, size);
-               len -= size;
-               dst += size;
-               (*iov)++;
-               *offset = 0;
-       }
-}
-
-static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
-{
-       struct usocket *us;
-       const struct iovec *cur_iov;
-       size_t left, len, offset = 0;
-       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
-       int i, ret = 0;
-
-       us = idm_at(&idm, socket);
-       if (us->state & rs_opening) {
-               ret = rs_do_connect(us);
-               if (ret) {
-                       if (errno == EINPROGRESS)
-                               errno = EAGAIN;
-                       return ret;
-               }
-       }
-
-       cur_iov = iov;
-       len = iov[0].iov_len;
-       for (i = 1; i < iovcnt; i++)
-               len += iov[i].iov_len;
-       left = len;
-
-       fastlock_acquire(&us->slock);
-       if (us->iomap_pending) {
-               ret = rs_send_iomaps(us, flags);
-               if (ret)
-                       goto out;
-       }
-       for (; left; left -= xfer_size) {
-               if (!rs_can_send(us)) {
-                       ret = rs_get_comp(us, rs_nonblocking(us, flags),
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
-                       if (!(us->state & rs_connect_wr)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-               }
-
-               if (olen < left) {
-                       xfer_size = olen;
-                       if (olen < RS_MAX_TRANSFER)
-                               olen <<= 1;
-               } else {
-                       xfer_size = left;
-               }
-
-               if (xfer_size > us->sbuf_bytes_avail)
-                       xfer_size = us->sbuf_bytes_avail;
-               if (xfer_size > us->target_sgl[us->target_sge].length)
-                       xfer_size = us->target_sgl[us->target_sge].length;
-
-               if (xfer_size <= rs_sbuf_left(us)) {
-                       rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr,
-                                   &cur_iov, &offset, xfer_size);
-                       us->ssgl[0].length = xfer_size;
-                       ret = rs_write_data(us, us->ssgl, 1, xfer_size,
-                                           xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0);
-                       if (xfer_size < rs_sbuf_left(us))
-                               us->ssgl[0].addr += xfer_size;
-                       else
-                               us->ssgl[0].addr = (uintptr_t) us->sbuf;
-               } else {
-                       us->ssgl[0].length = rs_sbuf_left(us);
-                       rs_copy_iov((void *) (uintptr_t) us->ssgl[0].addr, &cur_iov,
-                                   &offset, us->ssgl[0].length);
-                       us->ssgl[1].length = xfer_size - us->ssgl[0].length;
-                       rs_copy_iov(us->sbuf, &cur_iov, &offset, us->ssgl[1].length);
-                       ret = rs_write_data(us, us->ssgl, 2, xfer_size,
-                                           xfer_size <= us->sq_inline ? IBV_SEND_INLINE : 0);
-                       us->ssgl[0].addr = (uintptr_t) us->sbuf + us->ssgl[1].length;
-               }
-               if (ret)
-                       break;
-       }
-out:
-       fastlock_release(&us->slock);
-
-       return (ret && left == len) ? ret : len - left;
-}
-
-ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
-{
-       if (msg->msg_control && msg->msg_controllen)
-               return ERR(ENOTSUP);
-
-       return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
-}
-
-ssize_t rwrite(int socket, const void *buf, size_t count)
-{
-       return rsend(socket, buf, count, 0);
-}
-
-ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
-{
-       return rsendv(socket, iov, iovcnt, 0);
-}
-
-static struct pollfd *rs_fds_alloc(nfds_t nfds)
-{
-       static __thread struct pollfd *rfds;
-       static __thread nfds_t rnfds;
-
-       if (nfds > rnfds) {
-               if (rfds)
-                       free(rfds);
-
-               rfds = malloc(sizeof *rfds * nfds);
-               rnfds = rfds ? nfds : 0;
-       }
-
-       return rfds;
-}
-
-static int rs_poll_rs(struct usocket *us, int events,
-                     int nonblock, int (*test)(struct usocket *us))
-{
-       struct pollfd fds;
-       short revents;
-       int ret;
-
-check_cq:
-       if ((us->state & rs_connected) || (us->state == rs_disconnected) ||
-           (us->state & rs_error)) {
-               rs_process_cq(us, nonblock, test);
-
-               revents = 0;
-               if ((events & POLLIN) && rs_conn_have_rdata(us))
-                       revents |= POLLIN;
-               if ((events & POLLOUT) && rs_can_send(us))
-                       revents |= POLLOUT;
-               if (!(us->state & rs_connected)) {
-                       if (us->state == rs_disconnected)
-                               revents |= POLLHUP;
-                       else
-                               revents |= POLLERR;
-               }
-
-               return revents;
-       }
-
-       if (us->state == rs_listening) {
-               fds.fd = us->cm_id->channel->fd;
-               fds.events = events;
-               fds.revents = 0;
-               poll(&fds, 1, 0);
-               return fds.revents;
-       }
-
-       if (us->state & rs_opening) {
-               ret = rs_do_connect(us);
-               if (ret) {
-                       if (errno == EINPROGRESS) {
-                               errno = 0;
-                               return 0;
-                       } else {
-                               return POLLOUT;
-                       }
-               }
-               goto check_cq;
-       }
-
-       if (us->state == rs_connect_error)
-               return (us->err && events & POLLOUT) ? POLLOUT : 0;
-
-       return 0;
-}
-
-static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
-{
-       struct usocket *us;
-       int i, cnt = 0;
-
-       for (i = 0; i < nfds; i++) {
-               us = idm_lookup(&idm, fds[i].fd);
-               if (us)
-                       fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all);
-               else
-                       poll(&fds[i], 1, 0);
-
-               if (fds[i].revents)
-                       cnt++;
-       }
-       return cnt;
-}
-
-static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
-{
-       struct usocket *us;
-       int i;
-
-       for (i = 0; i < nfds; i++) {
-               us = idm_lookup(&idm, fds[i].fd);
-               if (us) {
-                       fds[i].revents = rs_poll_rs(us, fds[i].events, 0, rs_is_cq_armed);
-                       if (fds[i].revents)
-                               return 1;
-
-                       if (us->state >= rs_connected)
-                               rfds[i].fd = us->cm_id->recv_cq_channel->fd;
-                       else
-                               rfds[i].fd = us->cm_id->channel->fd;
-
-                       rfds[i].events = POLLIN;
-               } else {
-                       rfds[i].fd = fds[i].fd;
-                       rfds[i].events = fds[i].events;
-               }
-               rfds[i].revents = 0;
-
-       }
-       return 0;
-}
-
-static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
-{
-       struct usocket *us;
-       int i, cnt = 0;
-
-       for (i = 0; i < nfds; i++) {
-               if (!rfds[i].revents)
-                       continue;
-
-               us = idm_lookup(&idm, fds[i].fd);
-               if (us) {
-                       rs_get_cq_event(us);
-                       fds[i].revents = rs_poll_rs(us, fds[i].events, 1, rs_poll_all);
-               } else {
-                       fds[i].revents = rfds[i].revents;
-               }
-               if (fds[i].revents)
-                       cnt++;
-       }
-       return cnt;
-}
-
-/*
- * We need to poll *all* fd's that the user specifies at least once.
- * Note that we may receive events on an usocket that may not be reported
- * to the user (e.g. connection events or credit updates).  Process those
- * events, then return to polling until we find ones of interest.
- */
-int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
-{
-       struct timeval s, e;
-       struct pollfd *rfds;
-       uint32_t poll_time = 0;
-       int ret;
-
-       do {
-               ret = rs_poll_check(fds, nfds);
-               if (ret || !timeout)
-                       return ret;
-
-               if (!poll_time)
-                       gettimeofday(&s, NULL);
-
-               gettimeofday(&e, NULL);
-               poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
-                           (e.tv_usec - s.tv_usec) + 1;
-       } while (poll_time <= polling_time);
-
-       rfds = rs_fds_alloc(nfds);
-       if (!rfds)
-               return ERR(ENOMEM);
-
-       do {
-               ret = rs_poll_arm(rfds, fds, nfds);
-               if (ret)
-                       break;
-
-               ret = poll(rfds, nfds, timeout);
-               if (ret <= 0)
-                       break;
-
-               ret = rs_poll_events(rfds, fds, nfds);
-       } while (!ret);
-
-       return ret;
-}
-
-static struct pollfd *
-rs_select_to_poll(int *nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds)
-{
-       struct pollfd *fds;
-       int fd, i = 0;
-
-       fds = calloc(*nfds, sizeof *fds);
-       if (!fds)
-               return NULL;
-
-       for (fd = 0; fd < *nfds; fd++) {
-               if (readfds && FD_ISSET(fd, readfds)) {
-                       fds[i].fd = fd;
-                       fds[i].events = POLLIN;
-               }
-
-               if (writefds && FD_ISSET(fd, writefds)) {
-                       fds[i].fd = fd;
-                       fds[i].events |= POLLOUT;
-               }
-
-               if (exceptfds && FD_ISSET(fd, exceptfds))
-                       fds[i].fd = fd;
-
-               if (fds[i].fd)
-                       i++;
-       }
-
-       *nfds = i;
-       return fds;
-}
-
-static int
-rs_poll_to_select(int nfds, struct pollfd *fds, fd_set *readfds,
-                 fd_set *writefds, fd_set *exceptfds)
-{
-       int i, cnt = 0;
-
-       for (i = 0; i < nfds; i++) {
-               if (readfds && (fds[i].revents & (POLLIN | POLLHUP))) {
-                       FD_SET(fds[i].fd, readfds);
-                       cnt++;
-               }
-
-               if (writefds && (fds[i].revents & POLLOUT)) {
-                       FD_SET(fds[i].fd, writefds);
-                       cnt++;
-               }
-
-               if (exceptfds && (fds[i].revents & ~(POLLIN | POLLOUT))) {
-                       FD_SET(fds[i].fd, exceptfds);
-                       cnt++;
-               }
-       }
-       return cnt;
-}
-
-static int rs_convert_timeout(struct timeval *timeout)
-{
-       return !timeout ? -1 :
-               timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
-}
-
-int rselect(int nfds, fd_set *readfds, fd_set *writefds,
-           fd_set *exceptfds, struct timeval *timeout)
-{
-       struct pollfd *fds;
-       int ret;
-
-       fds = rs_select_to_poll(&nfds, readfds, writefds, exceptfds);
-       if (!fds)
-               return ERR(ENOMEM);
-
-       ret = rpoll(fds, nfds, rs_convert_timeout(timeout));
-
-       if (readfds)
-               FD_ZERO(readfds);
-       if (writefds)
-               FD_ZERO(writefds);
-       if (exceptfds)
-               FD_ZERO(exceptfds);
-
-       if (ret > 0)
-               ret = rs_poll_to_select(nfds, fds, readfds, writefds, exceptfds);
-
-       free(fds);
-       return ret;
-}
-
-/*
- * For graceful disconnect, notify the remote side that we're
- * disconnecting and wait until all outstanding sends complete.
- */
-int rshutdown(int socket, int how)
-{
-       struct usocket *us;
-       int ctrl, ret = 0;
-
-       us = idm_at(&idm, socket);
-       if (how == SHUT_RD) {
-               us->state &= ~rs_connect_rd;
-               return 0;
-       }
-
-       if (us->fd_flags & O_NONBLOCK)
-               rs_set_nonblocking(us, 0);
-
-       if (us->state & rs_connected) {
-               if (how == SHUT_RDWR) {
-                       ctrl = RS_CTRL_DISCONNECT;
-                       us->state &= ~(rs_connect_rd | rs_connect_wr);
-               } else {
-                       us->state &= ~rs_connect_wr;
-                       ctrl = (us->state & rs_connect_rd) ?
-                               RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
-               }
-               if (!us->ctrl_avail) {
-                       ret = rs_process_cq(us, 0, rs_conn_can_send_ctrl);
-                       if (ret)
-                               return ret;
-               }
-
-               if ((us->state & rs_connected) && us->ctrl_avail) {
-                       us->ctrl_avail--;
-                       ret = rs_post_write_msg(us, NULL, 0,
-                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-               }
-       }
-
-       if (us->state & rs_connected)
-               rs_process_cq(us, 0, rs_conn_all_sends_done);
-
-       if ((us->fd_flags & O_NONBLOCK) && (us->state & rs_connected))
-               rs_set_nonblocking(us, 1);
-
-       return 0;
-}
-
-int rclose(int socket)
-{
-       struct usocket *us;
-
-       us = idm_at(&idm, socket);
-       if (us->state & rs_connected)
-               rshutdown(socket, SHUT_RDWR);
-
-       rs_free(us);
-       return 0;
-}
-
-static void rs_copy_addr(struct sockaddr *dst, struct sockaddr *src, socklen_t *len)
-{
-       socklen_t size;
-
-       if (src->sa_family == AF_INET) {
-               size = min(*len, sizeof(struct sockaddr_in));
-               *len = sizeof(struct sockaddr_in);
-       } else {
-               size = min(*len, sizeof(struct sockaddr_in6));
-               *len = sizeof(struct sockaddr_in6);
-       }
-       memcpy(dst, src, size);
-}
-
-int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
-{
-       struct usocket *us;
-
-       us = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_peer_addr(us->cm_id), addrlen);
-       return 0;
-}
-
-int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-{
-       struct usocket *us;
-
-       us = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_local_addr(us->cm_id), addrlen);
-       return 0;
-}
-
-int rsetsockopt(int socket, int level, int optname,
-               const void *optval, socklen_t optlen)
-{
-       struct usocket *us;
-       int ret, opt_on = 0;
-       uint64_t *opts = NULL;
-
-       ret = ERR(ENOTSUP);
-       us = idm_at(&idm, socket);
-       switch (level) {
-       case SOL_SOCKET:
-               opts = &us->so_opts;
-               switch (optname) {
-               case SO_REUSEADDR:
-                       ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID,
-                                             RDMA_OPTION_ID_REUSEADDR,
-                                             (void *) optval, optlen);
-                       if (ret && ((errno == ENOSYS) || ((us->state != rs_init) &&
-                           us->cm_id->context &&
-                           (us->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
-                               ret = 0;
-                       opt_on = *(int *) optval;
-                       break;
-               case SO_RCVBUF:
-                       if (!us->rbuf)
-                               us->rbuf_size = (*(uint32_t *) optval) << 1;
-                       ret = 0;
-                       break;
-               case SO_SNDBUF:
-                       if (!us->sbuf)
-                               us->sbuf_size = (*(uint32_t *) optval) << 1;
-                       if (us->sbuf_size < RS_SNDLOWAT)
-                               us->sbuf_size = RS_SNDLOWAT << 1;
-                       ret = 0;
-                       break;
-               case SO_LINGER:
-                       /* Invert value so default so_opt = 0 is on */
-                       opt_on =  !((struct linger *) optval)->l_onoff;
-                       ret = 0;
-                       break;
-               case SO_KEEPALIVE:
-                       opt_on = *(int *) optval;
-                       ret = 0;
-                       break;
-               case SO_OOBINLINE:
-                       opt_on = *(int *) optval;
-                       ret = 0;
-                       break;
-               default:
-                       break;
-               }
-               break;
-       case IPPROTO_TCP:
-               opts = &us->tcp_opts;
-               switch (optname) {
-               case TCP_NODELAY:
-                       opt_on = *(int *) optval;
-                       ret = 0;
-                       break;
-               case TCP_MAXSEG:
-                       ret = 0;
-                       break;
-               default:
-                       break;
-               }
-               break;
-       case IPPROTO_IPV6:
-               opts = &us->ipv6_opts;
-               switch (optname) {
-               case IPV6_V6ONLY:
-                       ret = rdma_set_option(us->cm_id, RDMA_OPTION_ID,
-                                             RDMA_OPTION_ID_AFONLY,
-                                             (void *) optval, optlen);
-                       opt_on = *(int *) optval;
-                       break;
-               default:
-                       break;
-               }
-               break;
-       case SOL_RDMA:
-               if (us->state >= rs_opening) {
-                       ret = ERR(EINVAL);
-                       break;
-               }
-
-               switch (optname) {
-               case RDMA_SQSIZE:
-                       us->sq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
-                       break;
-               case RDMA_RQSIZE:
-                       us->rq_size = min((*(uint32_t *) optval), RS_QP_MAX_SIZE);
-                       break;
-               case RDMA_INLINE:
-                       us->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
-                       if (us->sq_inline < RS_MIN_INLINE)
-                               us->sq_inline = RS_MIN_INLINE;
-                       break;
-               case RDMA_IOMAPSIZE:
-                       us->target_iomap_size = (uint16_t) rs_scale_to_value(
-                               (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
-                       break;
-               default:
-                       break;
-               }
-               break;
-       default:
-               break;
-       }
-
-       if (!ret && opts) {
-               if (opt_on)
-                       *opts |= (1 << optname);
-               else
-                       *opts &= ~(1 << optname);
-       }
-
-       return ret;
-}
-
-int rgetsockopt(int socket, int level, int optname,
-               void *optval, socklen_t *optlen)
-{
-       struct usocket *us;
-       int ret = 0;
-
-       us = idm_at(&idm, socket);
-       switch (level) {
-       case SOL_SOCKET:
-               switch (optname) {
-               case SO_REUSEADDR:
-               case SO_KEEPALIVE:
-               case SO_OOBINLINE:
-                       *((int *) optval) = !!(us->so_opts & (1 << optname));
-                       *optlen = sizeof(int);
-                       break;
-               case SO_RCVBUF:
-                       *((int *) optval) = us->rbuf_size;
-                       *optlen = sizeof(int);
-                       break;
-               case SO_SNDBUF:
-                       *((int *) optval) = us->sbuf_size;
-                       *optlen = sizeof(int);
-                       break;
-               case SO_LINGER:
-                       /* Value is inverted so default so_opt = 0 is on */
-                       ((struct linger *) optval)->l_onoff =
-                                       !(us->so_opts & (1 << optname));
-                       ((struct linger *) optval)->l_linger = 0;
-                       *optlen = sizeof(struct linger);
-                       break;
-               case SO_ERROR:
-                       *((int *) optval) = us->err;
-                       *optlen = sizeof(int);
-                       us->err = 0;
-                       break;
-               default:
-                       ret = ENOTSUP;
-                       break;
-               }
-               break;
-       case IPPROTO_TCP:
-               switch (optname) {
-               case TCP_NODELAY:
-                       *((int *) optval) = !!(us->tcp_opts & (1 << optname));
-                       *optlen = sizeof(int);
-                       break;
-               case TCP_MAXSEG:
-                       *((int *) optval) = (us->cm_id && us->cm_id->route.num_paths) ?
-                                           1 << (7 + us->cm_id->route.path_rec->mtu) :
-                                           2048;
-                       *optlen = sizeof(int);
-                       break;
-               default:
-                       ret = ENOTSUP;
-                       break;
-               }
-               break;
-       case IPPROTO_IPV6:
-               switch (optname) {
-               case IPV6_V6ONLY:
-                       *((int *) optval) = !!(us->ipv6_opts & (1 << optname));
-                       *optlen = sizeof(int);
-                       break;
-               default:
-                       ret = ENOTSUP;
-                       break;
-               }
-               break;
-       case SOL_RDMA:
-               switch (optname) {
-               case RDMA_SQSIZE:
-                       *((int *) optval) = us->sq_size;
-                       *optlen = sizeof(int);
-                       break;
-               case RDMA_RQSIZE:
-                       *((int *) optval) = us->rq_size;
-                       *optlen = sizeof(int);
-                       break;
-               case RDMA_INLINE:
-                       *((int *) optval) = us->sq_inline;
-                       *optlen = sizeof(int);
-                       break;
-               case RDMA_IOMAPSIZE:
-                       *((int *) optval) = us->target_iomap_size;
-                       *optlen = sizeof(int);
-                       break;
-               default:
-                       ret = ENOTSUP;
-                       break;
-               }
-               break;
-       default:
-               ret = ENOTSUP;
-               break;
-       }
-
-       return rdma_seterrno(ret);
-}
-
-int ufcntl(int socket, int cmd, ... /* arg */ )
-{
-       struct usocket *us;
-       va_list args;
-       long param;
-       int ret = 0;
-
-       us = idm_at(&idm, socket);
-       va_start(args, cmd);
-       switch (cmd) {
-       case F_GETFL:
-               ret = (int) us->fd_flags;
-               break;
-       case F_SETFL:
-               param = va_arg(args, long);
-               if (param & O_NONBLOCK)
-                       ret = rs_set_nonblocking(us, O_NONBLOCK);
-
-               if (!ret)
-                       us->fd_flags |= param;
-               break;
-       default:
-               ret = ERR(ENOTSUP);
-               break;
-       }
-       va_end(args);
-       return ret;
-}