]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Mon, 17 Dec 2012 21:41:21 +0000 (13:41 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 17 Dec 2012 21:41:21 +0000 (13:41 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index 9c4fe862097e0e5a382f3ddc809c709dfff9afce..dbb3311481beddc6a23aa2eeafc081f9f841d328 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: 42a80d310cd3e2086e62e4a02d65302a705b5031
-Head: 5ecf5331f42951a4a7ead43657894630be857283
+Previous: e81ba1409286ee11cf6eb8ad8d4bf1c6271da316
+Head: bc7d39b5bd7e376900e5b0d0024e5b9a521697c7
 Applied:
-  dsocket: 9a45fe716b7886d96ac080929ecba4eeb1f12d0f
-  refresh-temp: 5ecf5331f42951a4a7ead43657894630be857283
+  dsocket: bc7d39b5bd7e376900e5b0d0024e5b9a521697c7
 Unapplied:
   udpong: a003dda8e0d2f6ea134932bdd8bf5a5f29b7bfce
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
index a869c8d2817b406847c1ca98244f88de4222d424..8c69228e57851f95c5979bbeaaa137a3fe84a03b 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top:    f8258dbc93f14acdb67621e51c1696fca92c841a
+Top:    89a1bd159135ab5aae205202e62cdc91b9ab3b0d
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -113,7 +113,7 @@ index 0a0370e..7135a61 100644
  {
        errno = err;
 diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..4631b1d 100644
+index a060f66..219aa4a 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
 @@ -47,6 +47,8 @@
@@ -134,27 +134,18 @@ index a060f66..4631b1d 100644
  #define RS_QP_MAX_SIZE 0xFFFE
  #define RS_QP_CTRL_SIZE 4
  #define RS_CONN_RETRIES 6
-@@ -64,6 +66,36 @@
+@@ -64,6 +66,27 @@
  static struct index_map idm;
  static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
  
-+enum {
-+      RS_SVC_INSERT,
-+      RS_SVC_REMOVE
-+};
-+
 +struct rsocket;
 +
-+
-+#define PRINTADDR(a) \
-+printf("%s port %x ip %x\n", __func__, \
-+      ((struct sockaddr_in *)a)->sin_port, \
-+      ((struct sockaddr_in *)a)->sin_addr.s_addr)
-+
-+
++enum {
++      RS_SVC_DGRAM = 1 << 0
++};
 +
 +struct rs_svc_msg {
-+      uint32_t op;
++      uint32_t svcs;
 +      uint32_t status;
 +      struct rsocket *rs;
 +};
@@ -171,7 +162,7 @@ index a060f66..4631b1d 100644
  static uint16_t def_iomap_size = 0;
  static uint16_t def_inline = 64;
  static uint16_t def_sqsize = 384;
-@@ -100,6 +132,14 @@ enum {
+@@ -100,6 +123,14 @@ enum {
  #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
  #define rs_msg_op(imm_data)   (imm_data >> 29)
  #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
@@ -186,7 +177,7 @@ index a060f66..4631b1d 100644
  
  enum {
        RS_CTRL_DISCONNECT,
-@@ -111,6 +151,18 @@ struct rs_msg {
+@@ -111,6 +142,18 @@ struct rs_msg {
        uint32_t data;
  };
  
@@ -205,7 +196,7 @@ index a060f66..4631b1d 100644
  struct rs_sge {
        uint64_t addr;
        uint32_t key;
-@@ -145,8 +197,6 @@ struct rs_conn_data {
+@@ -145,8 +188,6 @@ struct rs_conn_data {
        struct rs_sge     data_buf;
  };
  
@@ -214,7 +205,7 @@ index a060f66..4631b1d 100644
  /*
   * rsocket states are ordered as passive, connecting, connected, disconnected.
   */
-@@ -160,9 +210,9 @@ enum rs_state {
+@@ -160,9 +201,9 @@ enum rs_state {
        rs_connecting      = rs_opening |   0x0040,
        rs_accepting       = rs_opening |   0x0080,
        rs_connected       =                0x0100,
@@ -227,7 +218,7 @@ index a060f66..4631b1d 100644
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
-@@ -170,68 +220,249 @@ enum rs_state {
+@@ -170,68 +211,251 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -283,6 +274,7 @@ index a060f66..4631b1d 100644
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
 -      fastlock_t        iomap_lock;
+-
 +      fastlock_t        map_lock; /* acquire slock first if needed */
 +
 +      union {
@@ -330,7 +322,8 @@ index a060f66..4631b1d 100644
 +                      struct ds_smsg    *smsg_free;
 +              };
 +      };
++
++      int               svcs;
        int               opts;
        long              fd_flags;
        uint64_t          so_opts;
@@ -383,12 +376,12 @@ index a060f66..4631b1d 100644
 -      void             *target_buffer_list;
 -      volatile struct rs_sge    *target_sgl;
 -      struct rs_iomap  *target_iomap;
--
++#define DS_UDP_TAG 0x55555555
 -      uint32_t          rbuf_size;
 -      struct ibv_mr    *rmr;
 -      uint8_t           *rbuf;
-+#define DS_UDP_TAG 0x55555555
+-
 -      uint32_t          sbuf_size;
 -      struct ibv_mr    *smr;
 -      struct ibv_sge    ssgl[2];
@@ -430,7 +423,7 @@ index a060f66..4631b1d 100644
 +      }
 +}
 +
-+static int rs_add_to_svc(struct rsocket *rs)
++static int rs_modify_svcs(struct rsocket *rs, int svcs)
 +{
 +      struct rs_svc_msg msg;
 +      int ret;
@@ -439,58 +432,60 @@ index a060f66..4631b1d 100644
 +      if (!svc_cnt) {
 +              ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
 +              if (ret)
-+                      goto err1;
++                      goto unlock;
 +
 +              ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
 +              if (ret) {
 +                      ret = ERR(ret);
-+                      goto err2;
++                      goto closepair;
 +              }
 +      }
 +
-+      msg.op = RS_SVC_INSERT;
++      msg.svcs = svcs;
 +      msg.status = EINVAL;
 +      msg.rs = rs;
 +      write(svc_sock[0], &msg, sizeof msg);
 +      read(svc_sock[0], &msg, sizeof msg);
 +      ret = rdma_seterrno(msg.status);
-+      if (ret && !svc_cnt)
-+              goto err3;
++      if (svc_cnt)
++              goto unlock;
++//    if (ret && !svc_cnt)
++//            goto join;
++//
++//    pthread_mutex_unlock(&mut);
++//    return ret;
 +
-+      pthread_mutex_unlock(&mut);
-+      return ret;
-+
-+err3:
 +      pthread_join(svc_id, NULL);
-+err2:
++closepair:
 +      close(svc_sock[0]);
 +      close(svc_sock[1]);
-+err1:
++unlock:
 +      pthread_mutex_unlock(&mut);
 +      return ret;
 +}
 +
-+static int rs_remove_from_svc(struct rsocket *rs)
-+{
-+      struct rs_svc_msg msg;
-+      int ret;
-+
-+      pthread_mutex_lock(&mut);
-+      msg.op = RS_SVC_REMOVE;
-+      msg.status = EINVAL;
-+      msg.rs = rs;
-+      write(svc_sock[0], &msg, sizeof msg);
-+      read(svc_sock[0], &msg, sizeof msg);
-+      ret = ERR(msg.status);
-+      if (!svc_cnt) {
-+              pthread_join(svc_id, NULL);
-+              close(svc_sock[0]);
-+              close(svc_sock[1]);
-+      }
-+
-+      pthread_mutex_unlock(&mut);
-+      return ret;
-+}
++//static void rs_remove_from_svc(struct rsocket *rs)
++//{
++//    struct rs_svc_msg msg;
++//    int ret;
++//
++//    pthread_mutex_lock(&mut);
++//    if (svc_cnt) {
++//            msg.op = RS_SVC_REMOVE;
++//            msg.status = EINVAL;
++//            msg.rs = rs;
++//            write(svc_sock[0], &msg, sizeof msg);
++//            read(svc_sock[0], &msg, sizeof msg);
++//    }
++//
++//    if (!svc_cnt) {
++//            pthread_join(svc_id, NULL);
++//            close(svc_sock[0]);
++//            close(svc_sock[1]);
++//    }
++//
++//    pthread_mutex_unlock(&mut);
++//}
 +
 +static int ds_compare_addr(const void *dst1, const void *dst2)
 +{
@@ -502,14 +497,13 @@ index a060f66..4631b1d 100644
 +
 +      len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
 +            sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
-+//    printf("%s len %d sizeof sin %d memcmp %d\n", __func__, len, sizeof(struct sockaddr_in), memcmp(dst1,dst2,len));
 +      return memcmp(dst1, dst2, len);
 +}
 +
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -307,10 +538,10 @@ out:
+@@ -307,10 +531,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -522,7 +516,7 @@ index a060f66..4631b1d 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -322,7 +553,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +546,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -531,7 +525,7 @@ index a060f66..4631b1d 100644
  {
        struct rsocket *rs;
  
-@@ -330,29 +561,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,29 +554,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
@@ -576,7 +570,7 @@ index a060f66..4631b1d 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
-@@ -360,13 +601,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +594,26 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
  
  static int rs_set_nonblocking(struct rsocket *rs, long arg)
  {
@@ -607,7 +601,7 @@ index a060f66..4631b1d 100644
  
        return ret;
  }
-@@ -390,17 +644,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +637,39 @@ static void rs_set_qp_size(struct rsocket *rs)
                rs->rq_size = 2;
  }
  
@@ -649,7 +643,7 @@ index a060f66..4631b1d 100644
  
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
-@@ -410,7 +686,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -410,7 +679,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
@@ -658,7 +652,7 @@ index a060f66..4631b1d 100644
  
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
-@@ -423,7 +699,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -423,7 +692,7 @@ static int rs_init_bufs(struct rsocket *rs)
  
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
@@ -667,7 +661,7 @@ index a060f66..4631b1d 100644
  
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
-@@ -440,37 +716,57 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,37 +709,57 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
  }
  
@@ -740,7 +734,7 @@ index a060f66..4631b1d 100644
  {
        struct ibv_recv_wr wr, *bad;
  
-@@ -482,6 +778,26 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +771,26 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -767,7 +761,7 @@ index a060f66..4631b1d 100644
  static int rs_create_ep(struct rsocket *rs)
  {
        struct ibv_qp_init_attr qp_attr;
-@@ -492,7 +808,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +801,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
  
@@ -776,7 +770,7 @@ index a060f66..4631b1d 100644
        if (ret)
                return ret;
  
-@@ -549,8 +865,73 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +858,70 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
  }
  
@@ -808,9 +802,6 @@ index a060f66..4631b1d 100644
 +{
 +      struct ds_qp *qp;
 +
-+      if (rs->state & (rs_readable | rs_writable))
-+              rs_remove_from_svc(rs);
-+
 +      if (rs->udp_sock >= 0)
 +              close(rs->udp_sock);
 +
@@ -850,7 +841,7 @@ index a060f66..4631b1d 100644
        if (rs->index >= 0)
                rs_remove(rs);
  
-@@ -582,7 +963,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +953,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
  
@@ -859,7 +850,7 @@ index a060f66..4631b1d 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -636,29 +1017,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1007,88 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
@@ -902,7 +893,7 @@ index a060f66..4631b1d 100644
 +      }
 +      msg->next = NULL;
 +
-+      ret = rs_add_to_svc(rs);
++      ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
 +      if (ret)
 +              return ret;
 +
@@ -956,7 +947,7 @@ index a060f66..4631b1d 100644
        return rs->index;
  
  err:
-@@ -672,9 +1112,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1102,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -978,7 +969,7 @@ index a060f66..4631b1d 100644
        return ret;
  }
  
-@@ -710,7 +1159,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1149,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -987,7 +978,7 @@ index a060f66..4631b1d 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -718,7 +1167,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1157,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -996,7 +987,7 @@ index a060f66..4631b1d 100644
        if (ret < 0)
                goto err;
  
-@@ -729,7 +1178,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -729,7 +1168,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        }
  
        if (rs->fd_flags & O_NONBLOCK)
@@ -1005,7 +996,7 @@ index a060f66..4631b1d 100644
  
        ret = rs_create_ep(new_rs);
        if (ret)
-@@ -831,7 +1280,7 @@ connected:
+@@ -831,7 +1270,7 @@ connected:
                break;
        case rs_accepting:
                if (!(rs->fd_flags & O_NONBLOCK))
@@ -1014,7 +1005,7 @@ index a060f66..4631b1d 100644
  
                ret = ucma_complete(rs->cm_id);
                if (ret)
-@@ -855,13 +1304,251 @@ connected:
+@@ -855,13 +1294,240 @@ connected:
        return ret;
  }
  
@@ -1036,10 +1027,8 @@ index a060f66..4631b1d 100644
 +      int sock, ret;
 +      uint16_t port;
 +
-+//    printf("dest: "); PRINTADDR(dest_addr);
-+      *src_len = sizeof src_addr;
++      *src_len = sizeof *src_addr;
 +      ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
-+//    printf("src: "); PRINTADDR(src_addr);
 +      if (ret || !rs_any_addr(src_addr))
 +              return ret;
 +
@@ -1052,10 +1041,9 @@ index a060f66..4631b1d 100644
 +      if (ret)
 +              goto out;
 +
-+      *src_len = sizeof src_addr;
++      *src_len = sizeof *src_addr;
 +      ret = getsockname(sock, &src_addr->sa, src_len);
 +      src_addr->sin.sin_port = port;
-+//    printf("selected src: ");
 +out:
 +      close(sock);
 +      return ret;
@@ -1064,7 +1052,6 @@ index a060f66..4631b1d 100644
 +static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
 +{
 +      if (addr->sa.sa_family == AF_INET) {
-+//            PRINTADDR(addr);
 +              hdr->version = 4;
 +              hdr->length = DS_IPV4_HDR_LEN;
 +              hdr->port = addr->sin.sin_port;
@@ -1085,7 +1072,6 @@ index a060f66..4631b1d 100644
 +      struct ibv_ah_attr attr;
 +      int ret;
 +
-+//    printf("%s\n", __func__);
 +      memcpy(&qp->dest.addr, addr, addrlen);
 +      qp->dest.qp = qp;
 +      qp->dest.qpn = qp->cm_id->qp->qp_num;
@@ -1098,8 +1084,6 @@ index a060f66..4631b1d 100644
 +      attr.dlid = port_attr.lid;
 +      attr.port_num = qp->cm_id->port_num;
 +      qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
-+//    printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
-+//            attr.port_num, qp->dest.qpn);
 +      if (!qp->dest.ah)
 +              return ERR(ENOMEM);
 +
@@ -1115,7 +1099,6 @@ index a060f66..4631b1d 100644
 +      struct epoll_event event;
 +      int i, ret;
 +
-+//    PRINTADDR(src_addr);
 +      qp = calloc(1, sizeof(*qp));
 +      if (!qp)
 +              return ERR(ENOMEM);
@@ -1204,14 +1187,12 @@ index a060f66..4631b1d 100644
 +      struct ds_dest **tdest, *new_dest;
 +      int ret = 0;
 +
-+//    PRINTADDR(addr);
 +      fastlock_acquire(&rs->map_lock);
 +      tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
 +      if (tdest)
 +              goto found;
 +
 +      ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+//    printf("get src: "); PRINTADDR(&src_addr);
 +      if (ret)
 +              goto out;
 +
@@ -1258,7 +1239,6 @@ index a060f66..4631b1d 100644
 +              }
 +
 +              fastlock_acquire(&rs->slock);
-+//            PRINTADDR(addr);
 +              ret = connect(rs->udp_sock, addr, addrlen);
 +              if (!ret)
 +                      ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
@@ -1268,7 +1248,7 @@ index a060f66..4631b1d 100644
  }
  
  static int rs_post_write_msg(struct rsocket *rs,
-@@ -903,6 +1590,26 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1569,24 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -1286,8 +1266,6 @@ index a060f66..4631b1d 100644
 +      wr.wr.ud.ah = rs->conn_dest->ah;
 +      wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
 +      wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
-+//    printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
-+//            rs->conn_dest->qpn);
 +
 +      return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
 +}
@@ -1295,7 +1273,7 @@ index a060f66..4631b1d 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -1046,7 +1753,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1046,7 +1730,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
@@ -1304,7 +1282,7 @@ index a060f66..4631b1d 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1133,46 +1840,230 @@ static int rs_get_cq_event(struct rsocket *rs)
+@@ -1133,46 +1817,213 @@ static int rs_get_cq_event(struct rsocket *rs)
   */
  static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
  {
@@ -1404,7 +1382,6 @@ index a060f66..4631b1d 100644
 +                      if (ds_wr_is_recv(wc.wr_id)) {
 +                              if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
 +                                  ds_valid_recv(qp, &wc)) {
-+//                                    printf("%s recv over QP\n", __func__);
 +                                      rs->rqe_avail--;
 +                                      rmsg = &rs->dmsg[rs->rmsg_tail];
 +                                      rmsg->qp = qp;
@@ -1413,13 +1390,11 @@ index a060f66..4631b1d 100644
 +                                      if (++rs->rmsg_tail == rs->rq_size + 1)
 +                                              rs->rmsg_tail = 0;
 +                              } else {
-+//                                    printf("%s invalid recv\n", __func__);
 +                                      ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
 +                              }
 +                      } else {
 +                              smsg = (struct ds_smsg *)
 +                                     (rs->sbuf + ds_wr_offset(wc.wr_id));
-+//                            printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
 +                              smsg->next = rs->smsg_free;
 +                              rs->smsg_free = smsg;
 +                              rs->sqe_avail++;
@@ -1462,9 +1437,7 @@ index a060f66..4631b1d 100644
 +      if (!rs->cq_armed)
 +              return 0;
 +
-+//    printf("wait for epoll event\n");
 +      ret = epoll_wait(rs->epfd, &event, 1, -1);
-+//    printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
 +      if (ret <= 0)
 +              return ret;
 +
@@ -1481,17 +1454,12 @@ index a060f66..4631b1d 100644
 +
 +static int rs_have_rdata(struct rsocket *rs);
 +static int ds_can_send(struct rsocket *rs);
++static int rs_poll_all(struct rsocket *rs);
++static int ds_all_sends_done(struct rsocket *rs);
 +
 +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
 +{
 +      int ret = 0;
-+
-+      if (test == rs_have_rdata)
-+              printf("%s test rs_have_rdata\n", __func__);
-+      else if (test == ds_can_send)
-+              printf("%s test ds_can_send\n", __func__);
-+      else
-+              printf("%s test ?\n", __func__);
  
        fastlock_acquire(&rs->cq_lock);
        do {
@@ -1499,17 +1467,14 @@ index a060f66..4631b1d 100644
 -              ret = rs_poll_cq(rs);
 +              ds_poll_cqs(rs);
                if (test(rs)) {
-+                      printf("%s test succeeded\n", __func__);
                        ret = 0;
                        break;
 -              } else if (ret) {
 -                      break;
                } else if (nonblock) {
                        ret = ERR(EWOULDBLOCK);
-+                      printf("%s nonblocking \n", __func__);
                } else if (!rs->cq_armed) {
 -                      ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
-+                      printf("%s req notify \n", __func__);
 +                      ds_req_notify_cqs(rs);
                        rs->cq_armed = 1;
                } else {
@@ -1518,9 +1483,7 @@ index a060f66..4631b1d 100644
                        fastlock_release(&rs->cq_lock);
  
 -                      ret = rs_get_cq_event(rs);
-+                      printf("%s wait for event \n", __func__);
 +                      ret = ds_get_cq_event(rs);
-+                      printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
                        fastlock_release(&rs->cq_wait_lock);
                        fastlock_acquire(&rs->cq_lock);
                }
@@ -1528,7 +1491,6 @@ index a060f66..4631b1d 100644
  
 -      rs_update_credits(rs);
        fastlock_release(&rs->cq_lock);
-+//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
        return ret;
  }
  
@@ -1542,11 +1504,10 @@ index a060f66..4631b1d 100644
        do {
 -              ret = rs_process_cq(rs, 1, test);
 +              ret = ds_process_cqs(rs, 1, test);
-+//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
                if (!ret || nonblock || errno != EWOULDBLOCK)
                        return ret;
  
-@@ -1184,7 +2075,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2035,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
                            (e.tv_usec - s.tv_usec) + 1;
        } while (poll_time <= polling_time);
  
@@ -1555,7 +1516,7 @@ index a060f66..4631b1d 100644
        return ret;
  }
  
-@@ -1219,9 +2110,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2070,19 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
@@ -1576,7 +1537,7 @@ index a060f66..4631b1d 100644
  }
  
  static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2137,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2097,7 @@ static int rs_have_rdata(struct rsocket *rs)
  
  static int rs_conn_have_rdata(struct rsocket *rs)
  {
@@ -1585,7 +1546,7 @@ index a060f66..4631b1d 100644
  }
  
  static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2146,74 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2106,67 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
               !(rs->state & rs_connected);
  }
  
@@ -1621,15 +1582,12 @@ index a060f66..4631b1d 100644
 +      struct ds_header *hdr;
 +      int ret;
 +
-+//    printf("%s \n", __func__);
 +      if (!(rs->state & rs_readable))
 +              return ERR(EINVAL);
 +
 +      if (!rs_have_rdata(rs)) {
-+//            printf("%s need rdata \n", __func__);
 +              ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
 +                                rs_have_rdata);
-+//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
 +              if (ret)
 +                      return ret;
 +      }
@@ -1641,10 +1599,7 @@ index a060f66..4631b1d 100644
 +
 +      memcpy(buf, (void *) hdr + hdr->length, len);
 +      if (addrlen)
-+{
 +              ds_set_src(src_addr, addrlen, hdr);
-+//PRINTADDR(src_addr);
-+}
 +
 +      if (!(flags & MSG_PEEK)) {
 +              ds_post_recv(rs, rmsg->qp, rmsg->offset);
@@ -1653,14 +1608,13 @@ index a060f66..4631b1d 100644
 +              rs->rqe_avail++;
 +      }
 +
-+//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
 +      return len;
 +}
 +
  static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
  {
        size_t left = len;
-@@ -1290,6 +2259,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2212,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -1674,7 +1628,7 @@ index a060f66..4631b1d 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1339,7 +2315,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2268,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
  
@@ -1683,7 +1637,7 @@ index a060f66..4631b1d 100644
  
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
-@@ -1348,8 +2324,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2277,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
  ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
                  struct sockaddr *src_addr, socklen_t *addrlen)
  {
@@ -1701,7 +1655,7 @@ index a060f66..4631b1d 100644
        ret = rrecv(socket, buf, len, flags);
        if (ret > 0 && src_addr)
                rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2376,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2329,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -1718,7 +1672,7 @@ index a060f66..4631b1d 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1447,10 +2432,102 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2385,92 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1735,7 +1689,6 @@ index a060f66..4631b1d 100644
 +      struct iovec miov[8];
 +      ssize_t ret;
 +
-+//    printf("%s\n", __func__);
 +      if (iovcnt > 8)
 +              return ERR(ENOTSUP);
 +
@@ -1762,7 +1715,6 @@ index a060f66..4631b1d 100644
 +      msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
 +      msg.msg_iov = miov;
 +      msg.msg_iovlen = iovcnt + 1;
-+//    printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
 +      ret = sendmsg(rs->udp_sock, &msg, flags);
 +      return ret > 0 ? ret - hdr.length : ret;
 +}
@@ -1771,14 +1723,11 @@ index a060f66..4631b1d 100644
 +                         int flags, uint8_t op)
 +{
 +      struct iovec iov;
-+//    printf("%s\n", __func__);
 +      if (buf && len) {
-+//            printf("%s have buffer\n", __func__);
 +              iov.iov_base = (void *) buf;
 +              iov.iov_len = len;
 +              return ds_sendv_udp(rs, &iov, 1, flags, op);
 +      } else {
-+//            printf("%s no buffer\n", __func__);
 +              return ds_sendv_udp(rs, NULL, 0, flags, op);
 +      }
 +}
@@ -1790,14 +1739,11 @@ index a060f66..4631b1d 100644
 +      uint64_t offset;
 +      int ret = 0;
 +
-+//    printf("%s\n", __func__);
 +      if (!rs->conn_dest->ah)
 +              return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
 +
 +      if (!ds_can_send(rs)) {
-+              printf("can't send\n");
 +              ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
-+              printf("ds_get_comp %d\n", ret);
 +              if (ret)
 +                      return ret;
 +      }
@@ -1813,16 +1759,14 @@ index a060f66..4631b1d 100644
 +      sge.lkey = rs->conn_dest->qp->smr->lkey;
 +      offset = (uint8_t *) msg - rs->sbuf;
 +
-+//    printf("%s - sending over QP\n", __func__);
 +      ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
-+//    printf("%s - ds_post_send %d %s\n", __func__, ret, strerror(errno));
 +      return ret ? ret : len;
 +}
 +
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1464,6 +2541,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2484,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1836,7 +1780,7 @@ index a060f66..4631b1d 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1485,7 +2569,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2512,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1845,7 +1789,7 @@ index a060f66..4631b1d 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1538,10 +2622,51 @@ out:
+@@ -1538,10 +2565,34 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
@@ -1855,8 +1799,6 @@ index a060f66..4631b1d 100644
 +      int ret;
  
 -      return rsend(socket, buf, len, flags);
-+//    PRINTADDR(dest_addr);
-+//    printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
 +      rs = idm_at(&idm, socket);
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
@@ -1872,35 +1814,20 @@ index a060f66..4631b1d 100644
 +      }
 +
 +      fastlock_acquire(&rs->slock);
-+//    printf("%s - checking conn dest %p\n", __func__, rs->conn_dest);
-+//    printf("dest addr: af %d", dest_addr->sa_family); PRINTADDR(dest_addr);
-+//    if (rs->conn_dest) {
-+//            int i;
-+//            printf("conn addr: af %d", rs->conn_dest->addr.sa.sa_family); PRINTADDR(&rs->conn_dest->addr);
-+//            for (i=0;i<16;i++)
-+//                    printf("%x", ((uint8_t *)dest_addr)[i]);
-+//            printf("\n");
-+//            for (i=0;i<16;i++)
-+//                    printf("%x", ((uint8_t *)&rs->conn_dest->addr)[i]);
-+//            printf("\n");
-+//    }
 +      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+//            printf("%s - getting conn dest\n", __func__);
 +              ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
-+//            printf("%s - get conn dest %d %s\n", __func__, ret, strerror(errno));
 +              if (ret)
 +                      goto out;
 +      }
 +
 +      ret = dsend(rs, buf, len, flags);
-+//    printf("%s - dsend %d %s\n", __func__, ret, strerror(errno));
 +out:
 +      fastlock_release(&rs->slock);
 +      return ret;
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2725,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2651,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1909,7 +1836,7 @@ index a060f66..4631b1d 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1653,7 +2778,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2704,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1918,7 +1845,7 @@ index a060f66..4631b1d 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2815,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2741,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
        int ret;
  
  check_cq:
@@ -1929,7 +1856,7 @@ index a060f66..4631b1d 100644
                rs_process_cq(rs, nonblock, test);
  
                revents = 0;
-@@ -1707,6 +2832,16 @@ check_cq:
+@@ -1707,6 +2758,16 @@ check_cq:
                }
  
                return revents;
@@ -1946,7 +1873,7 @@ index a060f66..4631b1d 100644
        }
  
        if (rs->state == rs_listening) {
-@@ -1766,11 +2901,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2827,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
                        if (fds[i].revents)
                                return 1;
  
@@ -1966,7 +1893,7 @@ index a060f66..4631b1d 100644
                        rfds[i].events = POLLIN;
                } else {
                        rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2931,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2857,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
  
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
@@ -1978,7 +1905,7 @@ index a060f66..4631b1d 100644
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3090,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3016,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1987,7 +1914,7 @@ index a060f66..4631b1d 100644
                return 0;
        }
  
-@@ -1959,10 +3100,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3026,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -2001,12 +1928,15 @@ index a060f66..4631b1d 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -1987,13 +3128,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3054,32 @@ int rshutdown(int socket, int how)
        return 0;
  }
  
 +static void ds_shutdown(struct rsocket *rs)
 +{
++      if (rs->svcs)
++              rs_modify_svcs(rs, 0);
++
 +      if (rs->fd_flags & O_NONBLOCK)
 +              rs_set_nonblocking(rs, 0);
 +
@@ -2033,7 +1963,7 @@ index a060f66..4631b1d 100644
  
        rs_free(rs);
        return 0;
-@@ -2018,8 +3175,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3104,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -2048,7 +1978,7 @@ index a060f66..4631b1d 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3188,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3117,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -2063,7 +1993,7 @@ index a060f66..4631b1d 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2040,22 +3205,31 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,22 +3134,31 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -2103,7 +2033,7 @@ index a060f66..4631b1d 100644
                                rs->rbuf_size = (*(uint32_t *) optval) << 1;
                        ret = 0;
                        break;
-@@ -2101,9 +3275,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3204,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
@@ -2118,7 +2048,7 @@ index a060f66..4631b1d 100644
                        opt_on = *(int *) optval;
                        break;
                default:
-@@ -2315,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3420,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -2127,7 +2057,7 @@ index a060f66..4631b1d 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3525,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3454,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -2136,7 +2066,7 @@ index a060f66..4631b1d 100644
        return offset;
  }
  
-@@ -2361,7 +3537,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3466,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -2145,7 +2075,7 @@ index a060f66..4631b1d 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2382,7 +3558,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3487,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -2154,7 +2084,7 @@ index a060f66..4631b1d 100644
        return ret;
  }
  
-@@ -2426,7 +3602,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3531,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -2163,7 +2093,7 @@ index a060f66..4631b1d 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -2476,3 +3652,278 @@ out:
+@@ -2476,3 +3581,272 @@ out:
  
        return (ret && left == count) ? ret : count - left;
  }
@@ -2233,17 +2163,14 @@ index a060f66..4631b1d 100644
 +      struct rs_svc_msg msg;
 +
 +      read(svc_sock[1], &msg, sizeof msg);
-+      switch (msg.op) {
-+      case RS_SVC_INSERT:
++      if (msg.svcs & RS_SVC_DGRAM) {
 +              msg.status = rs_svc_add_rs(msg.rs);
-+              break;
-+      case RS_SVC_REMOVE:
++      } else if (!msg.svcs) {
 +              msg.status = rs_svc_rm_rs(msg.rs);
-+              break;
-+      default:
-+              msg.status = ENOTSUP;
-+              break;
 +      }
++
++      if (!msg.status)
++              msg.rs->svcs = msg.svcs;
 +      write(svc_sock[1], &msg, sizeof msg);
 +}
 +
@@ -2343,7 +2270,6 @@ index a060f66..4631b1d 100644
 +      struct ibv_sge sge;
 +      uint64_t offset;
 +
-+//    PRINTADDR(src);
 +      if (!ds_can_send(rs)) {
 +              if (ds_get_comp(rs, 0, ds_can_send))
 +                      return;
@@ -2354,18 +2280,13 @@ index a060f66..4631b1d 100644
 +      rs->sqe_avail--;
 +
 +      ds_format_hdr(&hdr, src);
-+//    printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
-+//                    hdr.length, hdr.port);
 +      memcpy((void *) msg, &hdr, hdr.length);
 +      memcpy((void *) msg + hdr.length, buf, len);
-+//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
 +      sge.addr = (uintptr_t) msg;
 +      sge.length = hdr.length + len;
 +      sge.lkey = rs->conn_dest->qp->smr->lkey;
 +      offset = (uint8_t *) msg - rs->sbuf;
 +
-+//    printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
-+//                    ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
 +      ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
 +}
 +
@@ -2378,8 +2299,6 @@ index a060f66..4631b1d 100644
 +      int len, ret;
 +
 +      ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-+//    PRINTADDR(&addr);
-+//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
 +      if (ret < DS_UDP_IPV4_HDR_LEN)
 +              return;
 +
@@ -2394,22 +2313,27 @@ index a060f66..4631b1d 100644
 +      if (ret)
 +              return;
 +
++      if (udp_hdr->op == RS_OP_DATA) {
++              fastlock_acquire(&rs->slock);
++              cur_dest = rs->conn_dest;
++              rs->conn_dest = dest;
++              ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
++              rs->conn_dest = cur_dest;
++              fastlock_release(&rs->slock);
++      }
++
 +      if (!dest->ah || (dest->qpn != udp_hdr->qpn))
 +              rs_svc_create_ah(rs, dest, udp_hdr->qpn);
 +
 +      /* to do: handle when dest local ip address doesn't match udp ip */
-+      if (udp_hdr->op != RS_OP_DATA)
-+              return;
-+
-+      fastlock_acquire(&rs->slock);
-+      cur_dest = rs->conn_dest;
-+      rs->conn_dest = &dest->qp->dest;
-+      rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-+
-+      rs->conn_dest = dest;
-+      ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
-+      rs->conn_dest = cur_dest;
-+      fastlock_release(&rs->slock);
++      if (udp_hdr->op == RS_OP_DATA) {
++              fastlock_acquire(&rs->slock);
++              cur_dest = rs->conn_dest;
++              rs->conn_dest = &dest->qp->dest;
++              rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
++              rs->conn_dest = cur_dest;
++              fastlock_release(&rs->slock);
++      }
 +}
 +
 +static void *rs_svc_run(void *arg)
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 0f6cf00..0000000
+++ /dev/null
@@ -1,594 +0,0 @@
-Bottom: f8258dbc93f14acdb67621e51c1696fca92c841a
-Top:    89a1bd159135ab5aae205202e62cdc91b9ab3b0d
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-12-17 13:41:21 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 4631b1d..219aa4a 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -66,23 +66,14 @@
- static struct index_map idm;
- static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
--enum {
--      RS_SVC_INSERT,
--      RS_SVC_REMOVE
--};
--
- struct rsocket;
--
--#define PRINTADDR(a) \
--printf("%s port %x ip %x\n", __func__, \
--      ((struct sockaddr_in *)a)->sin_port, \
--      ((struct sockaddr_in *)a)->sin_addr.s_addr)
--
--
-+enum {
-+      RS_SVC_DGRAM = 1 << 0
-+};
- struct rs_svc_msg {
--      uint32_t op;
-+      uint32_t svcs;
-       uint32_t status;
-       struct rsocket *rs;
- };
-@@ -318,6 +309,7 @@ struct rsocket {
-               };
-       };
-+      int               svcs;
-       int               opts;
-       long              fd_flags;
-       uint64_t          so_opts;
-@@ -387,7 +379,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
-       }
- }
--static int rs_add_to_svc(struct rsocket *rs)
-+static int rs_modify_svcs(struct rsocket *rs, int svcs)
- {
-       struct rs_svc_msg msg;
-       int ret;
-@@ -396,58 +388,60 @@ static int rs_add_to_svc(struct rsocket *rs)
-       if (!svc_cnt) {
-               ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
-               if (ret)
--                      goto err1;
-+                      goto unlock;
-               ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
-               if (ret) {
-                       ret = ERR(ret);
--                      goto err2;
-+                      goto closepair;
-               }
-       }
--      msg.op = RS_SVC_INSERT;
-+      msg.svcs = svcs;
-       msg.status = EINVAL;
-       msg.rs = rs;
-       write(svc_sock[0], &msg, sizeof msg);
-       read(svc_sock[0], &msg, sizeof msg);
-       ret = rdma_seterrno(msg.status);
--      if (ret && !svc_cnt)
--              goto err3;
-+      if (svc_cnt)
-+              goto unlock;
-+//    if (ret && !svc_cnt)
-+//            goto join;
-+//
-+//    pthread_mutex_unlock(&mut);
-+//    return ret;
--      pthread_mutex_unlock(&mut);
--      return ret;
--
--err3:
-       pthread_join(svc_id, NULL);
--err2:
-+closepair:
-       close(svc_sock[0]);
-       close(svc_sock[1]);
--err1:
-+unlock:
-       pthread_mutex_unlock(&mut);
-       return ret;
- }
--static int rs_remove_from_svc(struct rsocket *rs)
--{
--      struct rs_svc_msg msg;
--      int ret;
--
--      pthread_mutex_lock(&mut);
--      msg.op = RS_SVC_REMOVE;
--      msg.status = EINVAL;
--      msg.rs = rs;
--      write(svc_sock[0], &msg, sizeof msg);
--      read(svc_sock[0], &msg, sizeof msg);
--      ret = ERR(msg.status);
--      if (!svc_cnt) {
--              pthread_join(svc_id, NULL);
--              close(svc_sock[0]);
--              close(svc_sock[1]);
--      }
--
--      pthread_mutex_unlock(&mut);
--      return ret;
--}
-+//static void rs_remove_from_svc(struct rsocket *rs)
-+//{
-+//    struct rs_svc_msg msg;
-+//    int ret;
-+//
-+//    pthread_mutex_lock(&mut);
-+//    if (svc_cnt) {
-+//            msg.op = RS_SVC_REMOVE;
-+//            msg.status = EINVAL;
-+//            msg.rs = rs;
-+//            write(svc_sock[0], &msg, sizeof msg);
-+//            read(svc_sock[0], &msg, sizeof msg);
-+//    }
-+//
-+//    if (!svc_cnt) {
-+//            pthread_join(svc_id, NULL);
-+//            close(svc_sock[0]);
-+//            close(svc_sock[1]);
-+//    }
-+//
-+//    pthread_mutex_unlock(&mut);
-+//}
- static int ds_compare_addr(const void *dst1, const void *dst2)
- {
-@@ -459,7 +453,6 @@ static int ds_compare_addr(const void *dst1, const void *dst2)
-       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
-             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
--//    printf("%s len %d sizeof sin %d memcmp %d\n", __func__, len, sizeof(struct sockaddr_in), memcmp(dst1,dst2,len));
-       return memcmp(dst1, dst2, len);
- }
-@@ -893,9 +886,6 @@ static void ds_free(struct rsocket *rs)
- {
-       struct ds_qp *qp;
--      if (rs->state & (rs_readable | rs_writable))
--              rs_remove_from_svc(rs);
--
-       if (rs->udp_sock >= 0)
-               close(rs->udp_sock);
-@@ -1056,7 +1046,7 @@ static int ds_init_ep(struct rsocket *rs)
-       }
-       msg->next = NULL;
--      ret = rs_add_to_svc(rs);
-+      ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
-       if (ret)
-               return ret;
-@@ -1322,10 +1312,8 @@ static int ds_get_src_addr(struct rsocket *rs,
-       int sock, ret;
-       uint16_t port;
--//    printf("dest: "); PRINTADDR(dest_addr);
--      *src_len = sizeof src_addr;
-+      *src_len = sizeof *src_addr;
-       ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
--//    printf("src: "); PRINTADDR(src_addr);
-       if (ret || !rs_any_addr(src_addr))
-               return ret;
-@@ -1338,10 +1326,9 @@ static int ds_get_src_addr(struct rsocket *rs,
-       if (ret)
-               goto out;
--      *src_len = sizeof src_addr;
-+      *src_len = sizeof *src_addr;
-       ret = getsockname(sock, &src_addr->sa, src_len);
-       src_addr->sin.sin_port = port;
--//    printf("selected src: ");
- out:
-       close(sock);
-       return ret;
-@@ -1350,7 +1337,6 @@ out:
- static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
- {
-       if (addr->sa.sa_family == AF_INET) {
--//            PRINTADDR(addr);
-               hdr->version = 4;
-               hdr->length = DS_IPV4_HDR_LEN;
-               hdr->port = addr->sin.sin_port;
-@@ -1371,7 +1357,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
-       struct ibv_ah_attr attr;
-       int ret;
--//    printf("%s\n", __func__);
-       memcpy(&qp->dest.addr, addr, addrlen);
-       qp->dest.qp = qp;
-       qp->dest.qpn = qp->cm_id->qp->qp_num;
-@@ -1384,8 +1369,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
-       attr.dlid = port_attr.lid;
-       attr.port_num = qp->cm_id->port_num;
-       qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
--//    printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
--//            attr.port_num, qp->dest.qpn);
-       if (!qp->dest.ah)
-               return ERR(ENOMEM);
-@@ -1401,7 +1384,6 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-       struct epoll_event event;
-       int i, ret;
--//    PRINTADDR(src_addr);
-       qp = calloc(1, sizeof(*qp));
-       if (!qp)
-               return ERR(ENOMEM);
-@@ -1490,14 +1472,12 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
-       struct ds_dest **tdest, *new_dest;
-       int ret = 0;
--//    PRINTADDR(addr);
-       fastlock_acquire(&rs->map_lock);
-       tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-       if (tdest)
-               goto found;
-       ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
--//    printf("get src: "); PRINTADDR(&src_addr);
-       if (ret)
-               goto out;
-@@ -1542,7 +1522,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               }
-               fastlock_acquire(&rs->slock);
--//            PRINTADDR(addr);
-               ret = connect(rs->udp_sock, addr, addrlen);
-               if (!ret)
-                       ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-@@ -1604,8 +1583,6 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
-       wr.wr.ud.ah = rs->conn_dest->ah;
-       wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
-       wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
--//    printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
--//            rs->conn_dest->qpn);
-       return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
- }
-@@ -1935,7 +1912,6 @@ static void ds_poll_cqs(struct rsocket *rs)
-                       if (ds_wr_is_recv(wc.wr_id)) {
-                               if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
-                                   ds_valid_recv(qp, &wc)) {
--//                                    printf("%s recv over QP\n", __func__);
-                                       rs->rqe_avail--;
-                                       rmsg = &rs->dmsg[rs->rmsg_tail];
-                                       rmsg->qp = qp;
-@@ -1944,13 +1920,11 @@ static void ds_poll_cqs(struct rsocket *rs)
-                                       if (++rs->rmsg_tail == rs->rq_size + 1)
-                                               rs->rmsg_tail = 0;
-                               } else {
--//                                    printf("%s invalid recv\n", __func__);
-                                       ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
-                               }
-                       } else {
-                               smsg = (struct ds_smsg *)
-                                      (rs->sbuf + ds_wr_offset(wc.wr_id));
--//                            printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
-                               smsg->next = rs->smsg_free;
-                               rs->smsg_free = smsg;
-                               rs->sqe_avail++;
-@@ -1993,9 +1967,7 @@ static int ds_get_cq_event(struct rsocket *rs)
-       if (!rs->cq_armed)
-               return 0;
--//    printf("wait for epoll event\n");
-       ret = epoll_wait(rs->epfd, &event, 1, -1);
--//    printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
-       if (ret <= 0)
-               return ret;
-@@ -2012,46 +1984,35 @@ static int ds_get_cq_event(struct rsocket *rs)
- static int rs_have_rdata(struct rsocket *rs);
- static int ds_can_send(struct rsocket *rs);
-+static int rs_poll_all(struct rsocket *rs);
-+static int ds_all_sends_done(struct rsocket *rs);
- static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
-       int ret = 0;
--      if (test == rs_have_rdata)
--              printf("%s test rs_have_rdata\n", __func__);
--      else if (test == ds_can_send)
--              printf("%s test ds_can_send\n", __func__);
--      else
--              printf("%s test ?\n", __func__);
--
-       fastlock_acquire(&rs->cq_lock);
-       do {
-               ds_poll_cqs(rs);
-               if (test(rs)) {
--                      printf("%s test succeeded\n", __func__);
-                       ret = 0;
-                       break;
-               } else if (nonblock) {
-                       ret = ERR(EWOULDBLOCK);
--                      printf("%s nonblocking \n", __func__);
-               } else if (!rs->cq_armed) {
--                      printf("%s req notify \n", __func__);
-                       ds_req_notify_cqs(rs);
-                       rs->cq_armed = 1;
-               } else {
-                       fastlock_acquire(&rs->cq_wait_lock);
-                       fastlock_release(&rs->cq_lock);
--                      printf("%s wait for event \n", __func__);
-                       ret = ds_get_cq_event(rs);
--                      printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
-                       fastlock_release(&rs->cq_wait_lock);
-                       fastlock_acquire(&rs->cq_lock);
-               }
-       } while (!ret);
-       fastlock_release(&rs->cq_lock);
--//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-       return ret;
- }
-@@ -2063,7 +2024,6 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-       do {
-               ret = ds_process_cqs(rs, 1, test);
--//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-               if (!ret || nonblock || errno != EWOULDBLOCK)
-                       return ret;
-@@ -2178,15 +2138,12 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-       struct ds_header *hdr;
-       int ret;
--//    printf("%s \n", __func__);
-       if (!(rs->state & rs_readable))
-               return ERR(EINVAL);
-       if (!rs_have_rdata(rs)) {
--//            printf("%s need rdata \n", __func__);
-               ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
-                                 rs_have_rdata);
--//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-               if (ret)
-                       return ret;
-       }
-@@ -2198,10 +2155,7 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-       memcpy(buf, (void *) hdr + hdr->length, len);
-       if (addrlen)
--{
-               ds_set_src(src_addr, addrlen, hdr);
--//PRINTADDR(src_addr);
--}
-       if (!(flags & MSG_PEEK)) {
-               ds_post_recv(rs, rmsg->qp, rmsg->offset);
-@@ -2210,7 +2164,6 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-               rs->rqe_avail++;
-       }
--//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-       return len;
- }
-@@ -2444,7 +2397,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       struct iovec miov[8];
-       ssize_t ret;
--//    printf("%s\n", __func__);
-       if (iovcnt > 8)
-               return ERR(ENOTSUP);
-@@ -2471,7 +2423,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
-       msg.msg_iov = miov;
-       msg.msg_iovlen = iovcnt + 1;
--//    printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
-       ret = sendmsg(rs->udp_sock, &msg, flags);
-       return ret > 0 ? ret - hdr.length : ret;
- }
-@@ -2480,14 +2431,11 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
-                          int flags, uint8_t op)
- {
-       struct iovec iov;
--//    printf("%s\n", __func__);
-       if (buf && len) {
--//            printf("%s have buffer\n", __func__);
-               iov.iov_base = (void *) buf;
-               iov.iov_len = len;
-               return ds_sendv_udp(rs, &iov, 1, flags, op);
-       } else {
--//            printf("%s no buffer\n", __func__);
-               return ds_sendv_udp(rs, NULL, 0, flags, op);
-       }
- }
-@@ -2499,14 +2447,11 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-       uint64_t offset;
-       int ret = 0;
--//    printf("%s\n", __func__);
-       if (!rs->conn_dest->ah)
-               return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-       if (!ds_can_send(rs)) {
--              printf("can't send\n");
-               ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
--              printf("ds_get_comp %d\n", ret);
-               if (ret)
-                       return ret;
-       }
-@@ -2522,9 +2467,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-       sge.lkey = rs->conn_dest->qp->smr->lkey;
-       offset = (uint8_t *) msg - rs->sbuf;
--//    printf("%s - sending over QP\n", __func__);
-       ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
--//    printf("%s - ds_post_send %d %s\n", __func__, ret, strerror(errno));
-       return ret ? ret : len;
- }
-@@ -2625,8 +2568,6 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-       struct rsocket *rs;
-       int ret;
--//    PRINTADDR(dest_addr);
--//    printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
-       rs = idm_at(&idm, socket);
-       if (rs->type == SOCK_STREAM) {
-               if (dest_addr || addrlen)
-@@ -2642,28 +2583,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-       }
-       fastlock_acquire(&rs->slock);
--//    printf("%s - checking conn dest %p\n", __func__, rs->conn_dest);
--//    printf("dest addr: af %d", dest_addr->sa_family); PRINTADDR(dest_addr);
--//    if (rs->conn_dest) {
--//            int i;
--//            printf("conn addr: af %d", rs->conn_dest->addr.sa.sa_family); PRINTADDR(&rs->conn_dest->addr);
--//            for (i=0;i<16;i++)
--//                    printf("%x", ((uint8_t *)dest_addr)[i]);
--//            printf("\n");
--//            for (i=0;i<16;i++)
--//                    printf("%x", ((uint8_t *)&rs->conn_dest->addr)[i]);
--//            printf("\n");
--//    }
-       if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
--//            printf("%s - getting conn dest\n", __func__);
-               ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
--//            printf("%s - get conn dest %d %s\n", __func__, ret, strerror(errno));
-               if (ret)
-                       goto out;
-       }
-       ret = dsend(rs, buf, len, flags);
--//    printf("%s - dsend %d %s\n", __func__, ret, strerror(errno));
- out:
-       fastlock_release(&rs->slock);
-       return ret;
-@@ -3130,6 +3056,9 @@ int rshutdown(int socket, int how)
- static void ds_shutdown(struct rsocket *rs)
- {
-+      if (rs->svcs)
-+              rs_modify_svcs(rs, 0);
-+
-       if (rs->fd_flags & O_NONBLOCK)
-               rs_set_nonblocking(rs, 0);
-@@ -3718,17 +3647,14 @@ static void rs_svc_process_sock(void)
-       struct rs_svc_msg msg;
-       read(svc_sock[1], &msg, sizeof msg);
--      switch (msg.op) {
--      case RS_SVC_INSERT:
-+      if (msg.svcs & RS_SVC_DGRAM) {
-               msg.status = rs_svc_add_rs(msg.rs);
--              break;
--      case RS_SVC_REMOVE:
-+      } else if (!msg.svcs) {
-               msg.status = rs_svc_rm_rs(msg.rs);
--              break;
--      default:
--              msg.status = ENOTSUP;
--              break;
-       }
-+
-+      if (!msg.status)
-+              msg.rs->svcs = msg.svcs;
-       write(svc_sock[1], &msg, sizeof msg);
- }
-@@ -3828,7 +3754,6 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
-       struct ibv_sge sge;
-       uint64_t offset;
--//    PRINTADDR(src);
-       if (!ds_can_send(rs)) {
-               if (ds_get_comp(rs, 0, ds_can_send))
-                       return;
-@@ -3839,18 +3764,13 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
-       rs->sqe_avail--;
-       ds_format_hdr(&hdr, src);
--//    printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
--//                    hdr.length, hdr.port);
-       memcpy((void *) msg, &hdr, hdr.length);
-       memcpy((void *) msg + hdr.length, buf, len);
--//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
-       sge.addr = (uintptr_t) msg;
-       sge.length = hdr.length + len;
-       sge.lkey = rs->conn_dest->qp->smr->lkey;
-       offset = (uint8_t *) msg - rs->sbuf;
--//    printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
--//                    ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
-       ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
- }
-@@ -3863,8 +3783,6 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       int len, ret;
-       ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
--//    PRINTADDR(&addr);
--//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
-       if (ret < DS_UDP_IPV4_HDR_LEN)
-               return;
-@@ -3879,22 +3797,27 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       if (ret)
-               return;
-+      if (udp_hdr->op == RS_OP_DATA) {
-+              fastlock_acquire(&rs->slock);
-+              cur_dest = rs->conn_dest;
-+              rs->conn_dest = dest;
-+              ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
-+              rs->conn_dest = cur_dest;
-+              fastlock_release(&rs->slock);
-+      }
-+
-       if (!dest->ah || (dest->qpn != udp_hdr->qpn))
-               rs_svc_create_ah(rs, dest, udp_hdr->qpn);
-       /* to do: handle when dest local ip address doesn't match udp ip */
--      if (udp_hdr->op != RS_OP_DATA)
--              return;
--
--      fastlock_acquire(&rs->slock);
--      cur_dest = rs->conn_dest;
--      rs->conn_dest = &dest->qp->dest;
--      rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
--
--      rs->conn_dest = dest;
--      ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
--      rs->conn_dest = cur_dest;
--      fastlock_release(&rs->slock);
-+      if (udp_hdr->op == RS_OP_DATA) {
-+              fastlock_acquire(&rs->slock);
-+              cur_dest = rs->conn_dest;
-+              rs->conn_dest = &dest->qp->dest;
-+              rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-+              rs->conn_dest = cur_dest;
-+              fastlock_release(&rs->slock);
-+      }
- }
- static void *rs_svc_run(void *arg)