]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Wed, 21 Nov 2012 07:43:14 +0000 (23:43 -0800)
committerSean Hefty <sean.hefty@intel.com>
Wed, 21 Nov 2012 07:43:14 +0000 (23:43 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index 9f1c2dfb958b61557071cb654b350f54d15bf00d..a928e973f25d25c77725ba1b9fdf8ead60403d60 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: c44192b95c3530520ecc46d6741a8b17685d7945
-Head: 817eacc979df6c00af54c0c00eebb8b25f496dde
+Previous: 7b392fb54627148faed076bf44fd1a75f0b13937
+Head: 7029a9a237f690cfe1fd840d35a93647ca447ab5
 Applied:
-  dsocket: 158ea5a8c1a0fcf3ca61c642095023af16759c90
-  refresh-temp: 817eacc979df6c00af54c0c00eebb8b25f496dde
+  dsocket: 7029a9a237f690cfe1fd840d35a93647ca447ab5
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
index 7e35c6f3d8d61d316a67aad3998379be623c5389..59d04caf037aa2c9f85b352a1ad1427129b37b8f 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top:    97a52629c221cba1033082bbd308ecfc4d4b6082
+Top:    da0048097eea01b21df587e85b3f7ac44a2582c8
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -74,11 +74,68 @@ index 1484f65..03d49df 100644
 +created, the receive buffers may be redistributed by posting loopback sends
 +on a QP in order to free the receive buffers for reposting to a different QP.  
 \ No newline at end of file
+diff --git a/src/cma.c b/src/cma.c
+index 91bf108..2c6b032 100755
+--- a/src/cma.c
++++ b/src/cma.c
+@@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
+ int ucma_max_qpsize(struct rdma_cm_id *id)
+ {
+       struct cma_id_private *id_priv;
++      int i, max_size = 0;
+       id_priv = container_of(id, struct cma_id_private, id);
+-      return id_priv->cma_dev->max_qpsize;
++      if (id && id_priv->cma_dev) {
++              max_size = id_priv->cma_dev->max_qpsize;
++      } else {
++              for (i = 0; i < cma_dev_cnt; i++) {
++                      if (!max_size || max_size > cma_dev_array[i].max_qpsize)
++                              max_size = cma_dev_array[i].max_qpsize;
++              }
++      }
++      return max_size;
+ }
+ uint16_t ucma_get_port(struct sockaddr *addr)
 diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..99e638c 100644
+index 58fcb8e..0695d12 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
-@@ -110,6 +110,12 @@ struct rs_msg {
+@@ -55,7 +55,7 @@
+ #define RS_OLAP_START_SIZE 2048
+ #define RS_MAX_TRANSFER 65536
+-#define RS_SNDLOWAT 64
++#define RS_SNDLOWAT 2048
+ #define RS_QP_MAX_SIZE 0xFFFE
+ #define RS_QP_CTRL_SIZE 4
+ #define RS_CONN_RETRIES 6
+@@ -63,6 +63,23 @@
+ static struct index_map idm;
+ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
++enum {
++      RS_SVC_INSERT,
++      RS_SVC_REMOVE
++};
++
++struct rsocket;
++
++struct rs_svc_msg {
++      uint32_t op;
++      uint32_t status;
++      struct rsocket *rs;
++};
++
++static pthread_t svc_id;
++static int svc_cnt;
++static int svc_fds[2];
++
+ static uint16_t def_iomap_size = 0;
+ static uint16_t def_inline = 64;
+ static uint16_t def_sqsize = 384;
+@@ -110,6 +127,12 @@ struct rs_msg {
        uint32_t data;
  };
  
@@ -91,7 +148,20 @@ index 58fcb8e..99e638c 100644
  struct rs_sge {
        uint64_t addr;
        uint32_t key;
-@@ -169,13 +175,63 @@ enum rs_state {
+@@ -159,9 +182,9 @@ enum rs_state {
+       rs_connecting      = rs_opening |   0x0040,
+       rs_accepting       = rs_opening |   0x0080,
+       rs_connected       =                0x0100,
+-      rs_connect_wr      =                0x0200,
+-      rs_connect_rd      =                0x0400,
+-      rs_connect_rdwr    = rs_connected | rs_connect_rd | rs_connect_wr,
++      rs_writable        =                0x0200,
++      rs_readable        =                0x0400,
++      rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
+       rs_connect_error   =                0x0800,
+       rs_disconnected    =                0x1000,
+       rs_error           =                0x2000,
+@@ -169,13 +192,68 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -103,18 +173,23 @@ index 58fcb8e..99e638c 100644
 +};
 +
 +struct ds_qp {
++      dlist_t           list;
++      struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
-+      int               rbuf_cnt;
++
++      struct ibv_mr     *rmr;
++      uint8_t           *rbuf;
++
++      struct ibv_mr     *smr;
 +      uint16_t          lid;
 +      uint8_t           sl;
 +};
 +
 +struct ds_dest {
-+      union socket_addr addr;
++      union socket_addr addr; /* must be first */
 +      struct ds_qp      *qp;
 +      struct ibv_ah     *ah;
 +      uint32_t           qpn;
-+      atomic_t           refcnt;
 +};
 +
 +struct rsocket {
@@ -145,19 +220,19 @@ index 58fcb8e..99e638c 100644
 +              };
 +              /* datagram */
 +              struct {
-+                      struct ds_qp      *qp_array;
++                      dlist_t           qp_list;
 +                      void              *dest_map;
 +                      struct ds_dest    *conn_dest;
 +                      struct pollfd     *fds;
 +                      nfds_t            nfds;
-+                      int               fd;
++                      int               dsock;
 +                      int               sbytes_needed;
 +              };
 +      };
  
        int               opts;
        long              fd_flags;
-@@ -186,7 +242,7 @@ struct rsocket {
+@@ -186,7 +264,7 @@ struct rsocket {
        int               cq_armed;
        int               retries;
        int               err;
@@ -166,7 +241,7 @@ index 58fcb8e..99e638c 100644
        int               ctrl_avail;
        int               sqe_avail;
        int               sbuf_bytes_avail;
-@@ -203,34 +259,37 @@ struct rsocket {
+@@ -203,34 +281,93 @@ struct rsocket {
        int               rbuf_offset;
        int               rmsg_head;
        int               rmsg_tail;
@@ -214,11 +289,67 @@ index 58fcb8e..99e638c 100644
 +      uint32_t          qpn;  /* upper 8-bits reserved */
 +};
 +
++
++static int rs_svc_run(void *arg)
++{
++      return 0;
++}
++
++static int rs_svc_insert(struct rsocket *rs)
++{
++      struct rs_svc_msg msg;
++      int ret;
++
++      pthread_mutex_lock(&mut);
++      if (!svc_cnt) {
++              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
++              if (ret)
++                      goto out;
++
++              ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
++              if (ret) {
++                      close(svc_fds[0]);
++                      close(svc_fds[1]);
++                      ret = ERR(ret);
++                      goto out;
++              }
++      }
++
++      msg.op = RS_SVC_INSERT;
++      msg.status = EINVAL;
++      msg.rs = rs;
++      svc_cnt++;
++      write(svc_fds[0], &msg, sizeof msg);
++      read(svc_fds[0], &msg, sizeof msg);
++      ret = ERR(msg.status);
++out:
++      pthread_mutex_unlock(&mut);
++      return ret;
++}
++
++static int rs_svc_remove(struct rsocket *rs)
++{
++      struct rs_svc_msg msg;
++      int ret;
++
++      pthread_mutex_lock(&mut);
++      msg.op = RS_SVC_REMOVE;
++      msg.status = EINVAL;
++      msg.rs = rs;
++      write(svc_fds[0], &msg, sizeof msg);
++      read(svc_fds[0], &msg, sizeof msg);
++      ret = ERR(msg.status);
++      if (!ret && !--svn_cnt)
++              pthread_join(svc_id, NULL);
++
++      pthread_mutex_unlock(&mut);
++      return ret;
++}
 +
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -306,10 +365,10 @@ out:
+@@ -306,10 +443,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -231,7 +362,7 @@ index 58fcb8e..99e638c 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +458,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -240,17 +371,17 @@ index 58fcb8e..99e638c 100644
  {
        struct rsocket *rs;
  
-@@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +466,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
 +      rs->type = type;
        rs->index = -1;
-+      rs->fd = -1;
++      rs->dsock = -1;
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,12 +490,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
@@ -259,34 +390,211 @@ index 58fcb8e..99e638c 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
-@@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs)
-               rdma_destroy_id(rs->cm_id);
+ }
++/*
++ * TODO: Support datagram rsockets
++ */
+ static int rs_set_nonblocking(struct rsocket *rs, long arg)
+ {
+       int ret = 0;
+@@ -439,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
+       return 0;
+ }
+-static int rs_create_cq(struct rsocket *rs)
++static int ds_init_bufs(struct ds_qp *qp)
+ {
+-      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+-      if (!rs->cm_id->recv_cq_channel)
++      qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
++      if (!qp->rbuf)
++              return ERR(ENOMEM);
++
++      qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
++      if (!qp->smr)
++              return -1;
++
++      qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
++      if (!qp->rmr)
+               return -1;
+-      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+-                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+-      if (!rs->cm_id->recv_cq)
++      return 0;
++}
++
++static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
++{
++      cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
++      if (!cm_id->recv_cq_channel)
++              return -1;
++
++      cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
++                                     cm_id, cm_id->recv_cq_channel, 0);
++      if (!cm_id->recv_cq)
+               goto err1;
+       if (rs->fd_flags & O_NONBLOCK) {
+@@ -455,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
+                       goto err2;
        }
  
--      fastlock_destroy(&rs->iomap_lock);
-+      if (rs->fd >= 0)
-+              close(rs->fd);
+-      rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
+-      rs->cm_id->send_cq = rs->cm_id->recv_cq;
++      cm_id->send_cq_channel = cm_id->recv_cq_channel;
++      cm_id->send_cq = cm_id->recv_cq;
+       return 0;
+ err2:
+-      ibv_destroy_cq(rs->cm_id->recv_cq);
+-      rs->cm_id->recv_cq = NULL;
++      ibv_destroy_cq(cm_id->recv_cq);
++      cm_id->recv_cq = NULL;
+ err1:
+-      ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel);
+-      rs->cm_id->recv_cq_channel = NULL;
++      ibv_destroy_comp_channel(cm_id->recv_cq_channel);
++      cm_id->recv_cq_channel = NULL;
+       return -1;
+ }
+-static inline int
+-rs_post_recv(struct rsocket *rs)
++static inline int rs_post_recv(struct rsocket *rs)
+ {
+       struct ibv_recv_wr wr, *bad;
+@@ -481,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
+       return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
+ }
++static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
++{
++      struct ibv_recv_wr wr, *bad;
++      struct ibv_sge sge;
++
++      sge.addr = (uintptr_t) buf;
++      sge.length = RS_SNDLOWAT;
++      sge.lkey = qp->rmr;
++
++      wr.wr_id = RS_RECV_WR_ID;
++      wr.next = NULL;
++      wr.sg_list = &sge;
++      wr.num_sge = 1;
++
++      return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
++}
++
+ static int rs_create_ep(struct rsocket *rs)
+ {
+       struct ibv_qp_init_attr qp_attr;
+@@ -491,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
+       if (ret)
+               return ret;
+-      ret = rs_create_cq(rs);
++      ret = rs_create_cq(rs, rs->cm_id);
+       if (ret)
+               return ret;
+@@ -548,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
+       }
+ }
++static void ds_free_qp(struct ds_qp *qp)
++{
++      if (qp->smr)
++              rdma_dereg_mr(qp->smr);
++
++      if (qp->rbuf) {
++              if (qp->rmr)
++                      rdma_dereg_mr(qp->rmr);
++              free(qp->rbuf);
++      }
++
++      if (qp->cm_id) {
++              if (qp->cm_id->qp) {
++                      dlist_remove(&qp->list);
++                      rdma_destroy_qp(qp->cm_id);
++              }
++              rdma_destroy_id(qp->cm_id);
++      }
++      free(qp);
++}
++
++static void ds_free_qps(struct rsocket *rs)
++{
++      struct ds_qp *qp;
++      dlist_t *entry;
++
++      while (!dlist_empty(&rs->qp_list)) {
++              qp = container_of(rs->qp_list.next, struct ds_qp, list);
++              ds_free_qp(qp);
++      }
++}
++
++static void ds_free(struct rsocket *rs)
++{
++      if (rs->state & (rs_readable | rs_writable))
++              rs_svc_remove(rs);
++
++      if (rs->dsock >= 0)
++              close(rs->dsock);
++
++      if (rs->index >= 0)
++              rs_remove(rs);
++
++      ds_free_qps(rs);
 +      if (rs->fds)
 +              free(rs->fds);
 +
++      if (rs->sbuf)
++              free(rs->sbuf);
++
++      fastlock_destroy(&rs->map_lock);
++      fastlock_destroy(&rs->cq_wait_lock);
++      fastlock_destroy(&rs->cq_lock);
++      fastlock_destroy(&rs->rlock);
++      fastlock_destroy(&rs->slock);
++      free(rs);
++}
++
+ static void rs_free(struct rsocket *rs)
+ {
++      if (rs->type == SOCK_DGRAM) {
++              ds_free(rs);
++              return;
++      }
++
+       if (rs->index >= 0)
+               rs_remove(rs);
+@@ -581,7 +819,7 @@ static void rs_free(struct rsocket *rs)
+               rdma_destroy_id(rs->cm_id);
+       }
+-      fastlock_destroy(&rs->iomap_lock);
 +      fastlock_destroy(&rs->map_lock);
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +873,56 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
 +static int ds_init(struct rsocket *rs, int domain)
 +{
-+      rs->fd = socket(domain, SOCK_DGRAM, 0);
-+      if (rs->fd < 0)
-+              return rs->fd;
++      rs->dsock = socket(domain, SOCK_DGRAM, 0);
++      if (rs->dsock < 0)
++              return rs->dsock;
 +
 +      rs->fds = calloc(1, sizeof *fds);
 +      if (!rs->fds)
 +              return ERR(ENOMEM);
 +      rs->nfds = 1;
++      dlist_init(&rs->qp_list);
 +
 +      return 0;
 +}
@@ -324,11 +632,11 @@ index 58fcb8e..99e638c 100644
 +              ret = ds_init(rs, domain);
 +              if (ret)
 +                      goto err;
++
++              index = rs->dsock;
++      }
  
 -      ret = rs_insert(rs);
-+              index = rs->fd;
-+      }
-+
 +      ret = rs_insert(rs, index);
        if (ret < 0)
                goto err;
@@ -337,7 +645,7 @@ index 58fcb8e..99e638c 100644
        return rs->index;
  
  err:
-@@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +936,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -349,12 +657,17 @@ index 58fcb8e..99e638c 100644
 +              if (!ret)
 +                      rs->state = rs_bound;
 +      } else {
-+              ret = bind(rs->fd, addr, addrlen);
++              ret = bind(rs->dsock, addr, addrlen);
++              if (!ret) {
++                      ret = rs_svc_insert(rs);
++                      if (!ret)
++                              rs->state = rs_readable | rs_writable;
++              }
 +      }
        return ret;
  }
  
-@@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +983,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -363,7 +676,7 @@ index 58fcb8e..99e638c 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +991,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -372,11 +685,37 @@ index 58fcb8e..99e638c 100644
        if (ret < 0)
                goto err;
  
-@@ -854,6 +950,66 @@ connected:
+@@ -854,13 +1128,212 @@ connected:
        return ret;
  }
  
-+static int ds_compare_dest(const void *dst1, const void *dst2)
++static int ds_init_ep(struct rsocket *rs)
++{
++      int ret;
++
++      rs_set_qp_size(rs);
++      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
++              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
++      else
++              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
++
++      rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
++      if (!rs->sbuf)
++              return ERR(ENOMEM);
++
++      rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
++      rs->sbuf_bytes_avail = rs->sbuf_size;
++      rs->sqe_avail = rs->sq_size;
++
++      ret = rs_svc_insert(rs);
++      if (ret)
++              return ret;
++
++      rs->state = rs_readable | rs_writable;
++      return 0;
++}
++
++static int ds_compare_addr(const void *dst1, const void *dst2)
 +{
 +      const struct sockaddr *sa1, *sa2;
 +      size_t len;
@@ -389,57 +728,179 @@ index 58fcb8e..99e638c 100644
 +      return memcmp(dst1, dst2, len);
 +}
 +
-+/* Caller must hold map_lock around accessing source address */
-+static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
-+                                        socklen_t dst_len, socklen_t *src_len)
++static int rs_any_addr(const union socket_addr *addr)
++{
++      if (addr->sa.sa_family == AF_INET) {
++              return (addr->sin.sin_addr == INADDR_ANY ||
++                      addr->sin.sin_addr == INADDR_LOOPBACK);
++      } else {
++              return (addr->sin6.sin6_addr == in6addr_any ||
++                      addr->sin6.sin6_addr == in6addr_loopback);
++      }
++}
++
++static int ds_get_src_addr(struct rsocket *rs,
++                         const struct sockaddr *dest_addr, socklen_t dest_len,
++                         union socket_addr *src_addr, socklen_t *src_len)
 +{
-+      static union socket_addr src_addr;
 +      int sock, ret;
++      uint16_t port;
 +
-+      sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
++      *src_len = sizeof src_addr;
++      ret = getsockname(rs->dsock, &src_addr->sa, src_len);
++      if (ret || !rs_any_addr(src_addr))
++              return ret;
++
++      port = src_addr->sin.sin_port;
++      sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
 +      if (sock < 0)
-+              return NULL;
++              return sock;
 +
-+      ret = connect(sock, &dst_addr->sa, dst_len);
++      ret = connect(sock, dest_addr, dest_len);
 +      if (ret)
 +              goto out;
 +
 +      *src_len = sizeof src_addr;
-+      ret = getsockname(sock, &src_addr.sa, src_len);
-+
++      ret = getsockname(sock, &src_addr->sa, src_len);
++      src_addr->sin.sin_port = port;
 +out:
 +      close(sock);
-+      return ret ? NULL : &src_addr;
++      return ret;
++}
++
++static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
++                      socklen_t addrlen, struct ds_qp **qp)
++{
++      struct ibv_qp_init_attr qp_attr;
++      int ret;
++
++      *qp = calloc(1, sizeof(struct ds_qp));
++      if (!*qp)
++              return ERR(ENOMEM);
++
++      (*qp)->rs = rs;
++      ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
++      if (ret)
++              goto err;
++
++      ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
++      if (ret)
++              goto err;
++
++      ret = ds_init_bufs(*qp);
++      if (ret)
++              goto err;
++
++      ret = rs_create_cq(rs, (*qp)->cm_id);
++      if (ret)
++              goto err;
++
++      memset(&qp_attr, 0, sizeof qp_attr);
++      qp_attr.qp_context = qp;
++      qp_attr.send_cq = rs->cm_id->send_cq;
++      qp_attr.recv_cq = rs->cm_id->recv_cq;
++      qp_attr.qp_type = IBV_QPT_UD;
++      qp_attr.sq_sig_all = 1;
++      qp_attr.cap.max_send_wr = rs->sq_size;
++      qp_attr.cap.max_recv_wr = rs->rq_size;
++      qp_attr.cap.max_send_sge = 2;
++      qp_attr.cap.max_recv_sge = 1;
++      qp_attr.cap.max_inline_data = rs->sq_inline;
++
++      ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
++      if (ret)
++              return ret;
++
++      for (i = 0; i < rs->rq_size; i++) {
++              ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
++              if (ret)
++                      goto err;
++      }
++      list_insert_head(&(*qp)->list, &rs->qp_list);
++      return 0;
++err:
++      ds_free_qp(*qp);
++      return ret;
 +}
 +
-+static struct ds_qp *ds_get_qp(struct rsocket *rs,
-+                             const struct sockaddr *src_addr, socklen_t addrlen)
++static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
++                   socklen_t addrlen, struct ds_qp **qp)
 +{
-+      union socket_addr *addr;
-+      socklen_t len;
++      dlist_t *entry;
++
++      for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
++              *qp = container_of(entry, struct ds_qp, list);
++              if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
++                      return 0;
++      }
 +
++      return ds_create_qp(rs, src_addr, addrlen, qp);
 +}
 +
-+static int ds_connect(struct rsocket *rs,
-+                    const struct sockaddr *dest_addr, socklen_t addrlen)
++static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
++                     socklen_t addrlen, struct ds_dest **dest)
 +{
-+      struct ds_dest **dest;
++      union socket_addr src_addr;
++      socklen_t src_len;
++      struct ds_qp *qp;
++      int ret = 0;
 +
 +      fastlock_acquire(&rs->map_lock);
-+      dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
-+      if (dest) {
-+              rs->conn_dest = *dest;
++      dest = tfind(addr, &rs->dest_map, ds_compare_addr);
++      if (dest)
 +              goto out;
++
++      if (rs->state == rs_init) {
++              ret = ds_init_ep(rs);
++              if (ret)
++                      goto out;
 +      }
++
++      ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
++      if (ret)
++              goto out;
++
++      ret = ds_get_qp(rs, src_addr, src_len, &qp);
++      if (ret)
++              goto out;
++
++      *dest = calloc(1, sizeof(struct ds_dest));
++      if (!*dest) {
++              ret = ERR(ENOMEM);
++              goto out;
++      }
++
++      memcpy(&(*dest)->addr, addr, addrlen);
++      (*dest)->qp = qp;
++      tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
 +out:
 +      fastlock_release(&rs->map_lock);
-+      return 0;
++      return ret;
 +}
 +
  int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
  {
        struct rsocket *rs;
-@@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs,
++      int ret;
+       rs = idm_at(&idm, socket);
+-      memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+-      return rs_do_connect(rs);
++      if (rs->type == SOCK_STREAM) {
++              memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
++              ret = rs_do_connect(rs);
++      } else {
++              fastlock_acquire(&rs->slock);
++              ret = connect(rs->dsock, addr, addrlen);
++              if (!ret)
++                      ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++              fastlock_release(&rs->slock);
++      }
++      return ret;
+ }
+ static int rs_post_write_msg(struct rsocket *rs,
+@@ -902,6 +1375,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -465,7 +926,7 @@ index 58fcb8e..99e638c 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs,
+@@ -932,6 +1424,15 @@ static int rs_write_data(struct rsocket *rs,
                                 flags, addr, rkey);
  }
  
@@ -481,7 +942,16 @@ index 58fcb8e..99e638c 100644
  static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
                           struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
  {
-@@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1045,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
+                                       rs->state = rs_disconnected;
+                                       return 0;
+                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
+-                                      rs->state &= ~rs_connect_rd;
++                                      rs->state &= ~rs_readable;
+                               }
+                               break;
+                       case RS_OP_WRITE:
+@@ -1218,9 +1719,14 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
@@ -492,8 +962,30 @@ index 58fcb8e..99e638c 100644
 +
  static int rs_conn_can_send(struct rsocket *rs)
  {
-       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
-@@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+-      return rs_can_send(rs) || !(rs->state & rs_connect_wr);
++      return rs_can_send(rs) || !(rs->state & rs_writable);
+ }
+ static int rs_conn_can_send_ctrl(struct rsocket *rs)
+@@ -1235,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
+ static int rs_conn_have_rdata(struct rsocket *rs)
+ {
+-      return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
++      return rs_have_rdata(rs) || !(rs->state & rs_readable);
+ }
+ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1338,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+                       rs->rbuf_bytes_avail += rsize;
+               }
+-      } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
++      } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
+       fastlock_release(&rs->rlock);
+       return ret ? ret : len - left;
+@@ -1390,14 +1896,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -502,7 +994,15 @@ index 58fcb8e..99e638c 100644
        while (!dlist_empty(&rs->iomap_queue)) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+-                      if (!(rs->state & rs_connect_wr)) {
++                      if (!(rs->state & rs_writable)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+@@ -1446,10 +1952,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -598,7 +1098,7 @@ index 58fcb8e..99e638c 100644
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2053,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -612,7 +1112,16 @@ index 58fcb8e..99e638c 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1537,10 +1817,21 @@ out:
+@@ -1484,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+-                      if (!(rs->state & rs_connect_wr)) {
++                      if (!(rs->state & rs_writable)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+@@ -1537,10 +2134,21 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
@@ -624,11 +1133,11 @@ index 58fcb8e..99e638c 100644
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
 +                      return ERR(EISCONN);
-+
-+              return rsend(socket, buf, len, flags);
-+      }
  
 -      return rsend(socket, buf, len, flags);
++              return rsend(socket, buf, len, flags);
++      }
++
 +      fastlock_acquire(&rs->slock);
 +      ds_connect(rs, dest_addr, addrlen);
 +      ret = dsend(rs, buf, len, flags);
@@ -637,7 +1146,16 @@ index 58fcb8e..99e638c 100644
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1599,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+-                      if (!(rs->state & rs_connect_wr)) {
++                      if (!(rs->state & rs_writable)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+@@ -1652,7 +2260,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -646,7 +1164,30 @@ index 58fcb8e..99e638c 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -1948,7 +2556,7 @@ int rshutdown(int socket, int how)
+       rs = idm_at(&idm, socket);
+       if (how == SHUT_RD) {
+-              rs->state &= ~rs_connect_rd;
++              rs->state &= ~rs_readable;
+               return 0;
+       }
+@@ -1958,10 +2566,10 @@ int rshutdown(int socket, int how)
+       if (rs->state & rs_connected) {
+               if (how == SHUT_RDWR) {
+                       ctrl = RS_CTRL_DISCONNECT;
+-                      rs->state &= ~(rs_connect_rd | rs_connect_wr);
++                      rs->state &= ~(rs_readable | rs_writable);
+               } else {
+-                      rs->state &= ~rs_connect_wr;
+-                      ctrl = (rs->state & rs_connect_rd) ?
++                      rs->state &= ~rs_writable;
++                      ctrl = (rs->state & rs_readable) ?
+                               RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
+               }
+               if (!rs->ctrl_avail) {
+@@ -2017,8 +2625,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -661,7 +1202,7 @@ index 58fcb8e..99e638c 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2638,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -676,7 +1217,7 @@ index 58fcb8e..99e638c 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,6 +2655,12 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -689,7 +1230,7 @@ index 58fcb8e..99e638c 100644
        switch (level) {
        case SOL_SOCKET:
                opts = &rs->so_opts;
-@@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname,
+@@ -2156,6 +2778,9 @@ int rgetsockopt(int socket, int level, int optname,
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -699,7 +1240,7 @@ index 58fcb8e..99e638c 100644
        switch (level) {
        case SOL_SOCKET:
                switch (optname) {
-@@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2314,7 +2939,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -708,7 +1249,7 @@ index 58fcb8e..99e638c 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +2973,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -717,7 +1258,7 @@ index 58fcb8e..99e638c 100644
        return offset;
  }
  
-@@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +2985,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -726,7 +1267,7 @@ index 58fcb8e..99e638c 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3006,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -735,7 +1276,16 @@ index 58fcb8e..99e638c 100644
        return ret;
  }
  
-@@ -2475,3 +2783,24 @@ out:
+@@ -2425,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+-                      if (!(rs->state & rs_connect_wr)) {
++                      if (!(rs->state & rs_writable)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+@@ -2475,3 +3100,24 @@ out:
  
        return (ret && left == count) ? ret : count - left;
  }
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 97ba24d..0000000
+++ /dev/null
@@ -1,761 +0,0 @@
-Bottom: 97a52629c221cba1033082bbd308ecfc4d4b6082
-Top:    da0048097eea01b21df587e85b3f7ac44a2582c8
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-11-20 23:43:13 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/cma.c b/src/cma.c
-index 91bf108..2c6b032 100755
---- a/src/cma.c
-+++ b/src/cma.c
-@@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
- int ucma_max_qpsize(struct rdma_cm_id *id)
- {
-       struct cma_id_private *id_priv;
-+      int i, max_size = 0;
-       id_priv = container_of(id, struct cma_id_private, id);
--      return id_priv->cma_dev->max_qpsize;
-+      if (id && id_priv->cma_dev) {
-+              max_size = id_priv->cma_dev->max_qpsize;
-+      } else {
-+              for (i = 0; i < cma_dev_cnt; i++) {
-+                      if (!max_size || max_size > cma_dev_array[i].max_qpsize)
-+                              max_size = cma_dev_array[i].max_qpsize;
-+              }
-+      }
-+      return max_size;
- }
- uint16_t ucma_get_port(struct sockaddr *addr)
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 99e638c..0695d12 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -55,7 +55,7 @@
- #define RS_OLAP_START_SIZE 2048
- #define RS_MAX_TRANSFER 65536
--#define RS_SNDLOWAT 64
-+#define RS_SNDLOWAT 2048
- #define RS_QP_MAX_SIZE 0xFFFE
- #define RS_QP_CTRL_SIZE 4
- #define RS_CONN_RETRIES 6
-@@ -63,6 +63,23 @@
- static struct index_map idm;
- static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-+enum {
-+      RS_SVC_INSERT,
-+      RS_SVC_REMOVE
-+};
-+
-+struct rsocket;
-+
-+struct rs_svc_msg {
-+      uint32_t op;
-+      uint32_t status;
-+      struct rsocket *rs;
-+};
-+
-+static pthread_t svc_id;
-+static int svc_cnt;
-+static int svc_fds[2];
-+
- static uint16_t def_iomap_size = 0;
- static uint16_t def_inline = 64;
- static uint16_t def_sqsize = 384;
-@@ -165,9 +182,9 @@ enum rs_state {
-       rs_connecting      = rs_opening |   0x0040,
-       rs_accepting       = rs_opening |   0x0080,
-       rs_connected       =                0x0100,
--      rs_connect_wr      =                0x0200,
--      rs_connect_rd      =                0x0400,
--      rs_connect_rdwr    = rs_connected | rs_connect_rd | rs_connect_wr,
-+      rs_writable        =                0x0200,
-+      rs_readable        =                0x0400,
-+      rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
-       rs_connect_error   =                0x0800,
-       rs_disconnected    =                0x1000,
-       rs_error           =                0x2000,
-@@ -182,18 +199,23 @@ union socket_addr {
- };
- struct ds_qp {
-+      dlist_t           list;
-+      struct rsocket    *rs;
-       struct rdma_cm_id *cm_id;
--      int               rbuf_cnt;
-+
-+      struct ibv_mr     *rmr;
-+      uint8_t           *rbuf;
-+
-+      struct ibv_mr     *smr;
-       uint16_t          lid;
-       uint8_t           sl;
- };
- struct ds_dest {
--      union socket_addr addr;
-+      union socket_addr addr; /* must be first */
-       struct ds_qp      *qp;
-       struct ibv_ah     *ah;
-       uint32_t           qpn;
--      atomic_t           refcnt;
- };
- struct rsocket {
-@@ -223,12 +245,12 @@ struct rsocket {
-               };
-               /* datagram */
-               struct {
--                      struct ds_qp      *qp_array;
-+                      dlist_t           qp_list;
-                       void              *dest_map;
-                       struct ds_dest    *conn_dest;
-                       struct pollfd     *fds;
-                       nfds_t            nfds;
--                      int               fd;
-+                      int               dsock;
-                       int               sbytes_needed;
-               };
-       };
-@@ -290,6 +312,62 @@ struct ds_udp_header {
- };
-+static int rs_svc_run(void *arg)
-+{
-+      return 0;
-+}
-+
-+static int rs_svc_insert(struct rsocket *rs)
-+{
-+      struct rs_svc_msg msg;
-+      int ret;
-+
-+      pthread_mutex_lock(&mut);
-+      if (!svc_cnt) {
-+              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
-+              if (ret)
-+                      goto out;
-+
-+              ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
-+              if (ret) {
-+                      close(svc_fds[0]);
-+                      close(svc_fds[1]);
-+                      ret = ERR(ret);
-+                      goto out;
-+              }
-+      }
-+
-+      msg.op = RS_SVC_INSERT;
-+      msg.status = EINVAL;
-+      msg.rs = rs;
-+      svc_cnt++;
-+      write(svc_fds[0], &msg, sizeof msg);
-+      read(svc_fds[0], &msg, sizeof msg);
-+      ret = ERR(msg.status);
-+out:
-+      pthread_mutex_unlock(&mut);
-+      return ret;
-+}
-+
-+static int rs_svc_remove(struct rsocket *rs)
-+{
-+      struct rs_svc_msg msg;
-+      int ret;
-+
-+      pthread_mutex_lock(&mut);
-+      msg.op = RS_SVC_REMOVE;
-+      msg.status = EINVAL;
-+      msg.rs = rs;
-+      write(svc_fds[0], &msg, sizeof msg);
-+      read(svc_fds[0], &msg, sizeof msg);
-+      ret = ERR(msg.status);
-+      if (!ret && !--svn_cnt)
-+              pthread_join(svc_id, NULL);
-+
-+      pthread_mutex_unlock(&mut);
-+      return ret;
-+}
-+
- static int rs_value_to_scale(int value, int bits)
- {
-       return value <= (1 << (bits - 1)) ?
-@@ -390,7 +468,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
-       rs->type = type;
-       rs->index = -1;
--      rs->fd = -1;
-+      rs->dsock = -1;
-       if (inherited_rs) {
-               rs->sbuf_size = inherited_rs->sbuf_size;
-               rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -418,6 +496,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
-       return rs;
- }
-+/*
-+ * TODO: Support datagram rsockets
-+ */
- static int rs_set_nonblocking(struct rsocket *rs, long arg)
- {
-       int ret = 0;
-@@ -500,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
-       return 0;
- }
--static int rs_create_cq(struct rsocket *rs)
-+static int ds_init_bufs(struct ds_qp *qp)
- {
--      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
--      if (!rs->cm_id->recv_cq_channel)
-+      qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
-+      if (!qp->rbuf)
-+              return ERR(ENOMEM);
-+
-+      qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
-+      if (!qp->smr)
-+              return -1;
-+
-+      qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
-+      if (!qp->rmr)
-               return -1;
--      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
--                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
--      if (!rs->cm_id->recv_cq)
-+      return 0;
-+}
-+
-+static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
-+{
-+      cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
-+      if (!cm_id->recv_cq_channel)
-+              return -1;
-+
-+      cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
-+                                     cm_id, cm_id->recv_cq_channel, 0);
-+      if (!cm_id->recv_cq)
-               goto err1;
-       if (rs->fd_flags & O_NONBLOCK) {
-@@ -516,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
-                       goto err2;
-       }
--      rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
--      rs->cm_id->send_cq = rs->cm_id->recv_cq;
-+      cm_id->send_cq_channel = cm_id->recv_cq_channel;
-+      cm_id->send_cq = cm_id->recv_cq;
-       return 0;
- err2:
--      ibv_destroy_cq(rs->cm_id->recv_cq);
--      rs->cm_id->recv_cq = NULL;
-+      ibv_destroy_cq(cm_id->recv_cq);
-+      cm_id->recv_cq = NULL;
- err1:
--      ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel);
--      rs->cm_id->recv_cq_channel = NULL;
-+      ibv_destroy_comp_channel(cm_id->recv_cq_channel);
-+      cm_id->recv_cq_channel = NULL;
-       return -1;
- }
--static inline int
--rs_post_recv(struct rsocket *rs)
-+static inline int rs_post_recv(struct rsocket *rs)
- {
-       struct ibv_recv_wr wr, *bad;
-@@ -542,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
-       return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
- }
-+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
-+{
-+      struct ibv_recv_wr wr, *bad;
-+      struct ibv_sge sge;
-+
-+      sge.addr = (uintptr_t) buf;
-+      sge.length = RS_SNDLOWAT;
-+      sge.lkey = qp->rmr;
-+
-+      wr.wr_id = RS_RECV_WR_ID;
-+      wr.next = NULL;
-+      wr.sg_list = &sge;
-+      wr.num_sge = 1;
-+
-+      return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
-+}
-+
- static int rs_create_ep(struct rsocket *rs)
- {
-       struct ibv_qp_init_attr qp_attr;
-@@ -552,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
-       if (ret)
-               return ret;
--      ret = rs_create_cq(rs);
-+      ret = rs_create_cq(rs, rs->cm_id);
-       if (ret)
-               return ret;
-@@ -609,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
-       }
- }
-+static void ds_free_qp(struct ds_qp *qp)
-+{
-+      if (qp->smr)
-+              rdma_dereg_mr(qp->smr);
-+
-+      if (qp->rbuf) {
-+              if (qp->rmr)
-+                      rdma_dereg_mr(qp->rmr);
-+              free(qp->rbuf);
-+      }
-+
-+      if (qp->cm_id) {
-+              if (qp->cm_id->qp) {
-+                      dlist_remove(&qp->list);
-+                      rdma_destroy_qp(qp->cm_id);
-+              }
-+              rdma_destroy_id(qp->cm_id);
-+      }
-+      free(qp);
-+}
-+
-+static void ds_free_qps(struct rsocket *rs)
-+{
-+      struct ds_qp *qp;
-+      dlist_t *entry;
-+
-+      while (!dlist_empty(&rs->qp_list)) {
-+              qp = container_of(rs->qp_list.next, struct ds_qp, list);
-+              ds_free_qp(qp);
-+      }
-+}
-+
-+static void ds_free(struct rsocket *rs)
-+{
-+      if (rs->state & (rs_readable | rs_writable))
-+              rs_svc_remove(rs);
-+
-+      if (rs->dsock >= 0)
-+              close(rs->dsock);
-+
-+      if (rs->index >= 0)
-+              rs_remove(rs);
-+
-+      ds_free_qps(rs);
-+      if (rs->fds)
-+              free(rs->fds);
-+
-+      if (rs->sbuf)
-+              free(rs->sbuf);
-+
-+      fastlock_destroy(&rs->map_lock);
-+      fastlock_destroy(&rs->cq_wait_lock);
-+      fastlock_destroy(&rs->cq_lock);
-+      fastlock_destroy(&rs->rlock);
-+      fastlock_destroy(&rs->slock);
-+      free(rs);
-+}
-+
- static void rs_free(struct rsocket *rs)
- {
-+      if (rs->type == SOCK_DGRAM) {
-+              ds_free(rs);
-+              return;
-+      }
-+
-       if (rs->index >= 0)
-               rs_remove(rs);
-@@ -642,11 +819,6 @@ static void rs_free(struct rsocket *rs)
-               rdma_destroy_id(rs->cm_id);
-       }
--      if (rs->fd >= 0)
--              close(rs->fd);
--      if (rs->fds)
--              free(rs->fds);
--
-       fastlock_destroy(&rs->map_lock);
-       fastlock_destroy(&rs->cq_wait_lock);
-       fastlock_destroy(&rs->cq_lock);
-@@ -703,14 +875,15 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
- static int ds_init(struct rsocket *rs, int domain)
- {
--      rs->fd = socket(domain, SOCK_DGRAM, 0);
--      if (rs->fd < 0)
--              return rs->fd;
-+      rs->dsock = socket(domain, SOCK_DGRAM, 0);
-+      if (rs->dsock < 0)
-+              return rs->dsock;
-       rs->fds = calloc(1, sizeof *fds);
-       if (!rs->fds)
-               return ERR(ENOMEM);
-       rs->nfds = 1;
-+      dlist_init(&rs->qp_list);
-       return 0;
- }
-@@ -743,7 +916,7 @@ int rsocket(int domain, int type, int protocol)
-               if (ret)
-                       goto err;
--              index = rs->fd;
-+              index = rs->dsock;
-       }
-       ret = rs_insert(rs, index);
-@@ -768,7 +941,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               if (!ret)
-                       rs->state = rs_bound;
-       } else {
--              ret = bind(rs->fd, addr, addrlen);
-+              ret = bind(rs->dsock, addr, addrlen);
-+              if (!ret) {
-+                      ret = rs_svc_insert(rs);
-+                      if (!ret)
-+                              rs->state = rs_readable | rs_writable;
-+              }
-       }
-       return ret;
- }
-@@ -950,7 +1128,33 @@ connected:
-       return ret;
- }
--static int ds_compare_dest(const void *dst1, const void *dst2)
-+static int ds_init_ep(struct rsocket *rs)
-+{
-+      int ret;
-+
-+      rs_set_qp_size(rs);
-+      if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-+              rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-+      else
-+              rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
-+
-+      rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
-+      if (!rs->sbuf)
-+              return ERR(ENOMEM);
-+
-+      rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
-+      rs->sbuf_bytes_avail = rs->sbuf_size;
-+      rs->sqe_avail = rs->sq_size;
-+
-+      ret = rs_svc_insert(rs);
-+      if (ret)
-+              return ret;
-+
-+      rs->state = rs_readable | rs_writable;
-+      return 0;
-+}
-+
-+static int ds_compare_addr(const void *dst1, const void *dst2)
- {
-       const struct sockaddr *sa1, *sa2;
-       size_t len;
-@@ -963,60 +1167,173 @@ static int ds_compare_dest(const void *dst1, const void *dst2)
-       return memcmp(dst1, dst2, len);
- }
--/* Caller must hold map_lock around accessing source address */
--static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
--                                        socklen_t dst_len, socklen_t *src_len)
-+static int rs_any_addr(const union socket_addr *addr)
-+{
-+      if (addr->sa.sa_family == AF_INET) {
-+              return (addr->sin.sin_addr == INADDR_ANY ||
-+                      addr->sin.sin_addr == INADDR_LOOPBACK);
-+      } else {
-+              return (addr->sin6.sin6_addr == in6addr_any ||
-+                      addr->sin6.sin6_addr == in6addr_loopback);
-+      }
-+}
-+
-+static int ds_get_src_addr(struct rsocket *rs,
-+                         const struct sockaddr *dest_addr, socklen_t dest_len,
-+                         union socket_addr *src_addr, socklen_t *src_len)
- {
--      static union socket_addr src_addr;
-       int sock, ret;
-+      uint16_t port;
--      sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
-+      *src_len = sizeof src_addr;
-+      ret = getsockname(rs->dsock, &src_addr->sa, src_len);
-+      if (ret || !rs_any_addr(src_addr))
-+              return ret;
-+
-+      port = src_addr->sin.sin_port;
-+      sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
-       if (sock < 0)
--              return NULL;
-+              return sock;
--      ret = connect(sock, &dst_addr->sa, dst_len);
-+      ret = connect(sock, dest_addr, dest_len);
-       if (ret)
-               goto out;
-       *src_len = sizeof src_addr;
--      ret = getsockname(sock, &src_addr.sa, src_len);
--
-+      ret = getsockname(sock, &src_addr->sa, src_len);
-+      src_addr->sin.sin_port = port;
- out:
-       close(sock);
--      return ret ? NULL : &src_addr;
-+      return ret;
-+}
-+
-+static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-+                      socklen_t addrlen, struct ds_qp **qp)
-+{
-+      struct ibv_qp_init_attr qp_attr;
-+      int ret;
-+
-+      *qp = calloc(1, sizeof(struct ds_qp));
-+      if (!*qp)
-+              return ERR(ENOMEM);
-+
-+      (*qp)->rs = rs;
-+      ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
-+      if (ret)
-+              goto err;
-+
-+      ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
-+      if (ret)
-+              goto err;
-+
-+      ret = ds_init_bufs(*qp);
-+      if (ret)
-+              goto err;
-+
-+      ret = rs_create_cq(rs, (*qp)->cm_id);
-+      if (ret)
-+              goto err;
-+
-+      memset(&qp_attr, 0, sizeof qp_attr);
-+      qp_attr.qp_context = qp;
-+      qp_attr.send_cq = rs->cm_id->send_cq;
-+      qp_attr.recv_cq = rs->cm_id->recv_cq;
-+      qp_attr.qp_type = IBV_QPT_UD;
-+      qp_attr.sq_sig_all = 1;
-+      qp_attr.cap.max_send_wr = rs->sq_size;
-+      qp_attr.cap.max_recv_wr = rs->rq_size;
-+      qp_attr.cap.max_send_sge = 2;
-+      qp_attr.cap.max_recv_sge = 1;
-+      qp_attr.cap.max_inline_data = rs->sq_inline;
-+
-+      ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
-+      if (ret)
-+              return ret;
-+
-+      for (i = 0; i < rs->rq_size; i++) {
-+              ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
-+              if (ret)
-+                      goto err;
-+      }
-+      list_insert_head(&(*qp)->list, &rs->qp_list);
-+      return 0;
-+err:
-+      ds_free_qp(*qp);
-+      return ret;
- }
--static struct ds_qp *ds_get_qp(struct rsocket *rs,
--                             const struct sockaddr *src_addr, socklen_t addrlen)
-+static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
-+                   socklen_t addrlen, struct ds_qp **qp)
- {
--      union socket_addr *addr;
--      socklen_t len;
-+      dlist_t *entry;
-+      for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
-+              *qp = container_of(entry, struct ds_qp, list);
-+              if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-+                      return 0;
-+      }
-+
-+      return ds_create_qp(rs, src_addr, addrlen, qp);
- }
--static int ds_connect(struct rsocket *rs,
--                    const struct sockaddr *dest_addr, socklen_t addrlen)
-+static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
-+                     socklen_t addrlen, struct ds_dest **dest)
- {
--      struct ds_dest **dest;
-+      union socket_addr src_addr;
-+      socklen_t src_len;
-+      struct ds_qp *qp;
-+      int ret = 0;
-       fastlock_acquire(&rs->map_lock);
--      dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
--      if (dest) {
--              rs->conn_dest = *dest;
-+      dest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+      if (dest)
-+              goto out;
-+
-+      if (rs->state == rs_init) {
-+              ret = ds_init_ep(rs);
-+              if (ret)
-+                      goto out;
-+      }
-+
-+      ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+      if (ret)
-+              goto out;
-+
-+      ret = ds_get_qp(rs, src_addr, src_len, &qp);
-+      if (ret)
-+              goto out;
-+
-+      *dest = calloc(1, sizeof(struct ds_dest));
-+      if (!*dest) {
-+              ret = ERR(ENOMEM);
-               goto out;
-       }
-+
-+      memcpy(&(*dest)->addr, addr, addrlen);
-+      (*dest)->qp = qp;
-+      tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
- out:
-       fastlock_release(&rs->map_lock);
--      return 0;
-+      return ret;
- }
- int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- {
-       struct rsocket *rs;
-+      int ret;
-       rs = idm_at(&idm, socket);
--      memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
--      return rs_do_connect(rs);
-+      if (rs->type == SOCK_STREAM) {
-+              memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
-+              ret = rs_do_connect(rs);
-+      } else {
-+              fastlock_acquire(&rs->slock);
-+              ret = connect(rs->dsock, addr, addrlen);
-+              if (!ret)
-+                      ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+              fastlock_release(&rs->slock);
-+      }
-+      return ret;
- }
- static int rs_post_write_msg(struct rsocket *rs,
-@@ -1229,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
-                                       rs->state = rs_disconnected;
-                                       return 0;
-                               } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
--                                      rs->state &= ~rs_connect_rd;
-+                                      rs->state &= ~rs_readable;
-                               }
-                               break;
-                       case RS_OP_WRITE:
-@@ -1409,7 +1726,7 @@ static int ds_can_send(struct rsocket *rs)
- static int rs_conn_can_send(struct rsocket *rs)
- {
--      return rs_can_send(rs) || !(rs->state & rs_connect_wr);
-+      return rs_can_send(rs) || !(rs->state & rs_writable);
- }
- static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1424,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
- static int rs_conn_have_rdata(struct rsocket *rs)
- {
--      return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
-+      return rs_have_rdata(rs) || !(rs->state & rs_readable);
- }
- static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1527,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
-                       rs->rbuf_bytes_avail += rsize;
-               }
--      } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
-+      } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
-       fastlock_release(&rs->rlock);
-       return ret ? ret : len - left;
-@@ -1586,7 +1903,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
--                      if (!(rs->state & rs_connect_wr)) {
-+                      if (!(rs->state & rs_writable)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-@@ -1764,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
--                      if (!(rs->state & rs_connect_wr)) {
-+                      if (!(rs->state & rs_writable)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-@@ -1890,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
--                      if (!(rs->state & rs_connect_wr)) {
-+                      if (!(rs->state & rs_writable)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }
-@@ -2239,7 +2556,7 @@ int rshutdown(int socket, int how)
-       rs = idm_at(&idm, socket);
-       if (how == SHUT_RD) {
--              rs->state &= ~rs_connect_rd;
-+              rs->state &= ~rs_readable;
-               return 0;
-       }
-@@ -2249,10 +2566,10 @@ int rshutdown(int socket, int how)
-       if (rs->state & rs_connected) {
-               if (how == SHUT_RDWR) {
-                       ctrl = RS_CTRL_DISCONNECT;
--                      rs->state &= ~(rs_connect_rd | rs_connect_wr);
-+                      rs->state &= ~(rs_readable | rs_writable);
-               } else {
--                      rs->state &= ~rs_connect_wr;
--                      ctrl = (rs->state & rs_connect_rd) ?
-+                      rs->state &= ~rs_writable;
-+                      ctrl = (rs->state & rs_readable) ?
-                               RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
-               }
-               if (!rs->ctrl_avail) {
-@@ -2733,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
-                                         rs_conn_can_send);
-                       if (ret)
-                               break;
--                      if (!(rs->state & rs_connect_wr)) {
-+                      if (!(rs->state & rs_writable)) {
-                               ret = ERR(ECONNRESET);
-                               break;
-                       }