]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Mon, 26 Nov 2012 20:10:17 +0000 (12:10 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 26 Nov 2012 20:10:17 +0000 (12:10 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index aa8620cd1c8132f8bb5f3cf9deb116c588bbc08d..711c5a1fffbf99f85f53e96c2dec1cb5de7b9c8d 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: 5734fbaa6c0e1a922578782abfbd2ab14da6a05c
-Head: bdafce023ae945e1c2fd739a60f60d005c8baef8
+Previous: 81199560fd2b5bf7fdc3657035d8a8acb6710571
+Head: 59a409d3d22b50727262a8e057761e048d59db4b
 Applied:
-  dsocket: 7029a9a237f690cfe1fd840d35a93647ca447ab5
-  refresh-temp: bdafce023ae945e1c2fd739a60f60d005c8baef8
+  dsocket: 59a409d3d22b50727262a8e057761e048d59db4b
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
index 59d04caf037aa2c9f85b352a1ad1427129b37b8f..f870b78b96ec03aaa49401ffe12d9a0086951363 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top:    da0048097eea01b21df587e85b3f7ac44a2582c8
+Top:    faabde8e748d27fc0f733e702ed7f3c3902d8f58
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -11,7 +11,7 @@ Signed-off-by: Sean Hefty <sean.hefty@intel.com>
 ---
 
 diff --git a/docs/rsocket b/docs/rsocket
-index 1484f65..03d49df 100644
+index 1484f65..a660208 100644
 --- a/docs/rsocket
 +++ b/docs/rsocket
 @@ -1,7 +1,7 @@
@@ -25,7 +25,7 @@ index 1484f65..03d49df 100644
  Rsockets is a protocol over RDMA that supports a socket-level API
  for applications.  For details on the current state of the
  implementation, readers should refer to the rsocket man page.  This
-@@ -189,3 +189,47 @@ registered remote data buffer.
+@@ -189,3 +189,34 @@ registered remote data buffer.
  From host A's perspective, the transfer appears as a normal send/write
  operation, with the data stream redirected directly into the receiving
  application's buffer.
@@ -34,7 +34,7 @@ index 1484f65..03d49df 100644
 +
 +Datagram Overview
 +-----------------
-+THe rsocket API supports datagram sockets.  Datagram support is handled through an
++The rsocket API supports datagram sockets.  Datagram support is handled through an
 +entirely different protocol and internal implementation.  Unlike connected rsockets,
 +datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
 +socket may use any number of network (IP) addresses, including those which map to
@@ -44,35 +44,22 @@ index 1484f65..03d49df 100644
 +
 +Rsockets uses headers inserted before user data sent over UDP sockets to resolve
 +remote UD QP numbers.  When a user first attempts to send a datagram to a remote
-+address (IP and UDP port), rsockets will take the following steps.  First, it
-+will allocate and store the destination address into a lookup table for future
-+use.  Then it will resolve which local network address should be used when sending
-+to the specified destination.  It will allocate a UD QP on the RDMA device
-+associated with the local address.  Finally, rsockets will send the user's
-+datagram to the remote UDP socket, but inserting a header before the datagram.
-+The header specifies the UD QP number associated with the local network address
-+(IP and UDP port) of the send.
-+
-+Under many circumstances, the rsocket UDP header also provides enough path
-+information for the receiver to send the reply using a UD QP.  However, certain
-+fabric topologies may require contacting a subnet administrator.  Whenever
-+rsockets lacks sufficient information to send data to a remote peer using
-+a UD QP, it will automatically fall back to using a standard UDP socket.  This
-+helps minimize the latency for performing a given send, while lenghty address
-+and route resolution protocols complete.
-+
-+Currently, rsockets allocates a single UD QP, with an infrastructure is provided
-+to support more.  Future enhancements may add support for multiple UD QPs, each
-+associated with a single network (IP) address.  Multiplexing different network
-+addresses over a single UD QP and using shared receive queues are other possible
-+enhancements.
-+
-+Because rsockets may use multiple UD QPs, buffer management can become an issue.
-+The total size of receive buffers is specified by the mem_default configuration
-+files, but may be overridden with rsetsockopt.  The buffer is divided into
-+MTU sized messages and distributed among the allocated QPs.  As new QPs are
-+created, the receive buffers may be redistributed by posting loopback sends
-+on a QP in order to free the receive buffers for reposting to a different QP.  
++address (IP and UDP port), rsockets will take the following steps:
++
++1. Store the destination address into a lookup table.
++2. Resolve which local network address should be used when sending
++   to the specified destination.
++3. Allocate a UD QP on the RDMA device associated with the local address.
++4. Send the user's datagram to the remote UDP socket.
++
++A header is inserted before the user's datagram.  The header specifies the
++UD QP number associated with the local network address (IP and UDP port) of
++the send.
++
++A service thread is used to process messages received on the UDP socket.  This
++thread updates the rsocket lookup tables with the remote QPN and path record
++data.  The service thread forwards data received on the UDP socket to an
++rsocket QP.
 \ No newline at end of file
 diff --git a/src/cma.c b/src/cma.c
 index 91bf108..2c6b032 100755
@@ -99,10 +86,18 @@ index 91bf108..2c6b032 100755
  
  uint16_t ucma_get_port(struct sockaddr *addr)
 diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..0695d12 100644
+index 58fcb8e..a81b8f3 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
-@@ -55,7 +55,7 @@
+@@ -46,6 +46,7 @@
+ #include <string.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
++#include <sys/epoll.h>
+ #include <rdma/rdma_cma.h>
+ #include <rdma/rdma_verbs.h>
+@@ -55,7 +56,7 @@
  
  #define RS_OLAP_START_SIZE 2048
  #define RS_MAX_TRANSFER 65536
@@ -111,7 +106,7 @@ index 58fcb8e..0695d12 100644
  #define RS_QP_MAX_SIZE 0xFFFE
  #define RS_QP_CTRL_SIZE 4
  #define RS_CONN_RETRIES 6
-@@ -63,6 +63,23 @@
+@@ -63,6 +64,23 @@
  static struct index_map idm;
  static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
  
@@ -135,20 +130,50 @@ index 58fcb8e..0695d12 100644
  static uint16_t def_iomap_size = 0;
  static uint16_t def_inline = 64;
  static uint16_t def_sqsize = 384;
-@@ -110,6 +127,12 @@ struct rs_msg {
+@@ -99,6 +117,14 @@ enum {
+ #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
+ #define rs_msg_op(imm_data)   (imm_data >> 29)
+ #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
++#define RS_RECV_WR_ID (~((uint64_t) 0))
++
++#define DS_WR_RECV 0xFFFFFFFF
++#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
++#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
++#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
++#define ds_wr_length(wr_id) ((uint32_t) wr_id)
++#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
+ enum {
+       RS_CTRL_DISCONNECT,
+@@ -110,6 +136,18 @@ struct rs_msg {
        uint32_t data;
  };
  
 +struct ds_qp;
 +
-+struct ds_msg {
-+      struct ds_qp *qp;
++struct ds_rmsg {
++      struct ds_qp    *qp;
++      uint32_t        offset;
++      uint32_t        length;
++};
++
++struct ds_smsg {
++      struct ds_smsg  *next;
 +};
 +
  struct rs_sge {
        uint64_t addr;
        uint32_t key;
-@@ -159,9 +182,9 @@ enum rs_state {
+@@ -144,8 +182,6 @@ struct rs_conn_data {
+       struct rs_sge     data_buf;
+ };
+-#define RS_RECV_WR_ID (~((uint64_t) 0))
+-
+ /*
+  * rsocket states are ordered as passive, connecting, connected, disconnected.
+  */
+@@ -159,9 +195,9 @@ enum rs_state {
        rs_connecting      = rs_opening |   0x0040,
        rs_accepting       = rs_opening |   0x0080,
        rs_connected       =                0x0100,
@@ -161,7 +186,7 @@ index 58fcb8e..0695d12 100644
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
-@@ -169,13 +192,68 @@ enum rs_state {
+@@ -169,68 +205,203 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -177,12 +202,11 @@ index 58fcb8e..0695d12 100644
 +      struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
 +
++      struct ibv_mr     *smr;
 +      struct ibv_mr     *rmr;
 +      uint8_t           *rbuf;
 +
-+      struct ibv_mr     *smr;
-+      uint16_t          lid;
-+      uint8_t           sl;
++      int               cq_armed;
 +};
 +
 +struct ds_dest {
@@ -206,6 +230,13 @@ index 58fcb8e..0695d12 100644
 +              /* data stream */
 +              struct {
 +                      struct rdma_cm_id *cm_id;
++                      uint64_t          tcp_opts;
++
++                      int               ctrl_avail;
++                      uint16_t          sseq_no;
++                      uint16_t          sseq_comp;
++                      uint16_t          rseq_no;
++                      uint16_t          rseq_comp;
 +
 +                      int               remote_sge;
 +                      struct rs_sge     remote_sgl;
@@ -217,32 +248,57 @@ index 58fcb8e..0695d12 100644
 +                      void              *target_buffer_list;
 +                      volatile struct rs_sge    *target_sgl;
 +                      struct rs_iomap   *target_iomap;
++
++                      int               rbuf_bytes_avail;
++                      int               rbuf_free_offset;
++                      int               rbuf_offset;
++                      struct ibv_mr     *rmr;
++                      uint8_t           *rbuf;
++
++                      int               sbuf_bytes_avail;
++                      struct ibv_mr     *smr;
++                      struct ibv_sge    ssgl[2];
 +              };
 +              /* datagram */
 +              struct {
-+                      dlist_t           qp_list;
++                      struct ds_qp      *qp_list;
 +                      void              *dest_map;
 +                      struct ds_dest    *conn_dest;
-+                      struct pollfd     *fds;
-+                      nfds_t            nfds;
-+                      int               dsock;
-+                      int               sbytes_needed;
++
++                      int               udp_sock;
++                      int               epfd;
++                      int               rqe_avail;
++                      struct ds_smsg    *smsg_free;
 +              };
 +      };
  
        int               opts;
        long              fd_flags;
-@@ -186,7 +264,7 @@ struct rsocket {
+       uint64_t          so_opts;
+-      uint64_t          tcp_opts;
+       uint64_t          ipv6_opts;
+       int               state;
        int               cq_armed;
        int               retries;
        int               err;
 -      int               index;
+-      int               ctrl_avail;
 +
-       int               ctrl_avail;
        int               sqe_avail;
-       int               sbuf_bytes_avail;
-@@ -203,34 +281,93 @@ struct rsocket {
-       int               rbuf_offset;
+-      int               sbuf_bytes_avail;
+-      uint16_t          sseq_no;
+-      uint16_t          sseq_comp;
++      uint32_t          sbuf_size;
+       uint16_t          sq_size;
+       uint16_t          sq_inline;
++      uint32_t          rbuf_size;
+       uint16_t          rq_size;
+-      uint16_t          rseq_no;
+-      uint16_t          rseq_comp;
+-      int               rbuf_bytes_avail;
+-      int               rbuf_free_offset;
+-      int               rbuf_offset;
        int               rmsg_head;
        int               rmsg_tail;
 -      struct rs_msg     *rmsg;
@@ -252,13 +308,15 @@ index 58fcb8e..0695d12 100644
 -      struct rs_sge     remote_iomap;
 +      union {
 +              struct rs_msg     *rmsg;
-+              struct ds_msg     *dmsg;
++              struct ds_rmsg    *dmsg;
 +      };
  
++      uint8_t           *sbuf;
        struct rs_iomap_mr *remote_iomappings;
        dlist_entry       iomap_list;
        dlist_entry       iomap_queue;
        int               iomap_pending;
++};
  
 -      struct ibv_mr    *target_mr;
 -      int               target_sge;
@@ -267,28 +325,43 @@ index 58fcb8e..0695d12 100644
 -      volatile struct rs_sge    *target_sgl;
 -      struct rs_iomap  *target_iomap;
 -
-       uint32_t          rbuf_size;
+-      uint32_t          rbuf_size;
 -      struct ibv_mr    *rmr;
-+      struct ibv_mr     *rmr;
-       uint8_t           *rbuf;
+-      uint8_t           *rbuf;
++#define DS_UDP_TAG 0x5555555555555555ULL
  
-       uint32_t          sbuf_size;
+-      uint32_t          sbuf_size;
 -      struct ibv_mr    *smr;
-+      struct ibv_mr     *smr;
-       struct ibv_sge    ssgl[2];
-       uint8_t           *sbuf;
- };
-+#define DS_UDP_TAG 0x5555555555555555ULL
-+
+-      struct ibv_sge    ssgl[2];
+-      uint8_t           *sbuf;
 +struct ds_udp_header {
 +      uint64_t          tag;
 +      uint8_t           version;
-+      uint8_t           sl;
-+      uint16_t          slid;
++      uint8_t           reserved[3];
 +      uint32_t          qpn;  /* upper 8-bits reserved */
-+};
+ };
 +
++#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
++
++static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
++{
++      if (!rs->qp_list)
++              list_init(&qp->list);
++      else
++              list_insert_head(&qp->list, &rs->qp_list->list);
++      rs->qp_list = *qp;
++}
++
++static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
++{
++      if (qp->list.next != qp->list) {
++              rs->qp_list = ds_next_qp(qp);
++              dlist_remove(&qp->list);
++      } else {
++              rs->qp_list = NULL;
++      }
++}
 +
 +static int rs_svc_run(void *arg)
 +{
@@ -349,7 +422,7 @@ index 58fcb8e..0695d12 100644
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -306,10 +443,10 @@ out:
+@@ -306,10 +477,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -362,7 +435,7 @@ index 58fcb8e..0695d12 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -321,7 +458,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +492,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -371,17 +444,19 @@ index 58fcb8e..0695d12 100644
  {
        struct rsocket *rs;
  
-@@ -329,7 +466,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +500,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
 +      rs->type = type;
        rs->index = -1;
-+      rs->dsock = -1;
++      rs->udp_sock = -1;
++      rs->epfd = -1;
++
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,12 +490,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,7 +526,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
@@ -390,15 +465,99 @@ index 58fcb8e..0695d12 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
- }
+@@ -359,13 +534,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
  
-+/*
-+ * TODO: Support datagram rsockets
-+ */
  static int rs_set_nonblocking(struct rsocket *rs, long arg)
  {
++      struct ds_qp *qp;
        int ret = 0;
-@@ -439,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
+-      if (rs->cm_id->recv_cq_channel)
+-              ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
++      if (rs->type == SOCK_STREAM) {
++              if (rs->cm_id->recv_cq_channel)
++                      ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+-      if (!ret && rs->state < rs_connected)
+-              ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++              if (!ret && rs->state < rs_connected)
++                      ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++      } else {
++              ret = fcntl(rs->epfd, F_SETFL, arg);
++
++              if (!ret && rs->qp_list) {
++                      qp = rs->qp_list;
++                      do {
++                              ret = fcntl(qp->cm_id->recv_cq_channel->fd,
++                                          F_SETFL, arg);
++                              qp = ds_next_qp(qp);
++                      } while (qp != rs->qp_list && !ret);
++              }
++      }
+       return ret;
+ }
+@@ -389,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+               rs->rq_size = 2;
+ }
++static void ds_set_qp_size(struct rsocket *rs)
++{
++      uint16_t max_size;
++
++      max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
++
++      if (rs->sq_size > max_size)
++              rs->sq_size = max_size;
++      if (rs->rq_size > max_size)
++              rs->rq_size = max_size;
++
++      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
++              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
++      else
++              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
++
++      if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
++              rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
++      else
++              rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
++}
++
+ static int rs_init_bufs(struct rsocket *rs)
+ {
+       size_t len;
+       rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
+       if (!rs->rmsg)
+-              return -1;
++              return ERR(ENOEMEM);
+       rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+       if (!rs->sbuf)
+-              return -1;
++              return ERR(ENOEMEM);
+       rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
+       if (!rs->smr)
+@@ -409,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
+             sizeof(*rs->target_iomap) * rs->target_iomap_size;
+       rs->target_buffer_list = malloc(len);
+       if (!rs->target_buffer_list)
+-              return -1;
++              return ERR(ENOEMEM);
+       rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
+       if (!rs->target_mr)
+@@ -422,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
+       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+       if (!rs->rbuf)
+-              return -1;
++              return ERR(ENOEMEM);
+       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+       if (!rs->rmr)
+@@ -439,15 +650,32 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
  }
  
@@ -437,7 +596,7 @@ index 58fcb8e..0695d12 100644
                goto err1;
  
        if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -455,21 +683,20 @@ static int rs_create_cq(struct rsocket *rs)
                        goto err2;
        }
  
@@ -466,7 +625,7 @@ index 58fcb8e..0695d12 100644
  {
        struct ibv_recv_wr wr, *bad;
  
-@@ -481,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -481,6 +708,23 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -479,7 +638,7 @@ index 58fcb8e..0695d12 100644
 +      sge.length = RS_SNDLOWAT;
 +      sge.lkey = qp->rmr;
 +
-+      wr.wr_id = RS_RECV_WR_ID;
++      wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
 +      wr.next = NULL;
 +      wr.sg_list = &sge;
 +      wr.num_sge = 1;
@@ -490,7 +649,7 @@ index 58fcb8e..0695d12 100644
  static int rs_create_ep(struct rsocket *rs)
  {
        struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -491,7 +735,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
  
@@ -499,7 +658,7 @@ index 58fcb8e..0695d12 100644
        if (ret)
                return ret;
  
-@@ -548,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -548,8 +792,73 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
  }
  
@@ -516,7 +675,8 @@ index 58fcb8e..0695d12 100644
 +
 +      if (qp->cm_id) {
 +              if (qp->cm_id->qp) {
-+                      dlist_remove(&qp->list);
++                      epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
++                                qp->cm_id->recv_cq_channel->fd, NULL);
 +                      rdma_destroy_qp(qp->cm_id);
 +              }
 +              rdma_destroy_id(qp->cm_id);
@@ -524,31 +684,32 @@ index 58fcb8e..0695d12 100644
 +      free(qp);
 +}
 +
-+static void ds_free_qps(struct rsocket *rs)
++static void ds_free(struct rsocket *rs)
 +{
 +      struct ds_qp *qp;
-+      dlist_t *entry;
 +
-+      while (!dlist_empty(&rs->qp_list)) {
-+              qp = container_of(rs->qp_list.next, struct ds_qp, list);
-+              ds_free_qp(qp);
-+      }
-+}
-+
-+static void ds_free(struct rsocket *rs)
-+{
 +      if (rs->state & (rs_readable | rs_writable))
 +              rs_svc_remove(rs);
 +
-+      if (rs->dsock >= 0)
-+              close(rs->dsock);
++      if (rs->udp_sock >= 0)
++              close(rs->udp_sock);
 +
 +      if (rs->index >= 0)
 +              rs_remove(rs);
 +
-+      ds_free_qps(rs);
-+      if (rs->fds)
-+              free(rs->fds);
++      if (rs->dmsg)
++              free(rs->dmsg);
++
++      if (rs->smsg)
++              free(rs->smsg);
++
++      while (rs->qp_list) {
++              ds_remove_qp(rs, rs->qp_list);
++              ds_free_qp(rs->qp_list);
++      }
++
++      if (rs->epfd >= 0)
++              close(rs->epfd);
 +
 +      if (rs->sbuf)
 +              free(rs->sbuf);
@@ -571,7 +732,7 @@ index 58fcb8e..0695d12 100644
        if (rs->index >= 0)
                rs_remove(rs);
  
-@@ -581,7 +819,7 @@ static void rs_free(struct rsocket *rs)
+@@ -581,7 +890,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
  
@@ -580,21 +741,19 @@ index 58fcb8e..0695d12 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -635,29 +873,56 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +944,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
 +static int ds_init(struct rsocket *rs, int domain)
 +{
-+      rs->dsock = socket(domain, SOCK_DGRAM, 0);
-+      if (rs->dsock < 0)
-+              return rs->dsock;
++      rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
++      if (rs->udp_sock < 0)
++              return rs->udp_sock;
 +
-+      rs->fds = calloc(1, sizeof *fds);
-+      if (!rs->fds)
-+              return ERR(ENOMEM);
-+      rs->nfds = 1;
-+      dlist_init(&rs->qp_list);
++      rs->epfd = epoll_create(0);
++      if (rs->epfd < 0)
++              return rs->epfd;
 +
 +      return 0;
 +}
@@ -633,7 +792,7 @@ index 58fcb8e..0695d12 100644
 +              if (ret)
 +                      goto err;
 +
-+              index = rs->dsock;
++              index = rs->udp_sock;
 +      }
  
 -      ret = rs_insert(rs);
@@ -645,7 +804,7 @@ index 58fcb8e..0695d12 100644
        return rs->index;
  
  err:
-@@ -671,9 +936,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +1005,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -657,7 +816,7 @@ index 58fcb8e..0695d12 100644
 +              if (!ret)
 +                      rs->state = rs_bound;
 +      } else {
-+              ret = bind(rs->dsock, addr, addrlen);
++              ret = bind(rs->udp_sock, addr, addrlen);
 +              if (!ret) {
 +                      ret = rs_svc_insert(rs);
 +                      if (!ret)
@@ -667,7 +826,7 @@ index 58fcb8e..0695d12 100644
        return ret;
  }
  
-@@ -709,7 +983,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +1052,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -676,7 +835,7 @@ index 58fcb8e..0695d12 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -717,7 +991,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +1060,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -685,27 +844,36 @@ index 58fcb8e..0695d12 100644
        if (ret < 0)
                goto err;
  
-@@ -854,13 +1128,212 @@ connected:
+@@ -854,13 +1197,231 @@ connected:
        return ret;
  }
  
 +static int ds_init_ep(struct rsocket *rs)
 +{
-+      int ret;
++      struct ds_smsg *msg;
++      int i, ret;
 +
-+      rs_set_qp_size(rs);
-+      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-+              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-+      else
-+              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
++      ds_set_qp_size(rs);
 +
-+      rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
++      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
 +      if (!rs->sbuf)
 +              return ERR(ENOMEM);
 +
-+      rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
++      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
++      if (!rs->dmsg)
++              return ERR(ENOEMEM);
++
 +      rs->sbuf_bytes_avail = rs->sbuf_size;
 +      rs->sqe_avail = rs->sq_size;
++      rs->rqe_avail = rs->rq_size;
++
++      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
++      msg = rs->smsg_free;
++      for (i = 0; i < rs->sq_size - 1; i++) {
++              msg->next = (void *) msg + i * RS_SNDLOWAT;
++              msg = msg->next;
++      }
++      msg->next = NULL;
 +
 +      ret = rs_svc_insert(rs);
 +      if (ret)
@@ -747,7 +915,7 @@ index 58fcb8e..0695d12 100644
 +      uint16_t port;
 +
 +      *src_len = sizeof src_addr;
-+      ret = getsockname(rs->dsock, &src_addr->sa, src_len);
++      ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
 +      if (ret || !rs_any_addr(src_addr))
 +              return ret;
 +
@@ -772,6 +940,7 @@ index 58fcb8e..0695d12 100644
 +                      socklen_t addrlen, struct ds_qp **qp)
 +{
 +      struct ibv_qp_init_attr qp_attr;
++      struct epoll_event event;
 +      int ret;
 +
 +      *qp = calloc(1, sizeof(struct ds_qp));
@@ -806,17 +975,24 @@ index 58fcb8e..0695d12 100644
 +      qp_attr.cap.max_send_sge = 2;
 +      qp_attr.cap.max_recv_sge = 1;
 +      qp_attr.cap.max_inline_data = rs->sq_inline;
-+
 +      ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
 +      if (ret)
-+              return ret;
++              goto err;
++
++      event.events = EPOLLIN | EPOLLOUT;
++      event.data.ptr = *qp;
++      ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
++                      (*qp)->cm_id->recv_cq_channel->fd, &event);
++      if (ret)
++              goto err;
 +
 +      for (i = 0; i < rs->rq_size; i++) {
 +              ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
 +              if (ret)
 +                      goto err;
 +      }
-+      list_insert_head(&(*qp)->list, &rs->qp_list);
++
++      ds_insert_qp(rs, *qp);
 +      return 0;
 +err:
 +      ds_free_qp(*qp);
@@ -826,12 +1002,14 @@ index 58fcb8e..0695d12 100644
 +static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
 +                   socklen_t addrlen, struct ds_qp **qp)
 +{
-+      dlist_t *entry;
-+
-+      for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
-+              *qp = container_of(entry, struct ds_qp, list);
-+              if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-+                      return 0;
++      if (rs->qp_list) {
++              *qp = rs->qp_list;
++              do {
++                      if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
++                              return 0;
++
++                      *qp = ds_next_qp(*qp);
++              } while (*qp != rs->qp_list);
 +      }
 +
 +      return ds_create_qp(rs, src_addr, addrlen, qp);
@@ -891,7 +1069,7 @@ index 58fcb8e..0695d12 100644
 +              ret = rs_do_connect(rs);
 +      } else {
 +              fastlock_acquire(&rs->slock);
-+              ret = connect(rs->dsock, addr, addrlen);
++              ret = connect(rs->udp_sock, addr, addrlen);
 +              if (!ret)
 +                      ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
 +              fastlock_release(&rs->slock);
@@ -900,7 +1078,7 @@ index 58fcb8e..0695d12 100644
  }
  
  static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1375,25 @@ static int rs_post_write(struct rsocket *rs,
+@@ -902,6 +1463,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -926,7 +1104,7 @@ index 58fcb8e..0695d12 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -932,6 +1424,15 @@ static int rs_write_data(struct rsocket *rs,
+@@ -932,6 +1512,18 @@ static int rs_write_data(struct rsocket *rs,
                                 flags, addr, rkey);
  }
  
@@ -934,15 +1112,18 @@ index 58fcb8e..0695d12 100644
 +                      struct ibv_sge *sgl, int nsge,
 +                      uint32_t length, int flags)
 +{
++      uint64_t offset;
++
 +      rs->sqe_avail--;
 +      rs->sbuf_bytes_avail -= length;
-+      return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
++      offset = sgl->addr - (uintptr_t) rs->sbuf;
++      return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
 +}
 +
  static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
                           struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
  {
-@@ -1045,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1045,7 +1637,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
@@ -951,13 +1132,167 @@ index 58fcb8e..0695d12 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1218,9 +1719,14 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1187,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+       return ret;
+ }
++/*
++ * Poll all CQs associated with a datagram rsocket.  We need to drop any
++ * received messages that we do not have room to store.  To limit drops,
++ * we only poll if we have room to store the receive or we need a send
++ * buffer.  To ensure fairness, we poll the CQs round robin, remembering
++ * where we left off.
++ */
++static void ds_poll_cqs(struct rsocket *rs)
++{
++      struct ds_qp *qp;
++      struct ds_smsg *msg;
++      struct ibv_wc wc;
++      int ret, cnt;
++
++      qp = rs->qp_list;
++      do {
++              cnt = 0;
++              do {
++                      ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
++                      if (ret <= 0) {
++                              qp = ds_next_qp(qp);
++                              continue;
++                      }
++
++                      if (ds_wr_is_recv(wc.wr_id)) {
++                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
++                                      rs->rqe_avail--;
++                                      rs->dmsg[rs->rmsg_tail].qp = qp;
++                                      if (++rs->rmsg_tail == rs->rq_size + 1)
++                                              rs->rmsg_tail = 0;
++                              } else {
++                                      ds_post_recv(rs, qp, qp->rbuf +
++                                                           ds_wr_offset(wc.wr_id));
++                              }
++                      } else {
++                              if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
++                                      msg = (struct ds_smsg *)
++                                            (rs->sbuf + ds_wr_offset(wc.wr_id));
++                                      msg->next = rs->smsg_free;
++                                      rs->smsg_free = msg;
++                              }
++                              rs->sqe_avail++;
++                      }
++
++                      qp = ds_next_qp(qp);
++                      if (!rs->rqe_avail && rs->sqe_avail) {
++                              rs->qp_list = qp;
++                              return;
++                      }
++                      cnt++;
++              } while (qp != rs->qp_list);
++      } while (cnt);
++}
++
++static void ds_req_notify_cqs(struct rsocket *rs)
++{
++      struct ds_qp *qp;
++
++      qp = rs->qp_list;
++      do {
++              if (!qp->cq_armed) {
++                      ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
++                      qp->cq_armed = 1;
++              }
++              qp = ds_next_qp(qp);
++      } while (qp != rs->qp_list);
++}
++
++static int ds_get_cq_event(struct rsocket *rs)
++{
++      struct epoll_event event;
++      struct ds_qp *qp;
++      struct ibv_cq *cq;
++      void *context;
++      int ret;
++
++      if (!rs->cq_armed)
++              return 0;
++
++      ret = epoll_wait(rs->epfd, &event, 1, -1);
++      if (ret <= 0)
++              return ret;
++
++      qp = event.data.ptr;
++      ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
++      if (!ret) {
++              ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
++              qp->cq_armed = 0;
++              rs->cq_armed = 0;
++      }
++
++      return ret;
++}
++
++static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
++{
++      int ret = 0;
++
++      fastlock_acquire(&rs->cq_lock);
++      do {
++              ds_poll_cqs(rs);
++              if (test(rs)) {
++                      ret = 0;
++                      break;
++              } else if (nonblock) {
++                      ret = ERR(EWOULDBLOCK);
++              } else if (!rs->cq_armed) {
++                      ds_req_notify_cqs(rs);
++                      rs->cq_armed = 1;
++              } else {
++                      rs_update_credits(rs);
++                      fastlock_acquire(&rs->cq_wait_lock);
++                      fastlock_release(&rs->cq_lock);
++
++                      ret = ds_get_cq_event(rs);
++                      fastlock_release(&rs->cq_wait_lock);
++                      fastlock_acquire(&rs->cq_lock);
++              }
++      } while (!ret);
++
++      fastlock_release(&rs->cq_lock);
++      return ret;
++}
++
++static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
++{
++      struct timeval s, e;
++      uint32_t poll_time = 0;
++      int ret;
++
++      do {
++              ret = ds_process_cqs(rs, 1, test);
++              if (!ret || nonblock || errno != EWOULDBLOCK)
++                      return ret;
++
++              if (!poll_time)
++                      gettimeofday(&s, NULL);
++
++              gettimeofday(&e, NULL);
++              poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
++                          (e.tv_usec - s.tv_usec) + 1;
++      } while (poll_time <= polling_time);
++
++      ret = ds_process_cqs(rs, 0, test);
++      return ret;
++}
++
+ static int rs_nonblocking(struct rsocket *rs, int flags)
+ {
+       return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
+@@ -1218,9 +1957,14 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
 +static int ds_can_send(struct rsocket *rs)
 +{
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
++      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
 +}
 +
  static int rs_conn_can_send(struct rsocket *rs)
@@ -967,7 +1302,7 @@ index 58fcb8e..0695d12 100644
  }
  
  static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1235,7 +1979,7 @@ static int rs_have_rdata(struct rsocket *rs)
  
  static int rs_conn_have_rdata(struct rsocket *rs)
  {
@@ -976,7 +1311,7 @@ index 58fcb8e..0695d12 100644
  }
  
  static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1338,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1338,7 +2082,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
  
@@ -985,7 +1320,7 @@ index 58fcb8e..0695d12 100644
  
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
-@@ -1390,14 +1896,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1390,14 +2134,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -1002,7 +1337,7 @@ index 58fcb8e..0695d12 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1446,10 +1952,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1446,10 +2190,84 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1016,31 +1351,22 @@ index 58fcb8e..0695d12 100644
 +{
 +      struct ds_udp_header hdr;
 +      struct msghdr msg;
-+      struct iovec miov[4];
++      struct iovec miov[8];
 +      struct ds_qp *qp;
 +
-+      if (iovcnt > 4)
++      if (iovcnt > 8)
 +              return ERR(ENOTSUP);
 +
-+      qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
 +      hdr.tag = htonll(DS_UDP_TAG);
 +      hdr.version = 1;
-+      if (qp) {
-+              hdr.sl = qp->sl;
-+              hdr.slid = htons(qp->lid);
-+              hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
-+      } else {
-+              hdr.sl = 0;
-+              hdr.slid = 0;
-+              hdr.qpn = 0;
-+      }
++      memset(&hdr->reserved, 0, sizeof hdr->reserved);
++      hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
 +
 +      miov[0].iov_base = &hdr;
 +      miov[0].iov_len = sizeof hdr;
 +      memcpy(&miov[1], iov, sizeof *iov * iovcnt);
 +
 +      memset(&msg, 0, sizeof msg);
-+      /* TODO: specify name if needed */
 +      msg.msg_iov = miov;
 +      msg.msg_iovlen = iovcnt + 1;
 +      return sendmsg(rs->fd, msg, flags);
@@ -1059,14 +1385,13 @@ index 58fcb8e..0695d12 100644
 +      struct ibv_sge sge;
 +      int ret = 0;
 +
-+      if (!rs->conn_dest || !rs->conn_dest->ah)
++      if (!rs->conn_dest->ah)
 +              return ds_send_udp(rs, buf, len, flags);
 +
-+      rs->sbytes_needed = len;
 +      if (!ds_can_send(rs)) {
-+              ret = rs_get_comp(rs, 1, ds_can_send);
++              ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
 +              if (ret)
-+                      return ds_send_udp(rs, buf, len, flags);
++                      return ret;
 +      }
 +
 +      if (len <= rs->sq_inline) {
@@ -1098,7 +1423,7 @@ index 58fcb8e..0695d12 100644
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1463,6 +2053,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2281,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1112,7 +1437,7 @@ index 58fcb8e..0695d12 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1484,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1484,7 +2309,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1121,7 +1446,7 @@ index 58fcb8e..0695d12 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1537,10 +2134,21 @@ out:
+@@ -1537,10 +2362,26 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
@@ -1133,20 +1458,25 @@ index 58fcb8e..0695d12 100644
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
 +                      return ERR(EISCONN);
--      return rsend(socket, buf, len, flags);
++
 +              return rsend(socket, buf, len, flags);
 +      }
-+
+-      return rsend(socket, buf, len, flags);
 +      fastlock_acquire(&rs->slock);
-+      ds_connect(rs, dest_addr, addrlen);
++      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
++              ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++              if (ret)
++                      goto out;
++      }
 +      ret = dsend(rs, buf, len, flags);
++out:
 +      fastlock_release(&rs->slock);
 +      return ret;
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1599,7 +2440,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1155,7 +1485,7 @@ index 58fcb8e..0695d12 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1652,7 +2260,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1652,7 +2493,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1164,7 +1494,7 @@ index 58fcb8e..0695d12 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1948,7 +2556,7 @@ int rshutdown(int socket, int how)
+@@ -1948,7 +2789,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1173,7 +1503,7 @@ index 58fcb8e..0695d12 100644
                return 0;
        }
  
-@@ -1958,10 +2566,10 @@ int rshutdown(int socket, int how)
+@@ -1958,10 +2799,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -1187,7 +1517,7 @@ index 58fcb8e..0695d12 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -2017,8 +2625,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2017,8 +2858,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1197,12 +1527,12 @@ index 58fcb8e..0695d12 100644
 +              rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
 +              return 0;
 +      } else {
-+              return getpeername(rs->fs, addr, addrlen);
++              return getpeername(rs->udp_sock, addr, addrlen);
 +      }
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2638,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2871,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1212,17 +1542,17 @@ index 58fcb8e..0695d12 100644
 +              rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
 +              return 0;
 +      } else {
-+              return getsockname(rs->fd, addr, addrlen);
++              return getsockname(rs->udp_sock, addr, addrlen);
 +      }
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2039,6 +2655,12 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,18 +2888,26 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
 +      if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-+              ret = setsockopt(rs->fd, optname, optval, optlen);
++              ret = setsockopt(rs->udp_sock, optname, optval, optlen);
 +              if (ret)
 +                      return ret;
 +      }
@@ -1230,17 +1560,43 @@ index 58fcb8e..0695d12 100644
        switch (level) {
        case SOL_SOCKET:
                opts = &rs->so_opts;
-@@ -2156,6 +2778,9 @@ int rgetsockopt(int socket, int level, int optname,
-       int ret = 0;
-       rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
-+              return getsockopt(rs->fd, level, optname, optval, optlen);
-+
-       switch (level) {
-       case SOL_SOCKET:
                switch (optname) {
-@@ -2314,7 +2939,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+               case SO_REUSEADDR:
+-                      ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+-                                            RDMA_OPTION_ID_REUSEADDR,
+-                                            (void *) optval, optlen);
+-                      if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
+-                          rs->cm_id->context &&
+-                          (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
+-                              ret = 0;
++                      if (rs->type == SOCK_STREAM) {
++                              ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
++                                                    RDMA_OPTION_ID_REUSEADDR,
++                                                    (void *) optval, optlen);
++                              if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
++                                  rs->cm_id->context &&
++                                  (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
++                                      ret = 0;
++                      }
+                       opt_on = *(int *) optval;
+                       break;
+               case SO_RCVBUF:
+@@ -2100,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
+               opts = &rs->ipv6_opts;
+               switch (optname) {
+               case IPV6_V6ONLY:
+-                      ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+-                                            RDMA_OPTION_ID_AFONLY,
+-                                            (void *) optval, optlen);
++                      if (rs->type == SOCK_STREAM) {
++                              ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
++                                                    RDMA_OPTION_ID_AFONLY,
++                                                    (void *) optval, optlen);
++                      }
+                       opt_on = *(int *) optval;
+                       break;
+               default:
+@@ -2314,7 +3173,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -1249,7 +1605,7 @@ index 58fcb8e..0695d12 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +2973,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +3207,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -1258,7 +1614,7 @@ index 58fcb8e..0695d12 100644
        return offset;
  }
  
-@@ -2360,7 +2985,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +3219,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1267,7 +1623,7 @@ index 58fcb8e..0695d12 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2381,7 +3006,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3240,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -1276,7 +1632,7 @@ index 58fcb8e..0695d12 100644
        return ret;
  }
  
-@@ -2425,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2425,7 +3284,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1285,28 +1641,3 @@ index 58fcb8e..0695d12 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -2475,3 +3100,24 @@ out:
-       return (ret && left == count) ? ret : count - left;
- }
-+
-+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
-+                struct sockaddr *src_addr, socklen_t *addrlen)
-+{
-+      int ret;
-+
-+      ret = rrecv(socket, buf, len, flags);
-+      if (ret > 0 && src_addr)
-+              rgetpeername(socket, src_addr, addrlen);
-+
-+      return ret;
-+}
-+
-+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
-+              const struct sockaddr *dest_addr, socklen_t addrlen)
-+{
-+      if (dest_addr || addrlen)
-+              return ERR(EISCONN);
-+
-+      return usend(socket, buf, len, flags);
-+}
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index bf5f771..0000000
+++ /dev/null
@@ -1,948 +0,0 @@
-Bottom: da0048097eea01b21df587e85b3f7ac44a2582c8
-Top:    faabde8e748d27fc0f733e702ed7f3c3902d8f58
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-11-26 12:10:16 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/docs/rsocket b/docs/rsocket
-index 03d49df..a660208 100644
---- a/docs/rsocket
-+++ b/docs/rsocket
-@@ -194,7 +194,7 @@ application's buffer.
- Datagram Overview
- -----------------
--THe rsocket API supports datagram sockets.  Datagram support is handled through an
-+The rsocket API supports datagram sockets.  Datagram support is handled through an
- entirely different protocol and internal implementation.  Unlike connected rsockets,
- datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
- socket may use any number of network (IP) addresses, including those which map to
-@@ -204,32 +204,19 @@ UDP socket, plus zero or more UD QPs.
- Rsockets uses headers inserted before user data sent over UDP sockets to resolve
- remote UD QP numbers.  When a user first attempts to send a datagram to a remote
--address (IP and UDP port), rsockets will take the following steps.  First, it
--will allocate and store the destination address into a lookup table for future
--use.  Then it will resolve which local network address should be used when sending
--to the specified destination.  It will allocate a UD QP on the RDMA device
--associated with the local address.  Finally, rsockets will send the user's
--datagram to the remote UDP socket, but inserting a header before the datagram.
--The header specifies the UD QP number associated with the local network address
--(IP and UDP port) of the send.
--
--Under many circumstances, the rsocket UDP header also provides enough path
--information for the receiver to send the reply using a UD QP.  However, certain
--fabric topologies may require contacting a subnet administrator.  Whenever
--rsockets lacks sufficient information to send data to a remote peer using
--a UD QP, it will automatically fall back to using a standard UDP socket.  This
--helps minimize the latency for performing a given send, while lenghty address
--and route resolution protocols complete.
--
--Currently, rsockets allocates a single UD QP, with an infrastructure is provided
--to support more.  Future enhancements may add support for multiple UD QPs, each
--associated with a single network (IP) address.  Multiplexing different network
--addresses over a single UD QP and using shared receive queues are other possible
--enhancements.
--
--Because rsockets may use multiple UD QPs, buffer management can become an issue.
--The total size of receive buffers is specified by the mem_default configuration
--files, but may be overridden with rsetsockopt.  The buffer is divided into
--MTU sized messages and distributed among the allocated QPs.  As new QPs are
--created, the receive buffers may be redistributed by posting loopback sends
--on a QP in order to free the receive buffers for reposting to a different QP.  
-\ No newline at end of file
-+address (IP and UDP port), rsockets will take the following steps:
-+
-+1. Store the destination address into a lookup table.
-+2. Resolve which local network address should be used when sending
-+   to the specified destination.
-+3. Allocate a UD QP on the RDMA device associated with the local address.
-+4. Send the user's datagram to the remote UDP socket.
-+
-+A header is inserted before the user's datagram.  The header specifies the
-+UD QP number associated with the local network address (IP and UDP port) of
-+the send.
-+
-+A service thread is used to process messages received on the UDP socket.  This
-+thread updates the rsocket lookup tables with the remote QPN and path record
-+data.  The service thread forwards data received on the UDP socket to an
-+rsocket QP.
-\ No newline at end of file
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 0695d12..a81b8f3 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -46,6 +46,7 @@
- #include <string.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
-+#include <sys/epoll.h>
- #include <rdma/rdma_cma.h>
- #include <rdma/rdma_verbs.h>
-@@ -116,6 +117,14 @@ enum {
- #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
- #define rs_msg_op(imm_data)   (imm_data >> 29)
- #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-+#define RS_RECV_WR_ID (~((uint64_t) 0))
-+
-+#define DS_WR_RECV 0xFFFFFFFF
-+#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
-+#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
-+#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
-+#define ds_wr_length(wr_id) ((uint32_t) wr_id)
-+#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
- enum {
-       RS_CTRL_DISCONNECT,
-@@ -129,8 +138,14 @@ struct rs_msg {
- struct ds_qp;
--struct ds_msg {
--      struct ds_qp *qp;
-+struct ds_rmsg {
-+      struct ds_qp    *qp;
-+      uint32_t        offset;
-+      uint32_t        length;
-+};
-+
-+struct ds_smsg {
-+      struct ds_smsg  *next;
- };
- struct rs_sge {
-@@ -167,8 +182,6 @@ struct rs_conn_data {
-       struct rs_sge     data_buf;
- };
--#define RS_RECV_WR_ID (~((uint64_t) 0))
--
- /*
-  * rsocket states are ordered as passive, connecting, connected, disconnected.
-  */
-@@ -203,12 +216,11 @@ struct ds_qp {
-       struct rsocket    *rs;
-       struct rdma_cm_id *cm_id;
-+      struct ibv_mr     *smr;
-       struct ibv_mr     *rmr;
-       uint8_t           *rbuf;
--      struct ibv_mr     *smr;
--      uint16_t          lid;
--      uint8_t           sl;
-+      int               cq_armed;
- };
- struct ds_dest {
-@@ -231,6 +243,13 @@ struct rsocket {
-               /* data stream */
-               struct {
-                       struct rdma_cm_id *cm_id;
-+                      uint64_t          tcp_opts;
-+
-+                      int               ctrl_avail;
-+                      uint16_t          sseq_no;
-+                      uint16_t          sseq_comp;
-+                      uint16_t          rseq_no;
-+                      uint16_t          rseq_comp;
-                       int               remote_sge;
-                       struct rs_sge     remote_sgl;
-@@ -242,63 +261,58 @@ struct rsocket {
-                       void              *target_buffer_list;
-                       volatile struct rs_sge    *target_sgl;
-                       struct rs_iomap   *target_iomap;
-+
-+                      int               rbuf_bytes_avail;
-+                      int               rbuf_free_offset;
-+                      int               rbuf_offset;
-+                      struct ibv_mr     *rmr;
-+                      uint8_t           *rbuf;
-+
-+                      int               sbuf_bytes_avail;
-+                      struct ibv_mr     *smr;
-+                      struct ibv_sge    ssgl[2];
-               };
-               /* datagram */
-               struct {
--                      dlist_t           qp_list;
-+                      struct ds_qp      *qp_list;
-                       void              *dest_map;
-                       struct ds_dest    *conn_dest;
--                      struct pollfd     *fds;
--                      nfds_t            nfds;
--                      int               dsock;
--                      int               sbytes_needed;
-+
-+                      int               udp_sock;
-+                      int               epfd;
-+                      int               rqe_avail;
-+                      struct ds_smsg    *smsg_free;
-               };
-       };
-       int               opts;
-       long              fd_flags;
-       uint64_t          so_opts;
--      uint64_t          tcp_opts;
-       uint64_t          ipv6_opts;
-       int               state;
-       int               cq_armed;
-       int               retries;
-       int               err;
--      int               ctrl_avail;
-       int               sqe_avail;
--      int               sbuf_bytes_avail;
--      uint16_t          sseq_no;
--      uint16_t          sseq_comp;
-+      uint32_t          sbuf_size;
-       uint16_t          sq_size;
-       uint16_t          sq_inline;
-+      uint32_t          rbuf_size;
-       uint16_t          rq_size;
--      uint16_t          rseq_no;
--      uint16_t          rseq_comp;
--      int               rbuf_bytes_avail;
--      int               rbuf_free_offset;
--      int               rbuf_offset;
-       int               rmsg_head;
-       int               rmsg_tail;
-       union {
-               struct rs_msg     *rmsg;
--              struct ds_msg     *dmsg;
-+              struct ds_rmsg    *dmsg;
-       };
-+      uint8_t           *sbuf;
-       struct rs_iomap_mr *remote_iomappings;
-       dlist_entry       iomap_list;
-       dlist_entry       iomap_queue;
-       int               iomap_pending;
--
--      uint32_t          rbuf_size;
--      struct ibv_mr     *rmr;
--      uint8_t           *rbuf;
--
--      uint32_t          sbuf_size;
--      struct ibv_mr     *smr;
--      struct ibv_sge    ssgl[2];
--      uint8_t           *sbuf;
- };
- #define DS_UDP_TAG 0x5555555555555555ULL
-@@ -306,12 +320,32 @@ struct rsocket {
- struct ds_udp_header {
-       uint64_t          tag;
-       uint8_t           version;
--      uint8_t           sl;
--      uint16_t          slid;
-+      uint8_t           reserved[3];
-       uint32_t          qpn;  /* upper 8-bits reserved */
- };
-+#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
-+
-+static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
-+{
-+      if (!rs->qp_list)
-+              list_init(&qp->list);
-+      else
-+              list_insert_head(&qp->list, &rs->qp_list->list);
-+      rs->qp_list = *qp;
-+}
-+
-+static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
-+{
-+      if (qp->list.next != qp->list) {
-+              rs->qp_list = ds_next_qp(qp);
-+              dlist_remove(&qp->list);
-+      } else {
-+              rs->qp_list = NULL;
-+      }
-+}
-+
- static int rs_svc_run(void *arg)
- {
-       return 0;
-@@ -468,7 +502,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
-       rs->type = type;
-       rs->index = -1;
--      rs->dsock = -1;
-+      rs->udp_sock = -1;
-+      rs->epfd = -1;
-+
-       if (inherited_rs) {
-               rs->sbuf_size = inherited_rs->sbuf_size;
-               rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -496,18 +532,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
-       return rs;
- }
--/*
-- * TODO: Support datagram rsockets
-- */
- static int rs_set_nonblocking(struct rsocket *rs, long arg)
- {
-+      struct ds_qp *qp;
-       int ret = 0;
--      if (rs->cm_id->recv_cq_channel)
--              ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
-+      if (rs->type == SOCK_STREAM) {
-+              if (rs->cm_id->recv_cq_channel)
-+                      ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
--      if (!ret && rs->state < rs_connected)
--              ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
-+              if (!ret && rs->state < rs_connected)
-+                      ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
-+      } else {
-+              ret = fcntl(rs->epfd, F_SETFL, arg);
-+
-+              if (!ret && rs->qp_list) {
-+                      qp = rs->qp_list;
-+                      do {
-+                              ret = fcntl(qp->cm_id->recv_cq_channel->fd,
-+                                          F_SETFL, arg);
-+                              qp = ds_next_qp(qp);
-+                      } while (qp != rs->qp_list && !ret);
-+              }
-+      }
-       return ret;
- }
-@@ -531,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
-               rs->rq_size = 2;
- }
-+static void ds_set_qp_size(struct rsocket *rs)
-+{
-+      uint16_t max_size;
-+
-+      max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
-+
-+      if (rs->sq_size > max_size)
-+              rs->sq_size = max_size;
-+      if (rs->rq_size > max_size)
-+              rs->rq_size = max_size;
-+
-+      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-+              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-+      else
-+              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
-+
-+      if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
-+              rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
-+      else
-+              rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-+}
-+
- static int rs_init_bufs(struct rsocket *rs)
- {
-       size_t len;
-       rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
-       if (!rs->rmsg)
--              return -1;
-+              return ERR(ENOEMEM);
-       rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
-       if (!rs->sbuf)
--              return -1;
-+              return ERR(ENOEMEM);
-       rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
-       if (!rs->smr)
-@@ -551,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
-             sizeof(*rs->target_iomap) * rs->target_iomap_size;
-       rs->target_buffer_list = malloc(len);
-       if (!rs->target_buffer_list)
--              return -1;
-+              return ERR(ENOEMEM);
-       rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
-       if (!rs->target_mr)
-@@ -564,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
-       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
-       if (!rs->rbuf)
--              return -1;
-+              return ERR(ENOEMEM);
-       rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
-       if (!rs->rmr)
-@@ -648,7 +717,7 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
-       sge.length = RS_SNDLOWAT;
-       sge.lkey = qp->rmr;
--      wr.wr_id = RS_RECV_WR_ID;
-+      wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
-       wr.next = NULL;
-       wr.sg_list = &sge;
-       wr.num_sge = 1;
-@@ -736,7 +805,8 @@ static void ds_free_qp(struct ds_qp *qp)
-       if (qp->cm_id) {
-               if (qp->cm_id->qp) {
--                      dlist_remove(&qp->list);
-+                      epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
-+                                qp->cm_id->recv_cq_channel->fd, NULL);
-                       rdma_destroy_qp(qp->cm_id);
-               }
-               rdma_destroy_id(qp->cm_id);
-@@ -744,31 +814,32 @@ static void ds_free_qp(struct ds_qp *qp)
-       free(qp);
- }
--static void ds_free_qps(struct rsocket *rs)
-+static void ds_free(struct rsocket *rs)
- {
-       struct ds_qp *qp;
--      dlist_t *entry;
--      while (!dlist_empty(&rs->qp_list)) {
--              qp = container_of(rs->qp_list.next, struct ds_qp, list);
--              ds_free_qp(qp);
--      }
--}
--
--static void ds_free(struct rsocket *rs)
--{
-       if (rs->state & (rs_readable | rs_writable))
-               rs_svc_remove(rs);
--      if (rs->dsock >= 0)
--              close(rs->dsock);
-+      if (rs->udp_sock >= 0)
-+              close(rs->udp_sock);
-       if (rs->index >= 0)
-               rs_remove(rs);
--      ds_free_qps(rs);
--      if (rs->fds)
--              free(rs->fds);
-+      if (rs->dmsg)
-+              free(rs->dmsg);
-+
-+      if (rs->smsg)
-+              free(rs->smsg);
-+
-+      while (rs->qp_list) {
-+              ds_remove_qp(rs, rs->qp_list);
-+              ds_free_qp(rs->qp_list);
-+      }
-+
-+      if (rs->epfd >= 0)
-+              close(rs->epfd);
-       if (rs->sbuf)
-               free(rs->sbuf);
-@@ -875,15 +946,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
- static int ds_init(struct rsocket *rs, int domain)
- {
--      rs->dsock = socket(domain, SOCK_DGRAM, 0);
--      if (rs->dsock < 0)
--              return rs->dsock;
-+      rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
-+      if (rs->udp_sock < 0)
-+              return rs->udp_sock;
--      rs->fds = calloc(1, sizeof *fds);
--      if (!rs->fds)
--              return ERR(ENOMEM);
--      rs->nfds = 1;
--      dlist_init(&rs->qp_list);
-+      rs->epfd = epoll_create(0);
-+      if (rs->epfd < 0)
-+              return rs->epfd;
-       return 0;
- }
-@@ -916,7 +985,7 @@ int rsocket(int domain, int type, int protocol)
-               if (ret)
-                       goto err;
--              index = rs->dsock;
-+              index = rs->udp_sock;
-       }
-       ret = rs_insert(rs, index);
-@@ -941,7 +1010,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               if (!ret)
-                       rs->state = rs_bound;
-       } else {
--              ret = bind(rs->dsock, addr, addrlen);
-+              ret = bind(rs->udp_sock, addr, addrlen);
-               if (!ret) {
-                       ret = rs_svc_insert(rs);
-                       if (!ret)
-@@ -1130,21 +1199,30 @@ connected:
- static int ds_init_ep(struct rsocket *rs)
- {
--      int ret;
-+      struct ds_smsg *msg;
-+      int i, ret;
--      rs_set_qp_size(rs);
--      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
--              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
--      else
--              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
-+      ds_set_qp_size(rs);
--      rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
-+      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-       if (!rs->sbuf)
-               return ERR(ENOMEM);
--      rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
-+      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+      if (!rs->dmsg)
-+              return ERR(ENOEMEM);
-+
-       rs->sbuf_bytes_avail = rs->sbuf_size;
-       rs->sqe_avail = rs->sq_size;
-+      rs->rqe_avail = rs->rq_size;
-+
-+      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+      msg = rs->smsg_free;
-+      for (i = 0; i < rs->sq_size - 1; i++) {
-+              msg->next = (void *) msg + i * RS_SNDLOWAT;
-+              msg = msg->next;
-+      }
-+      msg->next = NULL;
-       ret = rs_svc_insert(rs);
-       if (ret)
-@@ -1186,7 +1264,7 @@ static int ds_get_src_addr(struct rsocket *rs,
-       uint16_t port;
-       *src_len = sizeof src_addr;
--      ret = getsockname(rs->dsock, &src_addr->sa, src_len);
-+      ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
-       if (ret || !rs_any_addr(src_addr))
-               return ret;
-@@ -1211,6 +1289,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-                       socklen_t addrlen, struct ds_qp **qp)
- {
-       struct ibv_qp_init_attr qp_attr;
-+      struct epoll_event event;
-       int ret;
-       *qp = calloc(1, sizeof(struct ds_qp));
-@@ -1245,17 +1324,24 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-       qp_attr.cap.max_send_sge = 2;
-       qp_attr.cap.max_recv_sge = 1;
-       qp_attr.cap.max_inline_data = rs->sq_inline;
--
-       ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
-       if (ret)
--              return ret;
-+              goto err;
-+
-+      event.events = EPOLLIN | EPOLLOUT;
-+      event.data.ptr = *qp;
-+      ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
-+                      (*qp)->cm_id->recv_cq_channel->fd, &event);
-+      if (ret)
-+              goto err;
-       for (i = 0; i < rs->rq_size; i++) {
-               ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
-               if (ret)
-                       goto err;
-       }
--      list_insert_head(&(*qp)->list, &rs->qp_list);
-+
-+      ds_insert_qp(rs, *qp);
-       return 0;
- err:
-       ds_free_qp(*qp);
-@@ -1265,12 +1351,14 @@ err:
- static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
-                    socklen_t addrlen, struct ds_qp **qp)
- {
--      dlist_t *entry;
-+      if (rs->qp_list) {
-+              *qp = rs->qp_list;
-+              do {
-+                      if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-+                              return 0;
--      for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
--              *qp = container_of(entry, struct ds_qp, list);
--              if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
--                      return 0;
-+                      *qp = ds_next_qp(*qp);
-+              } while (*qp != rs->qp_list);
-       }
-       return ds_create_qp(rs, src_addr, addrlen, qp);
-@@ -1328,7 +1416,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               ret = rs_do_connect(rs);
-       } else {
-               fastlock_acquire(&rs->slock);
--              ret = connect(rs->dsock, addr, addrlen);
-+              ret = connect(rs->udp_sock, addr, addrlen);
-               if (!ret)
-                       ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-               fastlock_release(&rs->slock);
-@@ -1428,9 +1516,12 @@ static int ds_send_data(struct rsocket *rs,
-                       struct ibv_sge *sgl, int nsge,
-                       uint32_t length, int flags)
- {
-+      uint64_t offset;
-+
-       rs->sqe_avail--;
-       rs->sbuf_bytes_avail -= length;
--      return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
-+      offset = sgl->addr - (uintptr_t) rs->sbuf;
-+      return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
- }
- static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-@@ -1688,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-       return ret;
- }
-+/*
-+ * Poll all CQs associated with a datagram rsocket.  We need to drop any
-+ * received messages that we do not have room to store.  To limit drops,
-+ * we only poll if we have room to store the receive or we need a send
-+ * buffer.  To ensure fairness, we poll the CQs round robin, remembering
-+ * where we left off.
-+ */
-+static void ds_poll_cqs(struct rsocket *rs)
-+{
-+      struct ds_qp *qp;
-+      struct ds_smsg *msg;
-+      struct ibv_wc wc;
-+      int ret, cnt;
-+
-+      qp = rs->qp_list;
-+      do {
-+              cnt = 0;
-+              do {
-+                      ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
-+                      if (ret <= 0) {
-+                              qp = ds_next_qp(qp);
-+                              continue;
-+                      }
-+
-+                      if (ds_wr_is_recv(wc.wr_id)) {
-+                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
-+                                      rs->rqe_avail--;
-+                                      rs->dmsg[rs->rmsg_tail].qp = qp;
-+                                      if (++rs->rmsg_tail == rs->rq_size + 1)
-+                                              rs->rmsg_tail = 0;
-+                              } else {
-+                                      ds_post_recv(rs, qp, qp->rbuf +
-+                                                           ds_wr_offset(wc.wr_id));
-+                              }
-+                      } else {
-+                              if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
-+                                      msg = (struct ds_smsg *)
-+                                            (rs->sbuf + ds_wr_offset(wc.wr_id));
-+                                      msg->next = rs->smsg_free;
-+                                      rs->smsg_free = msg;
-+                              }
-+                              rs->sqe_avail++;
-+                      }
-+
-+                      qp = ds_next_qp(qp);
-+                      if (!rs->rqe_avail && rs->sqe_avail) {
-+                              rs->qp_list = qp;
-+                              return;
-+                      }
-+                      cnt++;
-+              } while (qp != rs->qp_list);
-+      } while (cnt);
-+}
-+
-+static void ds_req_notify_cqs(struct rsocket *rs)
-+{
-+      struct ds_qp *qp;
-+
-+      qp = rs->qp_list;
-+      do {
-+              if (!qp->cq_armed) {
-+                      ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
-+                      qp->cq_armed = 1;
-+              }
-+              qp = ds_next_qp(qp);
-+      } while (qp != rs->qp_list);
-+}
-+
-+static int ds_get_cq_event(struct rsocket *rs)
-+{
-+      struct epoll_event event;
-+      struct ds_qp *qp;
-+      struct ibv_cq *cq;
-+      void *context;
-+      int ret;
-+
-+      if (!rs->cq_armed)
-+              return 0;
-+
-+      ret = epoll_wait(rs->epfd, &event, 1, -1);
-+      if (ret <= 0)
-+              return ret;
-+
-+      qp = event.data.ptr;
-+      ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
-+      if (!ret) {
-+              ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
-+              qp->cq_armed = 0;
-+              rs->cq_armed = 0;
-+      }
-+
-+      return ret;
-+}
-+
-+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
-+{
-+      int ret = 0;
-+
-+      fastlock_acquire(&rs->cq_lock);
-+      do {
-+              ds_poll_cqs(rs);
-+              if (test(rs)) {
-+                      ret = 0;
-+                      break;
-+              } else if (nonblock) {
-+                      ret = ERR(EWOULDBLOCK);
-+              } else if (!rs->cq_armed) {
-+                      ds_req_notify_cqs(rs);
-+                      rs->cq_armed = 1;
-+              } else {
-+                      rs_update_credits(rs);
-+                      fastlock_acquire(&rs->cq_wait_lock);
-+                      fastlock_release(&rs->cq_lock);
-+
-+                      ret = ds_get_cq_event(rs);
-+                      fastlock_release(&rs->cq_wait_lock);
-+                      fastlock_acquire(&rs->cq_lock);
-+              }
-+      } while (!ret);
-+
-+      fastlock_release(&rs->cq_lock);
-+      return ret;
-+}
-+
-+static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
-+{
-+      struct timeval s, e;
-+      uint32_t poll_time = 0;
-+      int ret;
-+
-+      do {
-+              ret = ds_process_cqs(rs, 1, test);
-+              if (!ret || nonblock || errno != EWOULDBLOCK)
-+                      return ret;
-+
-+              if (!poll_time)
-+                      gettimeofday(&s, NULL);
-+
-+              gettimeofday(&e, NULL);
-+              poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
-+                          (e.tv_usec - s.tv_usec) + 1;
-+      } while (poll_time <= polling_time);
-+
-+      ret = ds_process_cqs(rs, 0, test);
-+      return ret;
-+}
-+
- static int rs_nonblocking(struct rsocket *rs, int flags)
- {
-       return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1721,7 +1959,7 @@ static int rs_can_send(struct rsocket *rs)
- static int ds_can_send(struct rsocket *rs)
- {
--      return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
- }
- static int rs_conn_can_send(struct rsocket *rs)
-@@ -1961,31 +2199,22 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- {
-       struct ds_udp_header hdr;
-       struct msghdr msg;
--      struct iovec miov[4];
-+      struct iovec miov[8];
-       struct ds_qp *qp;
--      if (iovcnt > 4)
-+      if (iovcnt > 8)
-               return ERR(ENOTSUP);
--      qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
-       hdr.tag = htonll(DS_UDP_TAG);
-       hdr.version = 1;
--      if (qp) {
--              hdr.sl = qp->sl;
--              hdr.slid = htons(qp->lid);
--              hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
--      } else {
--              hdr.sl = 0;
--              hdr.slid = 0;
--              hdr.qpn = 0;
--      }
-+      memset(&hdr->reserved, 0, sizeof hdr->reserved);
-+      hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-       miov[0].iov_base = &hdr;
-       miov[0].iov_len = sizeof hdr;
-       memcpy(&miov[1], iov, sizeof *iov * iovcnt);
-       memset(&msg, 0, sizeof msg);
--      /* TODO: specify name if needed */
-       msg.msg_iov = miov;
-       msg.msg_iovlen = iovcnt + 1;
-       return sendmsg(rs->fd, msg, flags);
-@@ -2004,14 +2233,13 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-       struct ibv_sge sge;
-       int ret = 0;
--      if (!rs->conn_dest || !rs->conn_dest->ah)
-+      if (!rs->conn_dest->ah)
-               return ds_send_udp(rs, buf, len, flags);
--      rs->sbytes_needed = len;
-       if (!ds_can_send(rs)) {
--              ret = rs_get_comp(rs, 1, ds_can_send);
-+              ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-               if (ret)
--                      return ds_send_udp(rs, buf, len, flags);
-+                      return ret;
-       }
-       if (len <= rs->sq_inline) {
-@@ -2145,8 +2373,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-       }
-       fastlock_acquire(&rs->slock);
--      ds_connect(rs, dest_addr, addrlen);
-+      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+              ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+              if (ret)
-+                      goto out;
-+      }
-       ret = dsend(rs, buf, len, flags);
-+out:
-       fastlock_release(&rs->slock);
-       return ret;
- }
-@@ -2629,7 +2862,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
-               rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
-               return 0;
-       } else {
--              return getpeername(rs->fs, addr, addrlen);
-+              return getpeername(rs->udp_sock, addr, addrlen);
-       }
- }
-@@ -2642,7 +2875,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-               rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
-               return 0;
-       } else {
--              return getsockname(rs->fd, addr, addrlen);
-+              return getsockname(rs->udp_sock, addr, addrlen);
-       }
- }
-@@ -2656,7 +2889,7 @@ int rsetsockopt(int socket, int level, int optname,
-       ret = ERR(ENOTSUP);
-       rs = idm_at(&idm, socket);
-       if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
--              ret = setsockopt(rs->fd, optname, optval, optlen);
-+              ret = setsockopt(rs->udp_sock, optname, optval, optlen);
-               if (ret)
-                       return ret;
-       }
-@@ -2666,13 +2899,15 @@ int rsetsockopt(int socket, int level, int optname,
-               opts = &rs->so_opts;
-               switch (optname) {
-               case SO_REUSEADDR:
--                      ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
--                                            RDMA_OPTION_ID_REUSEADDR,
--                                            (void *) optval, optlen);
--                      if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
--                          rs->cm_id->context &&
--                          (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
--                              ret = 0;
-+                      if (rs->type == SOCK_STREAM) {
-+                              ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-+                                                    RDMA_OPTION_ID_REUSEADDR,
-+                                                    (void *) optval, optlen);
-+                              if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
-+                                  rs->cm_id->context &&
-+                                  (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
-+                                      ret = 0;
-+                      }
-                       opt_on = *(int *) optval;
-                       break;
-               case SO_RCVBUF:
-@@ -2722,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
-               opts = &rs->ipv6_opts;
-               switch (optname) {
-               case IPV6_V6ONLY:
--                      ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
--                                            RDMA_OPTION_ID_AFONLY,
--                                            (void *) optval, optlen);
-+                      if (rs->type == SOCK_STREAM) {
-+                              ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-+                                                    RDMA_OPTION_ID_AFONLY,
-+                                                    (void *) optval, optlen);
-+                      }
-                       opt_on = *(int *) optval;
-                       break;
-               default:
-@@ -2778,9 +3015,6 @@ int rgetsockopt(int socket, int level, int optname,
-       int ret = 0;
-       rs = idm_at(&idm, socket);
--      if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
--              return getsockopt(rs->fd, level, optname, optval, optlen);
--
-       switch (level) {
-       case SOL_SOCKET:
-               switch (optname) {
-@@ -3100,24 +3334,3 @@ out:
-       return (ret && left == count) ? ret : count - left;
- }
--
--ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
--                struct sockaddr *src_addr, socklen_t *addrlen)
--{
--      int ret;
--
--      ret = rrecv(socket, buf, len, flags);
--      if (ret > 0 && src_addr)
--              rgetpeername(socket, src_addr, addrlen);
--
--      return ret;
--}
--
--ssize_t usendto(int socket, const void *buf, size_t len, int flags,
--              const struct sockaddr *dest_addr, socklen_t addrlen)
--{
--      if (dest_addr || addrlen)
--              return ERR(EISCONN);
--
--      return usend(socket, buf, len, flags);
--}