]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Mon, 3 Dec 2012 19:22:59 +0000 (11:22 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 3 Dec 2012 19:22:59 +0000 (11:22 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index c20a978ae4943383aceabfcf450aa5e38cb749dd..354cd60ee14bec9839732af10f4ad0a6129ad7eb 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: b8e820f263f0051ab7f56a7bee69f54cd5bed8c8
-Head: a47ef338677a222cd8301615f95ed046b4fbc8c8
+Previous: 5bbff4033a907b285250c91299e6ba370d016ff3
+Head: f8e4ababdee71863405d63f568dd9fa5fcf3ba7d
 Applied:
-  dsocket: b43272823d9e4e48a7daea9bafb1345043d615cf
-  refresh-temp: a47ef338677a222cd8301615f95ed046b4fbc8c8
+  dsocket: f8e4ababdee71863405d63f568dd9fa5fcf3ba7d
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
index afced889e61eb702f12376d7ebedb59efe691760..ddbba1bbb943c292b6410b23f2faace60f18b161 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top:    0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
+Top:    bd15dfc9adaf449efbd6fcccaac92ea3ed7ad81b
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -86,7 +86,7 @@ index 91bf108..2c6b032 100755
  
  uint16_t ucma_get_port(struct sockaddr *addr)
 diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..d7c3163 100644
+index 58fcb8e..07cf31d 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
 @@ -46,6 +46,7 @@
@@ -106,7 +106,7 @@ index 58fcb8e..d7c3163 100644
  #define RS_QP_MAX_SIZE 0xFFFE
  #define RS_QP_CTRL_SIZE 4
  #define RS_CONN_RETRIES 6
-@@ -63,6 +64,23 @@
+@@ -63,6 +64,26 @@
  static struct index_map idm;
  static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
  
@@ -124,13 +124,16 @@ index 58fcb8e..d7c3163 100644
 +};
 +
 +static pthread_t svc_id;
++static int svc_sock[2];
 +static int svc_cnt;
-+static int svc_fds[2];
++static int svc_size;
++static struct rsocket **svc_rss;
++static struct pollfd *svc_fds;
 +
  static uint16_t def_iomap_size = 0;
  static uint16_t def_inline = 64;
  static uint16_t def_sqsize = 384;
-@@ -99,6 +117,14 @@ enum {
+@@ -99,6 +120,14 @@ enum {
  #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
  #define rs_msg_op(imm_data)   (imm_data >> 29)
  #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
@@ -145,7 +148,7 @@ index 58fcb8e..d7c3163 100644
  
  enum {
        RS_CTRL_DISCONNECT,
-@@ -110,6 +136,18 @@ struct rs_msg {
+@@ -110,6 +139,18 @@ struct rs_msg {
        uint32_t data;
  };
  
@@ -164,7 +167,7 @@ index 58fcb8e..d7c3163 100644
  struct rs_sge {
        uint64_t addr;
        uint32_t key;
-@@ -144,8 +182,6 @@ struct rs_conn_data {
+@@ -144,8 +185,6 @@ struct rs_conn_data {
        struct rs_sge     data_buf;
  };
  
@@ -173,7 +176,7 @@ index 58fcb8e..d7c3163 100644
  /*
   * rsocket states are ordered as passive, connecting, connected, disconnected.
   */
-@@ -159,9 +195,9 @@ enum rs_state {
+@@ -159,9 +198,9 @@ enum rs_state {
        rs_connecting      = rs_opening |   0x0040,
        rs_accepting       = rs_opening |   0x0080,
        rs_connected       =                0x0100,
@@ -186,7 +189,7 @@ index 58fcb8e..d7c3163 100644
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
-@@ -169,68 +205,219 @@ enum rs_state {
+@@ -169,68 +208,349 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -213,11 +216,19 @@ index 58fcb8e..d7c3163 100644
 +#define DS_IPV4_HDR_LEN  8
 +#define DS_IPV6_HDR_LEN 24
 +
++struct ds_dest {
++      union socket_addr addr; /* must be first */
++      struct ds_qp      *qp;
++      struct ibv_ah     *ah;
++      uint32_t           qpn;
++};
++
 +struct ds_qp {
 +      dlist_t           list;
 +      struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
 +      struct ds_header  hdr;
++      struct ds_dest    dest;
 +
 +      struct ibv_mr     *smr;
 +      struct ibv_mr     *rmr;
@@ -226,13 +237,6 @@ index 58fcb8e..d7c3163 100644
 +      int               cq_armed;
 +};
 +
-+struct ds_dest {
-+      union socket_addr addr; /* must be first */
-+      struct ds_qp      *qp;
-+      struct ibv_ah     *ah;
-+      uint32_t           qpn;
-+};
-+
 +struct rsocket {
 +      int               type;
 +      int               index;
@@ -345,17 +349,25 @@ index 58fcb8e..d7c3163 100644
 -      uint32_t          rbuf_size;
 -      struct ibv_mr    *rmr;
 -      uint8_t           *rbuf;
-+#define DS_UDP_TAG 0x5555555555555555ULL
++#define DS_UDP_TAG 0x55555555
  
 -      uint32_t          sbuf_size;
 -      struct ibv_mr    *smr;
 -      struct ibv_sge    ssgl[2];
 -      uint8_t           *sbuf;
 +struct ds_udp_header {
-+      uint64_t          tag;
++      uint32_t          tag;
 +      uint8_t           version;
-+      uint8_t           reserved[3];
-+      uint32_t          qpn;  /* upper 8-bits reserved */
++      uint8_t           op;
++      uint8_t           length;
++      uint8_t           reserved;
++      uint32_t          qpn;  /* lower 8-bits reserved */
++      union {
++              uint32_t ipv4;
++              struct {
++                      uint8_t  addr[16];
++              } ipv6;
++      } addr;
  };
  
 +#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
@@ -379,8 +391,118 @@ index 58fcb8e..d7c3163 100644
 +      }
 +}
 +
++static int rs_svc_grow_sets(void)
++{
++      struct rsocket **rss;
++      struct pollfd *fds;
++      void *set;
++
++      set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
++      if (!set)
++              return ENOMEM;
++
++      svc_size += 2;
++      rss = set;
++      fds = set + sizeof(*rss) * svc_size;
++      if (svc_cnt) {
++              memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
++              memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
++      }
++
++      free(svc_rss);
++      free(svc_fds);
++      svc_rss = rss;
++      svc_fds = fds;
++      return 0;
++}
++
++/*
++ * Index 0 is reserved for the service's communication socket.
++ */
++static int rs_svc_add_rs(struct rsocket *rs)
++{
++      int ret;
++
++      if (svc_cnt >= svc_size - 1) {
++              ret = rs_svc_grow_sets();
++              if (ret)
++                      return ret;
++      }
++
++      svc_rss[++svc_cnt] = rs;
++      svc_fds[svc_cnt].fd = rs->udp_sock;
++      svc_fds[svc_cnt].events = POLLIN;
++      svc_fds[svc_cnt].revents = 0;
++      return 0;
++}
++
++static int rs_svc_rm_rs(struct rsocket *rs)
++{
++      int i;
++
++      for (i = 1; i <= svc_cnt; i++) {
++              if (svc_rss[i] == rs) {
++                      svc_cnt--;
++                      svc_rss[i] = svc_rss[svc_cnt];
++                      svc_fds[i] = svc_fds[svc_cnt];
++                      return 0;
++              }
++      }
++      return EBADF;
++}
++
++static void rs_svc_process_sock(void)
++{
++      struct rs_svc_msg msg;
++
++      read(svc_sock[1], &msg, sizeof msg);
++      switch (msg.op) {
++      case RS_SVC_INSERT:
++              msg.status = rs_svc_add_rs(msg.rs);
++              break;
++      case RS_SVC_REMOVE:
++              msg.status = rs_svc_rm_rs(msg.rs);
++              break;
++      default:
++              msg.status = ENOTSUP;
++              break;
++      }
++      write(svc_sock[1], &msg, sizeof msg);
++}
++
++static void rs_svc_process_rs(struct rsocket *rs)
++{
++
++}
++
 +static int rs_svc_run(void *arg)
 +{
++      struct rs_svc_msg msg;
++      int i, ret;
++
++      ret = rs_svc_grow_sets();
++      if (ret) {
++              msg.status = ret;
++              write(svc_sock[1] &msg, sizeof msg);
++              return ret;
++      }
++
++      svc_fds[0].fd = svc_sock[1];
++      svc_fds[0].events = POLLIN;
++      do {
++              for (i = 0; i <= svc_cnt; i++)
++                      svc_fds[i].revents = 0;
++
++              poll(svc_fds, svc_cnt + 1, -1);
++              if (svc_fds[0].revents)
++                      rs_svc_process_sock();
++
++              for (i = 1; i <= svc_cnt; i++) {
++                      if (svc_fds[i].revents)
++                              rs_svc_process_rs(svc_rss[i]);
++              }
++      } while (svc_cnt > 1);
++
 +      return 0;
 +}
 +
@@ -391,27 +513,35 @@ index 58fcb8e..d7c3163 100644
 +
 +      pthread_mutex_lock(&mut);
 +      if (!svc_cnt) {
-+              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
++              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
 +              if (ret)
-+                      goto out;
++                      goto err1;
 +
 +              ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
 +              if (ret) {
-+                      close(svc_fds[0]);
-+                      close(svc_fds[1]);
 +                      ret = ERR(ret);
-+                      goto out;
++                      goto err2;
 +              }
 +      }
 +
 +      msg.op = RS_SVC_INSERT;
 +      msg.status = EINVAL;
 +      msg.rs = rs;
-+      svc_cnt++;
-+      write(svc_fds[0], &msg, sizeof msg);
-+      read(svc_fds[0], &msg, sizeof msg);
++      write(svc_sock[0], &msg, sizeof msg);
++      read(svc_sock[0], &msg, sizeof msg);
 +      ret = ERR(msg.status);
-+out:
++      if (ret && !svn_cnt)
++              goto err3;
++
++      pthread_mutex_unlock(&mut);
++      return ret;
++
++err3:
++      pthread_join(svc_id, NULL);
++err2:
++      close(svc_sock[0]);
++      close(svc_sock[1]);
++err1:
 +      pthread_mutex_unlock(&mut);
 +      return ret;
 +}
@@ -425,11 +555,14 @@ index 58fcb8e..d7c3163 100644
 +      msg.op = RS_SVC_REMOVE;
 +      msg.status = EINVAL;
 +      msg.rs = rs;
-+      write(svc_fds[0], &msg, sizeof msg);
-+      read(svc_fds[0], &msg, sizeof msg);
++      write(svc_sock[0], &msg, sizeof msg);
++      read(svc_sock[0], &msg, sizeof msg);
 +      ret = ERR(msg.status);
-+      if (!ret && !--svn_cnt)
++      if (!svn_cnt) {
 +              pthread_join(svc_id, NULL);
++              close(svc_sock[0]);
++              close(svc_sock[1]);
++      }
 +
 +      pthread_mutex_unlock(&mut);
 +      return ret;
@@ -438,7 +571,7 @@ index 58fcb8e..d7c3163 100644
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -306,10 +493,10 @@ out:
+@@ -306,10 +626,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -451,7 +584,7 @@ index 58fcb8e..d7c3163 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -321,7 +508,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +641,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -460,7 +593,7 @@ index 58fcb8e..d7c3163 100644
  {
        struct rsocket *rs;
  
-@@ -329,7 +516,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +649,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
@@ -472,7 +605,7 @@ index 58fcb8e..d7c3163 100644
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +542,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,7 +675,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
@@ -481,7 +614,7 @@ index 58fcb8e..d7c3163 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
-@@ -359,13 +550,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -359,13 +683,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
  
  static int rs_set_nonblocking(struct rsocket *rs, long arg)
  {
@@ -513,7 +646,7 @@ index 58fcb8e..d7c3163 100644
  
        return ret;
  }
-@@ -389,17 +594,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -389,17 +727,39 @@ static void rs_set_qp_size(struct rsocket *rs)
                rs->rq_size = 2;
  }
  
@@ -555,7 +688,7 @@ index 58fcb8e..d7c3163 100644
  
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
-@@ -409,7 +636,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -409,7 +769,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
@@ -564,7 +697,7 @@ index 58fcb8e..d7c3163 100644
  
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
-@@ -422,7 +649,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -422,7 +782,7 @@ static int rs_init_bufs(struct rsocket *rs)
  
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
@@ -573,7 +706,7 @@ index 58fcb8e..d7c3163 100644
  
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
-@@ -439,15 +666,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -439,15 +799,32 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
  }
  
@@ -592,11 +725,8 @@ index 58fcb8e..d7c3163 100644
 +
 +      qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
 +      if (!qp->rmr)
-               return -1;
--      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
--                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
--      if (!rs->cm_id->recv_cq)
++              return -1;
++
 +      return 0;
 +}
 +
@@ -604,15 +734,18 @@ index 58fcb8e..d7c3163 100644
 +{
 +      cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
 +      if (!cm_id->recv_cq_channel)
-+              return -1;
-+
+               return -1;
+-      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+-                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+-      if (!rs->cm_id->recv_cq)
 +      cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
 +                                     cm_id, cm_id->recv_cq_channel, 0);
 +      if (!cm_id->recv_cq)
                goto err1;
  
        if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +699,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -455,21 +832,20 @@ static int rs_create_cq(struct rsocket *rs)
                        goto err2;
        }
  
@@ -641,7 +774,7 @@ index 58fcb8e..d7c3163 100644
  {
        struct ibv_recv_wr wr, *bad;
  
-@@ -481,6 +724,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -481,6 +857,23 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -665,7 +798,7 @@ index 58fcb8e..d7c3163 100644
  static int rs_create_ep(struct rsocket *rs)
  {
        struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +751,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -491,7 +884,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
  
@@ -674,7 +807,7 @@ index 58fcb8e..d7c3163 100644
        if (ret)
                return ret;
  
-@@ -548,8 +808,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -548,8 +941,76 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
  }
  
@@ -691,12 +824,14 @@ index 58fcb8e..d7c3163 100644
 +
 +      if (qp->cm_id) {
 +              if (qp->cm_id->qp) {
++                      tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
 +                      epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
 +                                qp->cm_id->recv_cq_channel->fd, NULL);
 +                      rdma_destroy_qp(qp->cm_id);
 +              }
 +              rdma_destroy_id(qp->cm_id);
 +      }
++
 +      free(qp);
 +}
 +
@@ -730,6 +865,7 @@ index 58fcb8e..d7c3163 100644
 +      if (rs->sbuf)
 +              free(rs->sbuf);
 +
++      tdestroy(rs->dest_map, free);
 +      fastlock_destroy(&rs->map_lock);
 +      fastlock_destroy(&rs->cq_wait_lock);
 +      fastlock_destroy(&rs->cq_lock);
@@ -748,7 +884,7 @@ index 58fcb8e..d7c3163 100644
        if (rs->index >= 0)
                rs_remove(rs);
  
-@@ -581,7 +906,7 @@ static void rs_free(struct rsocket *rs)
+@@ -581,7 +1042,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
  
@@ -757,7 +893,7 @@ index 58fcb8e..d7c3163 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -635,29 +960,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +1096,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
@@ -820,7 +956,7 @@ index 58fcb8e..d7c3163 100644
        return rs->index;
  
  err:
-@@ -671,9 +1021,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +1157,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -842,7 +978,7 @@ index 58fcb8e..d7c3163 100644
        return ret;
  }
  
-@@ -709,7 +1068,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +1204,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -851,7 +987,7 @@ index 58fcb8e..d7c3163 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -717,7 +1076,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +1212,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -860,10 +996,45 @@ index 58fcb8e..d7c3163 100644
        if (ret < 0)
                goto err;
  
-@@ -854,13 +1213,248 @@ connected:
-       return ret;
- }
+@@ -825,42 +1320,309 @@ connected:
+                       break;
              }
  
+-              rs_save_conn_data(rs, cresp);
+-              rs->state = rs_connect_rdwr;
+-              break;
+-      case rs_accepting:
+-              if (!(rs->fd_flags & O_NONBLOCK))
+-                      rs_set_nonblocking(rs, 0);
++              rs_save_conn_data(rs, cresp);
++              rs->state = rs_connect_rdwr;
++              break;
++      case rs_accepting:
++              if (!(rs->fd_flags & O_NONBLOCK))
++                      rs_set_nonblocking(rs, 0);
++
++              ret = ucma_complete(rs->cm_id);
++              if (ret)
++                      break;
++
++              rs->state = rs_connect_rdwr;
++              break;
++      default:
++              ret = ERR(EINVAL);
++              break;
++      }
++
++      if (ret) {
++              if (errno == EAGAIN || errno == EWOULDBLOCK) {
++                      errno = EINPROGRESS;
++              } else {
++                      rs->state = rs_connect_error;
++                      rs->err = errno;
++              }
++      }
++      return ret;
++}
++
 +static int ds_init_ep(struct rsocket *rs)
 +{
 +      struct ds_smsg *msg;
@@ -968,6 +1139,32 @@ index 58fcb8e..d7c3163 100644
 +      }
 +}
 +
++static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
++                        socklen_t addrlen)
++{
++      struct ibv_port_attr port_attr;
++      struct ibv_ah_attr attr;
++      int ret;
++
++      memcpy(&qp->dest.addr, addr, addrlen);
++      qp->dest.qp = qp;
++      qp->dest.qpn = qp->cm_id->qp->qp_num;
++
++      ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
++      if (ret)
++              return ret;
++
++      memset(&attr, 0, sizeof attr);
++      attr.dlid = port_attr.lid;
++      attr.port_num = qp->cm_id->port_num;
++      qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
++      if (!qp->dest.ah)
++              return ERR(ENOMEM);
++
++      tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
++      return 0;
++}
++
 +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
 +                      socklen_t addrlen, struct ds_qp **qp)
 +{
@@ -1012,7 +1209,11 @@ index 58fcb8e..d7c3163 100644
 +      if (ret)
 +              goto err;
 +
-+      event.events = EPOLLIN | EPOLLOUT;
++      ret = ds_add_qp_dest(*qp, src_addr, addrlen);
++      if (ret)
++              goto err;
++
++      event.events = EPOLLIN;
 +      event.data.ptr = *qp;
 +      ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
 +                      (*qp)->cm_id->recv_cq_channel->fd, &event);
@@ -1055,18 +1256,32 @@ index 58fcb8e..d7c3163 100644
 +      socklen_t src_len;
 +      struct ds_qp *qp;
 +      int ret = 0;
-+
+-              ret = ucma_complete(rs->cm_id);
+-              if (ret)
+-                      break;
 +      fastlock_acquire(&rs->map_lock);
 +      dest = tfind(addr, &rs->dest_map, ds_compare_addr);
 +      if (dest)
 +              goto out;
-+
+-              rs->state = rs_connect_rdwr;
+-              break;
+-      default:
+-              ret = ERR(EINVAL);
+-              break;
 +      if (rs->state == rs_init) {
 +              ret = ds_init_ep(rs);
 +              if (ret)
 +                      goto out;
-+      }
-+
+       }
+-      if (ret) {
+-              if (errno == EAGAIN || errno == EWOULDBLOCK) {
+-                      errno = EINPROGRESS;
+-              } else {
+-                      rs->state = rs_connect_error;
+-                      rs->err = errno;
 +      ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
 +      if (ret)
 +              goto out;
@@ -1075,20 +1290,22 @@ index 58fcb8e..d7c3163 100644
 +      if (ret)
 +              goto out;
 +
-+      *dest = calloc(1, sizeof(struct ds_dest));
-+      if (!*dest) {
-+              ret = ERR(ENOMEM);
-+              goto out;
-+      }
++      if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
++              *dest = calloc(1, sizeof(struct ds_dest));
++              if (!*dest) {
++                      ret = ERR(ENOMEM);
++                      goto out;
+               }
 +
-+      memcpy(&(*dest)->addr, addr, addrlen);
-+      (*dest)->qp = qp;
-+      tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
++              memcpy(&(*dest)->addr, addr, addrlen);
++              (*dest)->qp = qp;
++              tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+       }
 +out:
 +      fastlock_release(&rs->map_lock);
-+      return ret;
-+}
-+
+       return ret;
+ }
  int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
  {
        struct rsocket *rs;
@@ -1111,7 +1328,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1496,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -902,6 +1664,24 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -1136,7 +1353,7 @@ index 58fcb8e..d7c3163 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -1045,7 +1657,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1045,7 +1825,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
@@ -1145,7 +1362,7 @@ index 58fcb8e..d7c3163 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1187,6 +1799,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1187,6 +1967,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
        return ret;
  }
  
@@ -1311,7 +1528,7 @@ index 58fcb8e..d7c3163 100644
  static int rs_nonblocking(struct rsocket *rs, int flags)
  {
        return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1218,9 +1989,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1218,9 +2157,19 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
@@ -1332,7 +1549,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +2016,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1235,7 +2184,7 @@ static int rs_have_rdata(struct rsocket *rs)
  
  static int rs_conn_have_rdata(struct rsocket *rs)
  {
@@ -1341,7 +1558,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1244,6 +2025,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1244,6 +2193,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
               !(rs->state & rs_connected);
  }
  
@@ -1408,7 +1625,7 @@ index 58fcb8e..d7c3163 100644
  static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
  {
        size_t left = len;
-@@ -1289,6 +2130,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1289,6 +2298,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -1422,7 +1639,7 @@ index 58fcb8e..d7c3163 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1338,7 +2186,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1338,7 +2354,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
  
@@ -1431,7 +1648,7 @@ index 58fcb8e..d7c3163 100644
  
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
-@@ -1349,6 +2197,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
+@@ -1349,6 +2365,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
  {
        int ret;
  
@@ -1446,7 +1663,7 @@ index 58fcb8e..d7c3163 100644
        ret = rrecv(socket, buf, len, flags);
        if (ret > 0 && src_addr)
                rgetpeername(socket, src_addr, addrlen);
-@@ -1390,14 +2246,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1390,14 +2414,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -1463,7 +1680,7 @@ index 58fcb8e..d7c3163 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1446,10 +2302,75 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1446,10 +2470,81 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1485,6 +1702,7 @@ index 58fcb8e..d7c3163 100644
 +
 +      hdr.tag = htonll(DS_UDP_TAG);
 +      hdr.version = 1;
++      hdr.op = op;
 +      memset(&hdr->reserved, 0, sizeof hdr->reserved);
 +      hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
 +
@@ -1495,15 +1713,20 @@ index 58fcb8e..d7c3163 100644
 +      memset(&msg, 0, sizeof msg);
 +      msg.msg_iov = miov;
 +      msg.msg_iovlen = iovcnt + 1;
-+      return sendmsg(rs->fd, msg, flags);
++      return sendmsg(rs->udp_sock, msg, flags);
 +}
 +
-+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
++static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
++                         int flags, uint8_t op)
 +{
 +      struct iovec iov;
-+      iov.iov_base = buf;
-+      iov_iov_len = len;
-+      return ds_sendv_udp(s, &iov, 1, flags);
++      if (buf && len) {
++              iov.iov_base = buf;
++              iov_iov_len = len;
++              return ds_sendv_udp(rs, &iov, 1, flags, op);
++      } else {
++              return ds_sendv_udp(rs, NULL, 0, flags, op);
++      }
 +}
 +
 +static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
@@ -1514,7 +1737,7 @@ index 58fcb8e..d7c3163 100644
 +      int flags, ret = 0;
 +
 +      if (!rs->conn_dest->ah)
-+              return ds_send_udp(rs, buf, len, flags);
++              return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
 +
 +      if (!ds_can_send(rs)) {
 +              ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
@@ -1540,7 +1763,7 @@ index 58fcb8e..d7c3163 100644
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1463,6 +2384,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2558,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1554,7 +1777,7 @@ index 58fcb8e..d7c3163 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1484,7 +2412,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1484,7 +2586,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1563,20 +1786,20 @@ index 58fcb8e..d7c3163 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1537,10 +2465,26 @@ out:
+@@ -1537,10 +2639,26 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
 -      if (dest_addr || addrlen)
 -              return ERR(EISCONN);
 +      struct rsocket *rs;
--      return rsend(socket, buf, len, flags);
++
 +      rs = idm_at(&idm, socket);
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
 +                      return ERR(EISCONN);
-+
+-      return rsend(socket, buf, len, flags);
 +              return rsend(socket, buf, len, flags);
 +      }
 +
@@ -1593,7 +1816,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2543,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1599,7 +2717,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1602,7 +1825,7 @@ index 58fcb8e..d7c3163 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1652,7 +2596,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1652,7 +2770,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1611,7 +1834,67 @@ index 58fcb8e..d7c3163 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1948,7 +2892,7 @@ int rshutdown(int socket, int how)
+@@ -1689,8 +2807,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+       int ret;
+ check_cq:
+-      if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
+-          (rs->state & rs_error)) {
++      if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
++           (rs->state == rs_disconnected) || (rs->state & rs_error))) {
+               rs_process_cq(rs, nonblock, test);
+               revents = 0;
+@@ -1706,6 +2824,16 @@ check_cq:
+               }
+               return revents;
++      } else if (rs->type == SOCK_DGRAM) {
++              ds_process_cqs(rs, nonblock, test);
++
++              revents = 0;
++              if ((events & POLLIN) && rs_have_rdata(rs))
++                      revents |= POLLIN;
++              if ((events & POLLOUT) && ds_can_send(rs))
++                      revents |= POLLOUT;
++
++              return revents;
+       }
+       if (rs->state == rs_listening) {
+@@ -1765,11 +2893,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+                       if (fds[i].revents)
+                               return 1;
+-                      if (rs->state >= rs_connected)
+-                              rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
+-                      else
+-                              rfds[i].fd = rs->cm_id->channel->fd;
+-
++                      if (rs->type == SOCK_STREAM) {
++                              if (rs->state >= rs_connected)
++                                      rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
++                              else
++                                      rfds[i].fd = rs->cm_id->channel->fd;
++                      } else {
++                              rfds[i].fd = rs->epfd;
++                      }
+                       rfds[i].events = POLLIN;
+               } else {
+                       rfds[i].fd = fds[i].fd;
+@@ -1792,7 +2923,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+               rs = idm_lookup(&idm, fds[i].fd);
+               if (rs) {
+-                      rs_get_cq_event(rs);
++                      if (rs->type == SOCK_STREAM)
++                              rs_get_cq_event(rs);
++                      else
++                              ds_get_cq_event(rs);
+                       fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
+               } else {
+                       fds[i].revents = rfds[i].revents;
+@@ -1948,7 +3082,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1620,7 +1903,7 @@ index 58fcb8e..d7c3163 100644
                return 0;
        }
  
-@@ -1958,10 +2902,10 @@ int rshutdown(int socket, int how)
+@@ -1958,10 +3092,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -1634,7 +1917,7 @@ index 58fcb8e..d7c3163 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -1986,13 +2930,31 @@ int rshutdown(int socket, int how)
+@@ -1986,13 +3120,31 @@ int rshutdown(int socket, int how)
        return 0;
  }
  
@@ -1668,7 +1951,7 @@ index 58fcb8e..d7c3163 100644
  
        rs_free(rs);
        return 0;
-@@ -2017,8 +2979,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2017,8 +3169,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1683,7 +1966,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2992,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +3182,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1698,7 +1981,7 @@ index 58fcb8e..d7c3163 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2039,18 +3009,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,18 +3199,26 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -1732,7 +2015,7 @@ index 58fcb8e..d7c3163 100644
                        opt_on = *(int *) optval;
                        break;
                case SO_RCVBUF:
-@@ -2100,9 +3078,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2100,9 +3268,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
@@ -1747,7 +2030,7 @@ index 58fcb8e..d7c3163 100644
                        opt_on = *(int *) optval;
                        break;
                default:
-@@ -2314,7 +3294,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2314,7 +3484,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -1756,7 +2039,7 @@ index 58fcb8e..d7c3163 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +3328,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +3518,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -1765,7 +2048,7 @@ index 58fcb8e..d7c3163 100644
        return offset;
  }
  
-@@ -2360,7 +3340,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +3530,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1774,7 +2057,7 @@ index 58fcb8e..d7c3163 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2381,7 +3361,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3551,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -1783,7 +2066,7 @@ index 58fcb8e..d7c3163 100644
        return ret;
  }
  
-@@ -2425,7 +3405,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2425,7 +3595,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 8a59617..0000000
+++ /dev/null
@@ -1,466 +0,0 @@
-Bottom: 0ddd3b5ffa7ef87e8179ff9779cbd2f76c69ac89
-Top:    bd15dfc9adaf449efbd6fcccaac92ea3ed7ad81b
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-12-03 11:22:59 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index d7c3163..07cf31d 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -78,8 +78,11 @@ struct rs_svc_msg {
- };
- static pthread_t svc_id;
-+static int svc_sock[2];
- static int svc_cnt;
--static int svc_fds[2];
-+static int svc_size;
-+static struct rsocket **svc_rss;
-+static struct pollfd *svc_fds;
- static uint16_t def_iomap_size = 0;
- static uint16_t def_inline = 64;
-@@ -227,11 +230,19 @@ struct ds_header {
- #define DS_IPV4_HDR_LEN  8
- #define DS_IPV6_HDR_LEN 24
-+struct ds_dest {
-+      union socket_addr addr; /* must be first */
-+      struct ds_qp      *qp;
-+      struct ibv_ah     *ah;
-+      uint32_t           qpn;
-+};
-+
- struct ds_qp {
-       dlist_t           list;
-       struct rsocket    *rs;
-       struct rdma_cm_id *cm_id;
-       struct ds_header  hdr;
-+      struct ds_dest    dest;
-       struct ibv_mr     *smr;
-       struct ibv_mr     *rmr;
-@@ -240,13 +251,6 @@ struct ds_qp {
-       int               cq_armed;
- };
--struct ds_dest {
--      union socket_addr addr; /* must be first */
--      struct ds_qp      *qp;
--      struct ibv_ah     *ah;
--      uint32_t           qpn;
--};
--
- struct rsocket {
-       int               type;
-       int               index;
-@@ -332,13 +336,21 @@ struct rsocket {
-       int               iomap_pending;
- };
--#define DS_UDP_TAG 0x5555555555555555ULL
-+#define DS_UDP_TAG 0x55555555
- struct ds_udp_header {
--      uint64_t          tag;
-+      uint32_t          tag;
-       uint8_t           version;
--      uint8_t           reserved[3];
--      uint32_t          qpn;  /* upper 8-bits reserved */
-+      uint8_t           op;
-+      uint8_t           length;
-+      uint8_t           reserved;
-+      uint32_t          qpn;  /* lower 8-bits reserved */
-+      union {
-+              uint32_t ipv4;
-+              struct {
-+                      uint8_t  addr[16];
-+              } ipv6;
-+      } addr;
- };
- #define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
-@@ -362,8 +374,118 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
-       }
- }
-+static int rs_svc_grow_sets(void)
-+{
-+      struct rsocket **rss;
-+      struct pollfd *fds;
-+      void *set;
-+
-+      set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
-+      if (!set)
-+              return ENOMEM;
-+
-+      svc_size += 2;
-+      rss = set;
-+      fds = set + sizeof(*rss) * svc_size;
-+      if (svc_cnt) {
-+              memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
-+              memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
-+      }
-+
-+      free(svc_rss);
-+      free(svc_fds);
-+      svc_rss = rss;
-+      svc_fds = fds;
-+      return 0;
-+}
-+
-+/*
-+ * Index 0 is reserved for the service's communication socket.
-+ */
-+static int rs_svc_add_rs(struct rsocket *rs)
-+{
-+      int ret;
-+
-+      if (svc_cnt >= svc_size - 1) {
-+              ret = rs_svc_grow_sets();
-+              if (ret)
-+                      return ret;
-+      }
-+
-+      svc_rss[++svc_cnt] = rs;
-+      svc_fds[svc_cnt].fd = rs->udp_sock;
-+      svc_fds[svc_cnt].events = POLLIN;
-+      svc_fds[svc_cnt].revents = 0;
-+      return 0;
-+}
-+
-+static int rs_svc_rm_rs(struct rsocket *rs)
-+{
-+      int i;
-+
-+      for (i = 1; i <= svc_cnt; i++) {
-+              if (svc_rss[i] == rs) {
-+                      svc_cnt--;
-+                      svc_rss[i] = svc_rss[svc_cnt];
-+                      svc_fds[i] = svc_fds[svc_cnt];
-+                      return 0;
-+              }
-+      }
-+      return EBADF;
-+}
-+
-+static void rs_svc_process_sock(void)
-+{
-+      struct rs_svc_msg msg;
-+
-+      read(svc_sock[1], &msg, sizeof msg);
-+      switch (msg.op) {
-+      case RS_SVC_INSERT:
-+              msg.status = rs_svc_add_rs(msg.rs);
-+              break;
-+      case RS_SVC_REMOVE:
-+              msg.status = rs_svc_rm_rs(msg.rs);
-+              break;
-+      default:
-+              msg.status = ENOTSUP;
-+              break;
-+      }
-+      write(svc_sock[1], &msg, sizeof msg);
-+}
-+
-+static void rs_svc_process_rs(struct rsocket *rs)
-+{
-+
-+}
-+
- static int rs_svc_run(void *arg)
- {
-+      struct rs_svc_msg msg;
-+      int i, ret;
-+
-+      ret = rs_svc_grow_sets();
-+      if (ret) {
-+              msg.status = ret;
-+              write(svc_sock[1] &msg, sizeof msg);
-+              return ret;
-+      }
-+
-+      svc_fds[0].fd = svc_sock[1];
-+      svc_fds[0].events = POLLIN;
-+      do {
-+              for (i = 0; i <= svc_cnt; i++)
-+                      svc_fds[i].revents = 0;
-+
-+              poll(svc_fds, svc_cnt + 1, -1);
-+              if (svc_fds[0].revents)
-+                      rs_svc_process_sock();
-+
-+              for (i = 1; i <= svc_cnt; i++) {
-+                      if (svc_fds[i].revents)
-+                              rs_svc_process_rs(svc_rss[i]);
-+              }
-+      } while (svc_cnt > 1);
-+
-       return 0;
- }
-@@ -374,27 +496,35 @@ static int rs_svc_insert(struct rsocket *rs)
-       pthread_mutex_lock(&mut);
-       if (!svc_cnt) {
--              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
-+              ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
-               if (ret)
--                      goto out;
-+                      goto err1;
-               ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
-               if (ret) {
--                      close(svc_fds[0]);
--                      close(svc_fds[1]);
-                       ret = ERR(ret);
--                      goto out;
-+                      goto err2;
-               }
-       }
-       msg.op = RS_SVC_INSERT;
-       msg.status = EINVAL;
-       msg.rs = rs;
--      svc_cnt++;
--      write(svc_fds[0], &msg, sizeof msg);
--      read(svc_fds[0], &msg, sizeof msg);
-+      write(svc_sock[0], &msg, sizeof msg);
-+      read(svc_sock[0], &msg, sizeof msg);
-       ret = ERR(msg.status);
--out:
-+      if (ret && !svn_cnt)
-+              goto err3;
-+
-+      pthread_mutex_unlock(&mut);
-+      return ret;
-+
-+err3:
-+      pthread_join(svc_id, NULL);
-+err2:
-+      close(svc_sock[0]);
-+      close(svc_sock[1]);
-+err1:
-       pthread_mutex_unlock(&mut);
-       return ret;
- }
-@@ -408,11 +538,14 @@ static int rs_svc_remove(struct rsocket *rs)
-       msg.op = RS_SVC_REMOVE;
-       msg.status = EINVAL;
-       msg.rs = rs;
--      write(svc_fds[0], &msg, sizeof msg);
--      read(svc_fds[0], &msg, sizeof msg);
-+      write(svc_sock[0], &msg, sizeof msg);
-+      read(svc_sock[0], &msg, sizeof msg);
-       ret = ERR(msg.status);
--      if (!ret && !--svn_cnt)
-+      if (!svn_cnt) {
-               pthread_join(svc_id, NULL);
-+              close(svc_sock[0]);
-+              close(svc_sock[1]);
-+      }
-       pthread_mutex_unlock(&mut);
-       return ret;
-@@ -821,12 +954,14 @@ static void ds_free_qp(struct ds_qp *qp)
-       if (qp->cm_id) {
-               if (qp->cm_id->qp) {
-+                      tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
-                       epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
-                                 qp->cm_id->recv_cq_channel->fd, NULL);
-                       rdma_destroy_qp(qp->cm_id);
-               }
-               rdma_destroy_id(qp->cm_id);
-       }
-+
-       free(qp);
- }
-@@ -860,6 +995,7 @@ static void ds_free(struct rsocket *rs)
-       if (rs->sbuf)
-               free(rs->sbuf);
-+      tdestroy(rs->dest_map, free);
-       fastlock_destroy(&rs->map_lock);
-       fastlock_destroy(&rs->cq_wait_lock);
-       fastlock_destroy(&rs->cq_lock);
-@@ -1317,6 +1453,32 @@ static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
-       }
- }
-+static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
-+                        socklen_t addrlen)
-+{
-+      struct ibv_port_attr port_attr;
-+      struct ibv_ah_attr attr;
-+      int ret;
-+
-+      memcpy(&qp->dest.addr, addr, addrlen);
-+      qp->dest.qp = qp;
-+      qp->dest.qpn = qp->cm_id->qp->qp_num;
-+
-+      ret = ibv_query_port(qp->cm_id->verbs, qp->cm_id->port_num, &port_attr);
-+      if (ret)
-+              return ret;
-+
-+      memset(&attr, 0, sizeof attr);
-+      attr.dlid = port_attr.lid;
-+      attr.port_num = qp->cm_id->port_num;
-+      qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
-+      if (!qp->dest.ah)
-+              return ERR(ENOMEM);
-+
-+      tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
-+      return 0;
-+}
-+
- static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-                       socklen_t addrlen, struct ds_qp **qp)
- {
-@@ -1361,7 +1523,11 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-       if (ret)
-               goto err;
--      event.events = EPOLLIN | EPOLLOUT;
-+      ret = ds_add_qp_dest(*qp, src_addr, addrlen);
-+      if (ret)
-+              goto err;
-+
-+      event.events = EPOLLIN;
-       event.data.ptr = *qp;
-       ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
-                       (*qp)->cm_id->recv_cq_channel->fd, &event);
-@@ -1424,15 +1590,17 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
-       if (ret)
-               goto out;
--      *dest = calloc(1, sizeof(struct ds_dest));
--      if (!*dest) {
--              ret = ERR(ENOMEM);
--              goto out;
--      }
-+      if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
-+              *dest = calloc(1, sizeof(struct ds_dest));
-+              if (!*dest) {
-+                      ret = ERR(ENOMEM);
-+                      goto out;
-+              }
--      memcpy(&(*dest)->addr, addr, addrlen);
--      (*dest)->qp = qp;
--      tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
-+              memcpy(&(*dest)->addr, addr, addrlen);
-+              (*dest)->qp = qp;
-+              tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
-+      }
- out:
-       fastlock_release(&rs->map_lock);
-       return ret;
-@@ -2319,6 +2487,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       hdr.tag = htonll(DS_UDP_TAG);
-       hdr.version = 1;
-+      hdr.op = op;
-       memset(&hdr->reserved, 0, sizeof hdr->reserved);
-       hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-@@ -2329,15 +2498,20 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       memset(&msg, 0, sizeof msg);
-       msg.msg_iov = miov;
-       msg.msg_iovlen = iovcnt + 1;
--      return sendmsg(rs->fd, msg, flags);
-+      return sendmsg(rs->udp_sock, msg, flags);
- }
--static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
-+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
-+                         int flags, uint8_t op)
- {
-       struct iovec iov;
--      iov.iov_base = buf;
--      iov_iov_len = len;
--      return ds_sendv_udp(s, &iov, 1, flags);
-+      if (buf && len) {
-+              iov.iov_base = buf;
-+              iov_iov_len = len;
-+              return ds_sendv_udp(rs, &iov, 1, flags, op);
-+      } else {
-+              return ds_sendv_udp(rs, NULL, 0, flags, op);
-+      }
- }
- static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-@@ -2348,7 +2522,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-       int flags, ret = 0;
-       if (!rs->conn_dest->ah)
--              return ds_send_udp(rs, buf, len, flags);
-+              return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-       if (!ds_can_send(rs)) {
-               ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-@@ -2633,8 +2807,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
-       int ret;
- check_cq:
--      if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
--          (rs->state & rs_error)) {
-+      if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) ||
-+           (rs->state == rs_disconnected) || (rs->state & rs_error))) {
-               rs_process_cq(rs, nonblock, test);
-               revents = 0;
-@@ -2650,6 +2824,16 @@ check_cq:
-               }
-               return revents;
-+      } else if (rs->type == SOCK_DGRAM) {
-+              ds_process_cqs(rs, nonblock, test);
-+
-+              revents = 0;
-+              if ((events & POLLIN) && rs_have_rdata(rs))
-+                      revents |= POLLIN;
-+              if ((events & POLLOUT) && ds_can_send(rs))
-+                      revents |= POLLOUT;
-+
-+              return revents;
-       }
-       if (rs->state == rs_listening) {
-@@ -2709,11 +2893,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
-                       if (fds[i].revents)
-                               return 1;
--                      if (rs->state >= rs_connected)
--                              rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
--                      else
--                              rfds[i].fd = rs->cm_id->channel->fd;
--
-+                      if (rs->type == SOCK_STREAM) {
-+                              if (rs->state >= rs_connected)
-+                                      rfds[i].fd = rs->cm_id->recv_cq_channel->fd;
-+                              else
-+                                      rfds[i].fd = rs->cm_id->channel->fd;
-+                      } else {
-+                              rfds[i].fd = rs->epfd;
-+                      }
-                       rfds[i].events = POLLIN;
-               } else {
-                       rfds[i].fd = fds[i].fd;
-@@ -2736,7 +2923,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
-               rs = idm_lookup(&idm, fds[i].fd);
-               if (rs) {
--                      rs_get_cq_event(rs);
-+                      if (rs->type == SOCK_STREAM)
-+                              rs_get_cq_event(rs);
-+                      else
-+                              ds_get_cq_event(rs);
-                       fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
-               } else {
-                       fds[i].revents = rfds[i].revents;