]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of dsocket
authorSean Hefty <sean.hefty@intel.com>
Mon, 26 Nov 2012 20:10:16 +0000 (12:10 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 26 Nov 2012 20:10:16 +0000 (12:10 -0800)
docs/rsocket
src/rsocket.c

index 03d49df7824fca62f5a670f5df5870df25e298a4..a66020845a7dc6df908bddc65df5c03face905df 100644 (file)
@@ -194,7 +194,7 @@ application's buffer.
 
 Datagram Overview
 -----------------
-THe rsocket API supports datagram sockets.  Datagram support is handled through an
+The rsocket API supports datagram sockets.  Datagram support is handled through an
 entirely different protocol and internal implementation.  Unlike connected rsockets,
 datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
 socket may use any number of network (IP) addresses, including those which map to
@@ -204,32 +204,19 @@ UDP socket, plus zero or more UD QPs.
 
 Rsockets uses headers inserted before user data sent over UDP sockets to resolve
 remote UD QP numbers.  When a user first attempts to send a datagram to a remote
-address (IP and UDP port), rsockets will take the following steps.  First, it
-will allocate and store the destination address into a lookup table for future
-use.  Then it will resolve which local network address should be used when sending
-to the specified destination.  It will allocate a UD QP on the RDMA device
-associated with the local address.  Finally, rsockets will send the user's
-datagram to the remote UDP socket, but inserting a header before the datagram.
-The header specifies the UD QP number associated with the local network address
-(IP and UDP port) of the send.
-
-Under many circumstances, the rsocket UDP header also provides enough path
-information for the receiver to send the reply using a UD QP.  However, certain
-fabric topologies may require contacting a subnet administrator.  Whenever
-rsockets lacks sufficient information to send data to a remote peer using
-a UD QP, it will automatically fall back to using a standard UDP socket.  This
-helps minimize the latency for performing a given send, while lenghty address
-and route resolution protocols complete.
-
-Currently, rsockets allocates a single UD QP, with an infrastructure is provided
-to support more.  Future enhancements may add support for multiple UD QPs, each
-associated with a single network (IP) address.  Multiplexing different network
-addresses over a single UD QP and using shared receive queues are other possible
-enhancements.
-
-Because rsockets may use multiple UD QPs, buffer management can become an issue.
-The total size of receive buffers is specified by the mem_default configuration
-files, but may be overridden with rsetsockopt.  The buffer is divided into
-MTU sized messages and distributed among the allocated QPs.  As new QPs are
-created, the receive buffers may be redistributed by posting loopback sends
-on a QP in order to free the receive buffers for reposting to a different QP.  
\ No newline at end of file
+address (IP and UDP port), rsockets will take the following steps:
+
+1. Store the destination address into a lookup table.
+2. Resolve which local network address should be used when sending
+   to the specified destination.
+3. Allocate a UD QP on the RDMA device associated with the local address.
+4. Send the user's datagram to the remote UDP socket.
+
+A header is inserted before the user's datagram.  The header specifies the
+UD QP number associated with the local network address (IP and UDP port) of
+the send.
+
+A service thread is used to process messages received on the UDP socket.  This
+thread updates the rsocket lookup tables with the remote QPN and path record
+data.  The service thread forwards data received on the UDP socket to an
+rsocket QP.
\ No newline at end of file
index 0695d12d72304d465b4e26ea03f8895d42a9290b..a81b8f3915971eec0e802318d84e6698b5b6b425 100644 (file)
@@ -46,6 +46,7 @@
 #include <string.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
+#include <sys/epoll.h>
 
 #include <rdma/rdma_cma.h>
 #include <rdma/rdma_verbs.h>
@@ -116,6 +117,14 @@ enum {
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
 #define rs_msg_op(imm_data)   (imm_data >> 29)
 #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+#define RS_RECV_WR_ID (~((uint64_t) 0))
+
+#define DS_WR_RECV 0xFFFFFFFF
+#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
+#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
+#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
+#define ds_wr_length(wr_id) ((uint32_t) wr_id)
+#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
 
 enum {
        RS_CTRL_DISCONNECT,
@@ -129,8 +138,14 @@ struct rs_msg {
 
 struct ds_qp;
 
-struct ds_msg {
-       struct ds_qp *qp;
+struct ds_rmsg {
+       struct ds_qp    *qp;
+       uint32_t        offset;
+       uint32_t        length;
+};
+
+struct ds_smsg {
+       struct ds_smsg  *next;
 };
 
 struct rs_sge {
@@ -167,8 +182,6 @@ struct rs_conn_data {
        struct rs_sge     data_buf;
 };
 
-#define RS_RECV_WR_ID (~((uint64_t) 0))
-
 /*
  * rsocket states are ordered as passive, connecting, connected, disconnected.
  */
@@ -203,12 +216,11 @@ struct ds_qp {
        struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
 
+       struct ibv_mr     *smr;
        struct ibv_mr     *rmr;
        uint8_t           *rbuf;
 
-       struct ibv_mr     *smr;
-       uint16_t          lid;
-       uint8_t           sl;
+       int               cq_armed;
 };
 
 struct ds_dest {
@@ -231,6 +243,13 @@ struct rsocket {
                /* data stream */
                struct {
                        struct rdma_cm_id *cm_id;
+                       uint64_t          tcp_opts;
+
+                       int               ctrl_avail;
+                       uint16_t          sseq_no;
+                       uint16_t          sseq_comp;
+                       uint16_t          rseq_no;
+                       uint16_t          rseq_comp;
 
                        int               remote_sge;
                        struct rs_sge     remote_sgl;
@@ -242,63 +261,58 @@ struct rsocket {
                        void              *target_buffer_list;
                        volatile struct rs_sge    *target_sgl;
                        struct rs_iomap   *target_iomap;
+
+                       int               rbuf_bytes_avail;
+                       int               rbuf_free_offset;
+                       int               rbuf_offset;
+                       struct ibv_mr     *rmr;
+                       uint8_t           *rbuf;
+
+                       int               sbuf_bytes_avail;
+                       struct ibv_mr     *smr;
+                       struct ibv_sge    ssgl[2];
                };
                /* datagram */
                struct {
-                       dlist_t           qp_list;
+                       struct ds_qp      *qp_list;
                        void              *dest_map;
                        struct ds_dest    *conn_dest;
-                       struct pollfd     *fds;
-                       nfds_t            nfds;
-                       int               dsock;
-                       int               sbytes_needed;
+
+                       int               udp_sock;
+                       int               epfd;
+                       int               rqe_avail;
+                       struct ds_smsg    *smsg_free;
                };
        };
 
        int               opts;
        long              fd_flags;
        uint64_t          so_opts;
-       uint64_t          tcp_opts;
        uint64_t          ipv6_opts;
        int               state;
        int               cq_armed;
        int               retries;
        int               err;
 
-       int               ctrl_avail;
        int               sqe_avail;
-       int               sbuf_bytes_avail;
-       uint16_t          sseq_no;
-       uint16_t          sseq_comp;
+       uint32_t          sbuf_size;
        uint16_t          sq_size;
        uint16_t          sq_inline;
 
+       uint32_t          rbuf_size;
        uint16_t          rq_size;
-       uint16_t          rseq_no;
-       uint16_t          rseq_comp;
-       int               rbuf_bytes_avail;
-       int               rbuf_free_offset;
-       int               rbuf_offset;
        int               rmsg_head;
        int               rmsg_tail;
        union {
                struct rs_msg     *rmsg;
-               struct ds_msg     *dmsg;
+               struct ds_rmsg    *dmsg;
        };
 
+       uint8_t           *sbuf;
        struct rs_iomap_mr *remote_iomappings;
        dlist_entry       iomap_list;
        dlist_entry       iomap_queue;
        int               iomap_pending;
-
-       uint32_t          rbuf_size;
-       struct ibv_mr     *rmr;
-       uint8_t           *rbuf;
-
-       uint32_t          sbuf_size;
-       struct ibv_mr     *smr;
-       struct ibv_sge    ssgl[2];
-       uint8_t           *sbuf;
 };
 
 #define DS_UDP_TAG 0x5555555555555555ULL
@@ -306,12 +320,32 @@ struct rsocket {
 struct ds_udp_header {
        uint64_t          tag;
        uint8_t           version;
-       uint8_t           sl;
-       uint16_t          slid;
+       uint8_t           reserved[3];
        uint32_t          qpn;  /* upper 8-bits reserved */
 };
 
 
+#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
+
+static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
+{
+       if (!rs->qp_list)
+               list_init(&qp->list);
+       else
+               list_insert_head(&qp->list, &rs->qp_list->list);
+       rs->qp_list = *qp;
+}
+
+static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
+{
+       if (qp->list.next != qp->list) {
+               rs->qp_list = ds_next_qp(qp);
+               dlist_remove(&qp->list);
+       } else {
+               rs->qp_list = NULL;
+       }
+}
+
 static int rs_svc_run(void *arg)
 {
        return 0;
@@ -468,7 +502,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
 
        rs->type = type;
        rs->index = -1;
-       rs->dsock = -1;
+       rs->udp_sock = -1;
+       rs->epfd = -1;
+
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
@@ -496,18 +532,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
        return rs;
 }
 
-/*
- * TODO: Support datagram rsockets
- */
 static int rs_set_nonblocking(struct rsocket *rs, long arg)
 {
+       struct ds_qp *qp;
        int ret = 0;
 
-       if (rs->cm_id->recv_cq_channel)
-               ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+       if (rs->type == SOCK_STREAM) {
+               if (rs->cm_id->recv_cq_channel)
+                       ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
 
-       if (!ret && rs->state < rs_connected)
-               ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+               if (!ret && rs->state < rs_connected)
+                       ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
+       } else {
+               ret = fcntl(rs->epfd, F_SETFL, arg);
+
+               if (!ret && rs->qp_list) {
+                       qp = rs->qp_list;
+                       do {
+                               ret = fcntl(qp->cm_id->recv_cq_channel->fd,
+                                           F_SETFL, arg);
+                               qp = ds_next_qp(qp);
+                       } while (qp != rs->qp_list && !ret);
+               }
+       }
 
        return ret;
 }
@@ -531,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
                rs->rq_size = 2;
 }
 
+static void ds_set_qp_size(struct rsocket *rs)
+{
+       uint16_t max_size;
+
+       max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
+
+       if (rs->sq_size > max_size)
+               rs->sq_size = max_size;
+       if (rs->rq_size > max_size)
+               rs->rq_size = max_size;
+
+       if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
+               rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
+       else
+               rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+
+       if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
+               rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
+       else
+               rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
+}
+
 static int rs_init_bufs(struct rsocket *rs)
 {
        size_t len;
 
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
        if (!rs->rmsg)
-               return -1;
+               return ERR(ENOEMEM);
 
        rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
        if (!rs->sbuf)
-               return -1;
+               return ERR(ENOEMEM);
 
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
@@ -551,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
-               return -1;
+               return ERR(ENOEMEM);
 
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
@@ -564,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
 
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
-               return -1;
+               return ERR(ENOEMEM);
 
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
@@ -648,7 +717,7 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
        sge.length = RS_SNDLOWAT;
        sge.lkey = qp->rmr;
 
-       wr.wr_id = RS_RECV_WR_ID;
+       wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
        wr.next = NULL;
        wr.sg_list = &sge;
        wr.num_sge = 1;
@@ -736,7 +805,8 @@ static void ds_free_qp(struct ds_qp *qp)
 
        if (qp->cm_id) {
                if (qp->cm_id->qp) {
-                       dlist_remove(&qp->list);
+                       epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
+                                 qp->cm_id->recv_cq_channel->fd, NULL);
                        rdma_destroy_qp(qp->cm_id);
                }
                rdma_destroy_id(qp->cm_id);
@@ -744,31 +814,32 @@ static void ds_free_qp(struct ds_qp *qp)
        free(qp);
 }
 
-static void ds_free_qps(struct rsocket *rs)
+static void ds_free(struct rsocket *rs)
 {
        struct ds_qp *qp;
-       dlist_t *entry;
 
-       while (!dlist_empty(&rs->qp_list)) {
-               qp = container_of(rs->qp_list.next, struct ds_qp, list);
-               ds_free_qp(qp);
-       }
-}
-
-static void ds_free(struct rsocket *rs)
-{
        if (rs->state & (rs_readable | rs_writable))
                rs_svc_remove(rs);
 
-       if (rs->dsock >= 0)
-               close(rs->dsock);
+       if (rs->udp_sock >= 0)
+               close(rs->udp_sock);
 
        if (rs->index >= 0)
                rs_remove(rs);
 
-       ds_free_qps(rs);
-       if (rs->fds)
-               free(rs->fds);
+       if (rs->dmsg)
+               free(rs->dmsg);
+
+       if (rs->smsg)
+               free(rs->smsg);
+
+       while (rs->qp_list) {
+               ds_remove_qp(rs, rs->qp_list);
+               ds_free_qp(rs->qp_list);
+       }
+
+       if (rs->epfd >= 0)
+               close(rs->epfd);
 
        if (rs->sbuf)
                free(rs->sbuf);
@@ -875,15 +946,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
 
 static int ds_init(struct rsocket *rs, int domain)
 {
-       rs->dsock = socket(domain, SOCK_DGRAM, 0);
-       if (rs->dsock < 0)
-               return rs->dsock;
+       rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
+       if (rs->udp_sock < 0)
+               return rs->udp_sock;
 
-       rs->fds = calloc(1, sizeof *fds);
-       if (!rs->fds)
-               return ERR(ENOMEM);
-       rs->nfds = 1;
-       dlist_init(&rs->qp_list);
+       rs->epfd = epoll_create(0);
+       if (rs->epfd < 0)
+               return rs->epfd;
 
        return 0;
 }
@@ -916,7 +985,7 @@ int rsocket(int domain, int type, int protocol)
                if (ret)
                        goto err;
 
-               index = rs->dsock;
+               index = rs->udp_sock;
        }
 
        ret = rs_insert(rs, index);
@@ -941,7 +1010,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
                if (!ret)
                        rs->state = rs_bound;
        } else {
-               ret = bind(rs->dsock, addr, addrlen);
+               ret = bind(rs->udp_sock, addr, addrlen);
                if (!ret) {
                        ret = rs_svc_insert(rs);
                        if (!ret)
@@ -1130,21 +1199,30 @@ connected:
 
 static int ds_init_ep(struct rsocket *rs)
 {
-       int ret;
+       struct ds_smsg *msg;
+       int i, ret;
 
-       rs_set_qp_size(rs);
-       if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-               rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-       else
-               rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+       ds_set_qp_size(rs);
 
-       rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+       rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
        if (!rs->sbuf)
                return ERR(ENOMEM);
 
-       rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
+       rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
+       if (!rs->dmsg)
+               return ERR(ENOEMEM);
+
        rs->sbuf_bytes_avail = rs->sbuf_size;
        rs->sqe_avail = rs->sq_size;
+       rs->rqe_avail = rs->rq_size;
+
+       rs->smsg_free = (struct ds_smsg *) rs->sbuf;
+       msg = rs->smsg_free;
+       for (i = 0; i < rs->sq_size - 1; i++) {
+               msg->next = (void *) msg + i * RS_SNDLOWAT;
+               msg = msg->next;
+       }
+       msg->next = NULL;
 
        ret = rs_svc_insert(rs);
        if (ret)
@@ -1186,7 +1264,7 @@ static int ds_get_src_addr(struct rsocket *rs,
        uint16_t port;
 
        *src_len = sizeof src_addr;
-       ret = getsockname(rs->dsock, &src_addr->sa, src_len);
+       ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
        if (ret || !rs_any_addr(src_addr))
                return ret;
 
@@ -1211,6 +1289,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
                        socklen_t addrlen, struct ds_qp **qp)
 {
        struct ibv_qp_init_attr qp_attr;
+       struct epoll_event event;
        int ret;
 
        *qp = calloc(1, sizeof(struct ds_qp));
@@ -1245,17 +1324,24 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
        qp_attr.cap.max_send_sge = 2;
        qp_attr.cap.max_recv_sge = 1;
        qp_attr.cap.max_inline_data = rs->sq_inline;
-
        ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
        if (ret)
-               return ret;
+               goto err;
+
+       event.events = EPOLLIN | EPOLLOUT;
+       event.data.ptr = *qp;
+       ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
+                       (*qp)->cm_id->recv_cq_channel->fd, &event);
+       if (ret)
+               goto err;
 
        for (i = 0; i < rs->rq_size; i++) {
                ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
                if (ret)
                        goto err;
        }
-       list_insert_head(&(*qp)->list, &rs->qp_list);
+
+       ds_insert_qp(rs, *qp);
        return 0;
 err:
        ds_free_qp(*qp);
@@ -1265,12 +1351,14 @@ err:
 static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
                     socklen_t addrlen, struct ds_qp **qp)
 {
-       dlist_t *entry;
+       if (rs->qp_list) {
+               *qp = rs->qp_list;
+               do {
+                       if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+                               return 0;
 
-       for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
-               *qp = container_of(entry, struct ds_qp, list);
-               if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-                       return 0;
+                       *qp = ds_next_qp(*qp);
+               } while (*qp != rs->qp_list);
        }
 
        return ds_create_qp(rs, src_addr, addrlen, qp);
@@ -1328,7 +1416,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
                ret = rs_do_connect(rs);
        } else {
                fastlock_acquire(&rs->slock);
-               ret = connect(rs->dsock, addr, addrlen);
+               ret = connect(rs->udp_sock, addr, addrlen);
                if (!ret)
                        ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
                fastlock_release(&rs->slock);
@@ -1428,9 +1516,12 @@ static int ds_send_data(struct rsocket *rs,
                        struct ibv_sge *sgl, int nsge,
                        uint32_t length, int flags)
 {
+       uint64_t offset;
+
        rs->sqe_avail--;
        rs->sbuf_bytes_avail -= length;
-       return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
+       offset = sgl->addr - (uintptr_t) rs->sbuf;
+       return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
 }
 
 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
@@ -1688,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
        return ret;
 }
 
+/*
+ * Poll all CQs associated with a datagram rsocket.  We need to drop any
+ * received messages that we do not have room to store.  To limit drops,
+ * we only poll if we have room to store the receive or we need a send
+ * buffer.  To ensure fairness, we poll the CQs round robin, remembering
+ * where we left off.
+ */
+static void ds_poll_cqs(struct rsocket *rs)
+{
+       struct ds_qp *qp;
+       struct ds_smsg *msg;
+       struct ibv_wc wc;
+       int ret, cnt;
+
+       qp = rs->qp_list;
+       do {
+               cnt = 0;
+               do {
+                       ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
+                       if (ret <= 0) {
+                               qp = ds_next_qp(qp);
+                               continue;
+                       }
+
+                       if (ds_wr_is_recv(wc.wr_id)) {
+                               if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
+                                       rs->rqe_avail--;
+                                       rs->dmsg[rs->rmsg_tail].qp = qp;
+                                       if (++rs->rmsg_tail == rs->rq_size + 1)
+                                               rs->rmsg_tail = 0;
+                               } else {
+                                       ds_post_recv(rs, qp, qp->rbuf +
+                                                            ds_wr_offset(wc.wr_id));
+                               }
+                       } else {
+                               if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
+                                       msg = (struct ds_smsg *)
+                                             (rs->sbuf + ds_wr_offset(wc.wr_id));
+                                       msg->next = rs->smsg_free;
+                                       rs->smsg_free = msg;
+                               }
+                               rs->sqe_avail++;
+                       }
+
+                       qp = ds_next_qp(qp);
+                       if (!rs->rqe_avail && rs->sqe_avail) {
+                               rs->qp_list = qp;
+                               return;
+                       }
+                       cnt++;
+               } while (qp != rs->qp_list);
+       } while (cnt);
+}
+
+static void ds_req_notify_cqs(struct rsocket *rs)
+{
+       struct ds_qp *qp;
+
+       qp = rs->qp_list;
+       do {
+               if (!qp->cq_armed) {
+                       ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
+                       qp->cq_armed = 1;
+               }
+               qp = ds_next_qp(qp);
+       } while (qp != rs->qp_list);
+}
+
+static int ds_get_cq_event(struct rsocket *rs)
+{
+       struct epoll_event event;
+       struct ds_qp *qp;
+       struct ibv_cq *cq;
+       void *context;
+       int ret;
+
+       if (!rs->cq_armed)
+               return 0;
+
+       ret = epoll_wait(rs->epfd, &event, 1, -1);
+       if (ret <= 0)
+               return ret;
+
+       qp = event.data.ptr;
+       ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
+       if (!ret) {
+               ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
+               qp->cq_armed = 0;
+               rs->cq_armed = 0;
+       }
+
+       return ret;
+}
+
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+       int ret = 0;
+
+       fastlock_acquire(&rs->cq_lock);
+       do {
+               ds_poll_cqs(rs);
+               if (test(rs)) {
+                       ret = 0;
+                       break;
+               } else if (nonblock) {
+                       ret = ERR(EWOULDBLOCK);
+               } else if (!rs->cq_armed) {
+                       ds_req_notify_cqs(rs);
+                       rs->cq_armed = 1;
+               } else {
+                       rs_update_credits(rs);
+                       fastlock_acquire(&rs->cq_wait_lock);
+                       fastlock_release(&rs->cq_lock);
+
+                       ret = ds_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
+                       fastlock_acquire(&rs->cq_lock);
+               }
+       } while (!ret);
+
+       fastlock_release(&rs->cq_lock);
+       return ret;
+}
+
+static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+       struct timeval s, e;
+       uint32_t poll_time = 0;
+       int ret;
+
+       do {
+               ret = ds_process_cqs(rs, 1, test);
+               if (!ret || nonblock || errno != EWOULDBLOCK)
+                       return ret;
+
+               if (!poll_time)
+                       gettimeofday(&s, NULL);
+
+               gettimeofday(&e, NULL);
+               poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
+                           (e.tv_usec - s.tv_usec) + 1;
+       } while (poll_time <= polling_time);
+
+       ret = ds_process_cqs(rs, 0, test);
+       return ret;
+}
+
 static int rs_nonblocking(struct rsocket *rs, int flags)
 {
        return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
@@ -1721,7 +1959,7 @@ static int rs_can_send(struct rsocket *rs)
 
 static int ds_can_send(struct rsocket *rs)
 {
-       return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
+       return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
 }
 
 static int rs_conn_can_send(struct rsocket *rs)
@@ -1961,31 +2199,22 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
 {
        struct ds_udp_header hdr;
        struct msghdr msg;
-       struct iovec miov[4];
+       struct iovec miov[8];
        struct ds_qp *qp;
 
-       if (iovcnt > 4)
+       if (iovcnt > 8)
                return ERR(ENOTSUP);
 
-       qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
        hdr.tag = htonll(DS_UDP_TAG);
        hdr.version = 1;
-       if (qp) {
-               hdr.sl = qp->sl;
-               hdr.slid = htons(qp->lid);
-               hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
-       } else {
-               hdr.sl = 0;
-               hdr.slid = 0;
-               hdr.qpn = 0;
-       }
+       memset(&hdr->reserved, 0, sizeof hdr->reserved);
+       hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
 
        miov[0].iov_base = &hdr;
        miov[0].iov_len = sizeof hdr;
        memcpy(&miov[1], iov, sizeof *iov * iovcnt);
 
        memset(&msg, 0, sizeof msg);
-       /* TODO: specify name if needed */
        msg.msg_iov = miov;
        msg.msg_iovlen = iovcnt + 1;
        return sendmsg(rs->fd, msg, flags);
@@ -2004,14 +2233,13 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
        struct ibv_sge sge;
        int ret = 0;
 
-       if (!rs->conn_dest || !rs->conn_dest->ah)
+       if (!rs->conn_dest->ah)
                return ds_send_udp(rs, buf, len, flags);
 
-       rs->sbytes_needed = len;
        if (!ds_can_send(rs)) {
-               ret = rs_get_comp(rs, 1, ds_can_send);
+               ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
                if (ret)
-                       return ds_send_udp(rs, buf, len, flags);
+                       return ret;
        }
 
        if (len <= rs->sq_inline) {
@@ -2145,8 +2373,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
        }
 
        fastlock_acquire(&rs->slock);
-       ds_connect(rs, dest_addr, addrlen);
+       if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
+               ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+               if (ret)
+                       goto out;
+       }
        ret = dsend(rs, buf, len, flags);
+out:
        fastlock_release(&rs->slock);
        return ret;
 }
@@ -2629,7 +2862,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
                rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
                return 0;
        } else {
-               return getpeername(rs->fs, addr, addrlen);
+               return getpeername(rs->udp_sock, addr, addrlen);
        }
 }
 
@@ -2642,7 +2875,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
                rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
                return 0;
        } else {
-               return getsockname(rs->fd, addr, addrlen);
+               return getsockname(rs->udp_sock, addr, addrlen);
        }
 }
 
@@ -2656,7 +2889,7 @@ int rsetsockopt(int socket, int level, int optname,
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
        if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-               ret = setsockopt(rs->fd, optname, optval, optlen);
+               ret = setsockopt(rs->udp_sock, optname, optval, optlen);
                if (ret)
                        return ret;
        }
@@ -2666,13 +2899,15 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->so_opts;
                switch (optname) {
                case SO_REUSEADDR:
-                       ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-                                             RDMA_OPTION_ID_REUSEADDR,
-                                             (void *) optval, optlen);
-                       if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
-                           rs->cm_id->context &&
-                           (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
-                               ret = 0;
+                       if (rs->type == SOCK_STREAM) {
+                               ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+                                                     RDMA_OPTION_ID_REUSEADDR,
+                                                     (void *) optval, optlen);
+                               if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
+                                   rs->cm_id->context &&
+                                   (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
+                                       ret = 0;
+                       }
                        opt_on = *(int *) optval;
                        break;
                case SO_RCVBUF:
@@ -2722,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
-                       ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-                                             RDMA_OPTION_ID_AFONLY,
-                                             (void *) optval, optlen);
+                       if (rs->type == SOCK_STREAM) {
+                               ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+                                                     RDMA_OPTION_ID_AFONLY,
+                                                     (void *) optval, optlen);
+                       }
                        opt_on = *(int *) optval;
                        break;
                default:
@@ -2778,9 +3015,6 @@ int rgetsockopt(int socket, int level, int optname,
        int ret = 0;
 
        rs = idm_at(&idm, socket);
-       if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
-               return getsockopt(rs->fd, level, optname, optval, optlen);
-
        switch (level) {
        case SOL_SOCKET:
                switch (optname) {
@@ -3100,24 +3334,3 @@ out:
 
        return (ret && left == count) ? ret : count - left;
 }
-
-ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
-                 struct sockaddr *src_addr, socklen_t *addrlen)
-{
-       int ret;
-
-       ret = rrecv(socket, buf, len, flags);
-       if (ret > 0 && src_addr)
-               rgetpeername(socket, src_addr, addrlen);
-
-       return ret;
-}
-
-ssize_t usendto(int socket, const void *buf, size_t len, int flags,
-               const struct sockaddr *dest_addr, socklen_t addrlen)
-{
-       if (dest_addr || addrlen)
-               return ERR(EISCONN);
-
-       return usend(socket, buf, len, flags);
-}