]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Thu, 13 Dec 2012 00:49:45 +0000 (16:49 -0800)
committerSean Hefty <sean.hefty@intel.com>
Thu, 13 Dec 2012 00:49:45 +0000 (16:49 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index 9645b984a169fe10a41cb10e76850c132abfc4c4..8092728491024a1fbc339a4f7970c2d25fa819e7 100644 (file)
--- a/meta
+++ b/meta
@@ -1,9 +1,8 @@
 Version: 1
-Previous: f9129f248e598f6d687a22e32bafc7e8164d6a4b
-Head: 39d8533b49387b416ce7c88b8f665fe1e3b784cc
+Previous: 4942c4abe0f1953b07ee7dffaf9d79be8983f692
+Head: b2ae615cc6c0119babd24414c657cb1962392e02
 Applied:
-  dsocket: 13732c8a437be83b664fe683516ec300a145f76a
-  refresh-temp: 39d8533b49387b416ce7c88b8f665fe1e3b784cc
+  dsocket: b2ae615cc6c0119babd24414c657cb1962392e02
 Unapplied:
   udpong: a42957509acbde99a7d8469e0819b7d75af51289
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
index 7ce47b8044497da0863de1929dca4908f338a49c..25e35aa6d64c92bfbd5442434c6eed9842776f42 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top:    f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
+Top:    136936c0a82503ee0da9daccd8b948cd09e58b64
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -62,7 +62,7 @@ index 1484f65..a660208 100644
 +rsocket QP.
 \ No newline at end of file
 diff --git a/src/cma.c b/src/cma.c
-index 388be61..0f58966 100755
+index 388be61..ff9b426 100755
 --- a/src/cma.c
 +++ b/src/cma.c
 @@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id)
@@ -74,7 +74,7 @@ index 388be61..0f58966 100755
  {
        if (!addr)
                return 0;
-@@ -2232,9 +2232,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
+@@ -2232,9 +2232,19 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
  int ucma_max_qpsize(struct rdma_cm_id *id)
  {
        struct cma_id_private *id_priv;
@@ -85,6 +85,7 @@ index 388be61..0f58966 100755
 +      if (id && id_priv->cma_dev) {
 +              max_size = id_priv->cma_dev->max_qpsize;
 +      } else {
++              ucma_init();
 +              for (i = 0; i < cma_dev_cnt; i++) {
 +                      if (!max_size || max_size > cma_dev_array[i].max_qpsize)
 +                              max_size = cma_dev_array[i].max_qpsize;
@@ -112,7 +113,7 @@ index 0a0370e..7135a61 100644
  {
        errno = err;
 diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..c61d689 100644
+index a060f66..6fa4c68 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
 @@ -47,6 +47,8 @@
@@ -218,7 +219,7 @@ index a060f66..c61d689 100644
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
-@@ -170,68 +212,248 @@ enum rs_state {
+@@ -170,68 +212,249 @@ enum rs_state {
  
  #define RS_OPT_SWAP_SGL 1
  
@@ -441,6 +442,7 @@ index a060f66..c61d689 100644
 +
 +      msg.op = RS_SVC_INSERT;
 +      msg.status = EINVAL;
++      printf("%s rs %p\n", __func__, rs);
 +      msg.rs = rs;
 +      write(svc_sock[0], &msg, sizeof msg);
 +      read(svc_sock[0], &msg, sizeof msg);
@@ -499,7 +501,7 @@ index a060f66..c61d689 100644
  static int rs_value_to_scale(int value, int bits)
  {
        return value <= (1 << (bits - 1)) ?
-@@ -307,10 +529,10 @@ out:
+@@ -307,10 +530,10 @@ out:
        pthread_mutex_unlock(&mut);
  }
  
@@ -512,7 +514,7 @@ index a060f66..c61d689 100644
        pthread_mutex_unlock(&mut);
        return rs->index;
  }
-@@ -322,7 +544,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +545,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
  }
  
@@ -521,7 +523,7 @@ index a060f66..c61d689 100644
  {
        struct rsocket *rs;
  
-@@ -330,29 +552,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,29 +553,39 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
  
@@ -566,7 +568,7 @@ index a060f66..c61d689 100644
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
-@@ -360,13 +592,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +593,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
  
  static int rs_set_nonblocking(struct rsocket *rs, long arg)
  {
@@ -584,7 +586,9 @@ index a060f66..c61d689 100644
 +              if (!ret && rs->state < rs_connected)
 +                      ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
 +      } else {
++              printf("%s set nonblock\n", __func__);
 +              ret = fcntl(rs->epfd, F_SETFL, arg);
++              printf("%s fcntl %d\n", __func__, ret);
 +
 +              if (!ret && rs->qp_list) {
 +                      qp = rs->qp_list;
@@ -598,7 +602,7 @@ index a060f66..c61d689 100644
  
        return ret;
  }
-@@ -390,17 +636,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +639,43 @@ static void rs_set_qp_size(struct rsocket *rs)
                rs->rq_size = 2;
  }
  
@@ -606,6 +610,8 @@ index a060f66..c61d689 100644
 +{
 +      uint16_t max_size;
 +
++      printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
++                      rs->rq_size, rs->rbuf_size);
 +      max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
 +
 +      if (rs->sq_size > max_size)
@@ -622,6 +628,8 @@ index a060f66..c61d689 100644
 +              rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
 +      else
 +              rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
++      printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
++                      rs->rq_size, rs->rbuf_size);
 +}
 +
  static int rs_init_bufs(struct rsocket *rs)
@@ -640,7 +648,7 @@ index a060f66..c61d689 100644
  
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
-@@ -410,7 +678,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -410,7 +685,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
@@ -649,7 +657,7 @@ index a060f66..c61d689 100644
  
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
-@@ -423,7 +691,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -423,7 +698,7 @@ static int rs_init_bufs(struct rsocket *rs)
  
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
@@ -658,26 +666,21 @@ index a060f66..c61d689 100644
  
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
-@@ -440,15 +708,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,37 +715,56 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
  }
  
 -static int rs_create_cq(struct rsocket *rs)
 +static int ds_init_bufs(struct ds_qp *qp)
- {
--      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
--      if (!rs->cm_id->recv_cq_channel)
++{
 +      qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
 +      if (!qp->rbuf)
 +              return ERR(ENOMEM);
 +
 +      qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
 +      if (!qp->smr)
-               return -1;
--      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
--                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
--      if (!rs->cm_id->recv_cq)
++              return -1;
++
 +      qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
 +      if (!qp->rmr)
 +              return -1;
@@ -686,18 +689,26 @@ index a060f66..c61d689 100644
 +}
 +
 +static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
-+{
+ {
+-      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+-      if (!rs->cm_id->recv_cq_channel)
 +      cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
++      printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
 +      if (!cm_id->recv_cq_channel)
-+              return -1;
-+
+               return -1;
+-      rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+-                                         rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+-      if (!rs->cm_id->recv_cq)
 +      cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
 +                                     cm_id, cm_id->recv_cq_channel, 0);
++      printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
 +      if (!cm_id->recv_cq)
                goto err1;
  
        if (rs->fd_flags & O_NONBLOCK) {
-@@ -456,21 +741,20 @@ static int rs_create_cq(struct rsocket *rs)
++              printf("%s set nonblock\n", __func__);
+               if (rs_set_nonblocking(rs, O_NONBLOCK))
                        goto err2;
        }
  
@@ -726,7 +737,7 @@ index a060f66..c61d689 100644
  {
        struct ibv_recv_wr wr, *bad;
  
-@@ -482,6 +766,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +776,23 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -750,7 +761,7 @@ index a060f66..c61d689 100644
  static int rs_create_ep(struct rsocket *rs)
  {
        struct ibv_qp_init_attr qp_attr;
-@@ -492,7 +793,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +803,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
  
@@ -759,7 +770,7 @@ index a060f66..c61d689 100644
        if (ret)
                return ret;
  
-@@ -549,8 +850,74 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +860,73 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
  }
  
@@ -789,6 +800,8 @@ index a060f66..c61d689 100644
 +
 +static void ds_free(struct rsocket *rs)
 +{
++      struct ds_qp *qp;
++
 +      if (rs->state & (rs_readable | rs_writable))
 +              rs_remove_from_svc(rs);
 +
@@ -801,12 +814,9 @@ index a060f66..c61d689 100644
 +      if (rs->dmsg)
 +              free(rs->dmsg);
 +
-+      if (rs->smsg_free)
-+              free(rs->smsg_free);
-+
-+      while (rs->qp_list) {
-+              ds_remove_qp(rs, rs->qp_list);
-+              ds_free_qp(rs->qp_list);
++      while ((qp = rs->qp_list)) {
++              ds_remove_qp(rs, qp);
++              ds_free_qp(qp);
 +      }
 +
 +      if (rs->epfd >= 0)
@@ -834,7 +844,7 @@ index a060f66..c61d689 100644
        if (rs->index >= 0)
                rs_remove(rs);
  
-@@ -582,7 +949,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +958,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
  
@@ -843,7 +853,7 @@ index a060f66..c61d689 100644
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
-@@ -636,29 +1003,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1012,89 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
  }
  
@@ -859,6 +869,40 @@ index a060f66..c61d689 100644
 +
 +      return 0;
 +}
++
++static int ds_init_ep(struct rsocket *rs)
++{
++      struct ds_smsg *msg;
++      int i, ret;
++
++      ds_set_qp_size(rs);
++
++      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
++      if (!rs->sbuf)
++              return ERR(ENOMEM);
++
++      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
++      if (!rs->dmsg)
++              return ERR(ENOMEM);
++
++      rs->sqe_avail = rs->sq_size;
++      rs->rqe_avail = rs->rq_size;
++
++      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
++      msg = rs->smsg_free;
++      for (i = 0; i < rs->sq_size - 1; i++) {
++              msg->next = (void *) msg + RS_SNDLOWAT;
++              msg = msg->next;
++      }
++      msg->next = NULL;
++
++      ret = rs_add_to_svc(rs);
++      if (ret)
++              return ret;
++
++      rs->state = rs_readable | rs_writable;
++      return 0;
++}
 +
  int rsocket(int domain, int type, int protocol)
  {
@@ -886,15 +930,16 @@ index a060f66..c61d689 100644
 +              ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
 +              if (ret)
 +                      goto err;
--      ret = rs_insert(rs);
++
 +              rs->cm_id->route.addr.src_addr.sa_family = domain;
 +              index = rs->cm_id->channel->fd;
 +      } else {
++              printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
 +              ret = ds_init(rs, domain);
 +              if (ret)
 +                      goto err;
-+
+-      ret = rs_insert(rs);
 +              index = rs->udp_sock;
 +      }
 +
@@ -906,7 +951,7 @@ index a060f66..c61d689 100644
        return rs->index;
  
  err:
-@@ -672,9 +1064,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1108,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -918,17 +963,17 @@ index a060f66..c61d689 100644
 +              if (!ret)
 +                      rs->state = rs_bound;
 +      } else {
-+              ret = bind(rs->udp_sock, addr, addrlen);
-+              if (!ret) {
-+                      ret = rs_add_to_svc(rs);
-+                      if (!ret)
-+                              rs->state = rs_readable | rs_writable;
++              if (rs->state == rs_init) {
++                      ret = ds_init_ep(rs);
++                      if (ret)
++                              return ret;
 +              }
++              ret = bind(rs->udp_sock, addr, addrlen);
 +      }
        return ret;
  }
  
-@@ -710,7 +1111,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1155,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -937,7 +982,7 @@ index a060f66..c61d689 100644
        if (!new_rs)
                return ERR(ENOMEM);
  
-@@ -718,7 +1119,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1163,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
  
@@ -946,45 +991,10 @@ index a060f66..c61d689 100644
        if (ret < 0)
                goto err;
  
-@@ -855,13 +1256,268 @@ connected:
+@@ -855,13 +1300,256 @@ connected:
        return ret;
  }
  
-+static int ds_init_ep(struct rsocket *rs)
-+{
-+      struct ds_smsg *msg;
-+      int i, ret;
-+
-+      ds_set_qp_size(rs);
-+
-+      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-+      if (!rs->sbuf)
-+              return ERR(ENOMEM);
-+
-+      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+      if (!rs->dmsg)
-+              return ERR(ENOMEM);
-+
-+      rs->sbuf_bytes_avail = rs->sbuf_size;
-+      rs->sqe_avail = rs->sq_size;
-+      rs->rqe_avail = rs->rq_size;
-+
-+      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+      msg = rs->smsg_free;
-+      for (i = 0; i < rs->sq_size - 1; i++) {
-+              msg->next = (void *) msg + i * RS_SNDLOWAT;
-+              msg = msg->next;
-+      }
-+      msg->next = NULL;
-+
-+      ret = rs_add_to_svc(rs);
-+      if (ret)
-+              return ret;
-+
-+      rs->state = rs_readable | rs_writable;
-+      return 0;
-+}
-+
 +static int rs_any_addr(const union socket_addr *addr)
 +{
 +      if (addr->sa.sa_family == AF_INET) {
@@ -1068,38 +1078,44 @@ index a060f66..c61d689 100644
 +}
 +
 +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-+                      socklen_t addrlen, struct ds_qp **qp)
++                      socklen_t addrlen, struct ds_qp **new_qp)
 +{
++      struct ds_qp *qp;
 +      struct ibv_qp_init_attr qp_attr;
 +      struct epoll_event event;
 +      int i, ret;
 +
-+      *qp = calloc(1, sizeof(struct ds_qp));
-+      if (!*qp)
++printf("%s\n", __func__);
++      qp = calloc(1, sizeof(*qp));
++      if (!qp)
 +              return ERR(ENOMEM);
 +
-+      (*qp)->rs = rs;
-+      ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
++      qp->rs = rs;
++      ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
++      printf("%s rdma_create_id %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
-+      ds_format_hdr(&(*qp)->hdr, src_addr);
-+      ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
++      ds_format_hdr(&qp->hdr, src_addr);
++      ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
++      printf("%s rdma_bind_addr %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
-+      ret = ds_init_bufs(*qp);
++      ret = ds_init_bufs(qp);
++      printf("%s ds_init_bufs %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
-+      ret = rs_create_cq(rs, (*qp)->cm_id);
++      ret = rs_create_cq(rs, qp->cm_id);
++      printf("%s rs_create_cq %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
 +      memset(&qp_attr, 0, sizeof qp_attr);
 +      qp_attr.qp_context = qp;
-+      qp_attr.send_cq = rs->cm_id->send_cq;
-+      qp_attr.recv_cq = rs->cm_id->recv_cq;
++      qp_attr.send_cq = qp->cm_id->send_cq;
++      qp_attr.recv_cq = qp->cm_id->recv_cq;
 +      qp_attr.qp_type = IBV_QPT_UD;
 +      qp_attr.sq_sig_all = 1;
 +      qp_attr.cap.max_send_wr = rs->sq_size;
@@ -1107,31 +1123,35 @@ index a060f66..c61d689 100644
 +      qp_attr.cap.max_send_sge = 2;
 +      qp_attr.cap.max_recv_sge = 1;
 +      qp_attr.cap.max_inline_data = rs->sq_inline;
-+      ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
++      ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
++      printf("%s rdma_create_qp %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
-+      ret = ds_add_qp_dest(*qp, src_addr, addrlen);
++      ret = ds_add_qp_dest(qp, src_addr, addrlen);
++      printf("%s ds_add_qp_dest %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
 +      event.events = EPOLLIN;
-+      event.data.ptr = *qp;
++      event.data.ptr = qp;
 +      ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
-+                      (*qp)->cm_id->recv_cq_channel->fd, &event);
++                      qp->cm_id->recv_cq_channel->fd, &event);
++      printf("%s epoll_ctl %d\n", __func__, ret);
 +      if (ret)
 +              goto err;
 +
 +      for (i = 0; i < rs->rq_size; i++) {
-+              ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
++              ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
 +              if (ret)
 +                      goto err;
 +      }
 +
-+      ds_insert_qp(rs, *qp);
++      ds_insert_qp(rs, qp);
++      *new_qp = qp;
 +      return 0;
 +err:
-+      ds_free_qp(*qp);
++      ds_free_qp(qp);
 +      return ret;
 +}
 +
@@ -1158,38 +1178,42 @@ index a060f66..c61d689 100644
 +      union socket_addr src_addr;
 +      socklen_t src_len;
 +      struct ds_qp *qp;
++      struct ds_dest **tdest, *new_dest;
 +      int ret = 0;
 +
++      printf("%s \n", __func__);
 +      fastlock_acquire(&rs->map_lock);
-+      dest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+      if (dest)
-+              goto out;
-+
-+      if (rs->state == rs_init) {
-+              ret = ds_init_ep(rs);
-+              if (ret)
-+                      goto out;
-+      }
++      tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
++      printf("%s tfind %p\n", __func__, dest);
++      if (tdest)
++              goto found;
 +
 +      ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
++      printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
 +      if (ret)
 +              goto out;
 +
 +      ret = ds_get_qp(rs, &src_addr, src_len, &qp);
++      printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
 +      if (ret)
 +              goto out;
 +
-+      if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
-+              *dest = calloc(1, sizeof(struct ds_dest));
-+              if (!*dest) {
++      tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
++      if (!tdest) {
++              printf("%s adding dest into map\n", __func__);
++              new_dest = calloc(1, sizeof(*new_dest));
++              if (!new_dest) {
 +                      ret = ERR(ENOMEM);
 +                      goto out;
 +              }
 +
-+              memcpy(&(*dest)->addr, addr, addrlen);
-+              (*dest)->qp = qp;
-+              tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
++              memcpy(&new_dest->addr, addr, addrlen);
++              new_dest->qp = qp;
++              tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
 +      }
++
++found:
++      *dest = *tdest;
 +out:
 +      fastlock_release(&rs->map_lock);
 +      return ret;
@@ -1207,17 +1231,26 @@ index a060f66..c61d689 100644
 +              memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
 +              ret = rs_do_connect(rs);
 +      } else {
++              printf("%s\n", __func__);
++              if (rs->state == rs_init) {
++                      ret = ds_init_ep(rs);
++                      if (ret)
++                              return ret;
++              }
++
 +              fastlock_acquire(&rs->slock);
 +              ret = connect(rs->udp_sock, addr, addrlen);
++              printf("%s connect %d %s\n", __func__, ret, strerror(errno));
 +              if (!ret)
 +                      ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++              printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
 +              fastlock_release(&rs->slock);
 +      }
 +      return ret;
  }
  
  static int rs_post_write_msg(struct rsocket *rs,
-@@ -903,6 +1559,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1591,24 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
  }
  
@@ -1242,7 +1275,7 @@ index a060f66..c61d689 100644
  /*
   * Update target SGE before sending data.  Otherwise the remote side may
   * update the entry before we do.
-@@ -1046,7 +1720,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1046,7 +1752,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
@@ -1251,12 +1284,15 @@ index a060f66..c61d689 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1137,42 +1811,213 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
-       fastlock_acquire(&rs->cq_lock);
-       do {
--              rs_update_credits(rs);
--              ret = rs_poll_cq(rs);
+@@ -1133,46 +1839,217 @@ static int rs_get_cq_event(struct rsocket *rs)
+  */
+ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+-      int ret;
++      int ret;
++
++      fastlock_acquire(&rs->cq_lock);
++      do {
 +              rs_update_credits(rs);
 +              ret = rs_poll_cq(rs);
 +              if (test(rs)) {
@@ -1424,22 +1460,24 @@ index a060f66..c61d689 100644
 +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
 +{
 +      int ret = 0;
-+
-+      fastlock_acquire(&rs->cq_lock);
-+      do {
+       fastlock_acquire(&rs->cq_lock);
+       do {
+-              rs_update_credits(rs);
+-              ret = rs_poll_cq(rs);
 +              ds_poll_cqs(rs);
                if (test(rs)) {
-+                      printf("%s test succeeded\n", __func__);
++//                    printf("%s test succeeded\n", __func__);
                        ret = 0;
                        break;
 -              } else if (ret) {
 -                      break;
                } else if (nonblock) {
                        ret = ERR(EWOULDBLOCK);
-+                      printf("%s nonblocking \n", __func__);
++//                    printf("%s nonblocking \n", __func__);
                } else if (!rs->cq_armed) {
 -                      ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
-+                      printf("%s req notify \n", __func__);
++//                    printf("%s req notify \n", __func__);
 +                      ds_req_notify_cqs(rs);
                        rs->cq_armed = 1;
                } else {
@@ -1449,7 +1487,7 @@ index a060f66..c61d689 100644
  
 -                      ret = rs_get_cq_event(rs);
 +                      ret = ds_get_cq_event(rs);
-+                      printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
++//                    printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
                        fastlock_release(&rs->cq_wait_lock);
                        fastlock_acquire(&rs->cq_lock);
                }
@@ -1457,7 +1495,7 @@ index a060f66..c61d689 100644
  
 -      rs_update_credits(rs);
        fastlock_release(&rs->cq_lock);
-+      printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
        return ret;
  }
  
@@ -1471,11 +1509,11 @@ index a060f66..c61d689 100644
        do {
 -              ret = rs_process_cq(rs, 1, test);
 +              ret = ds_process_cqs(rs, 1, test);
-+              printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
                if (!ret || nonblock || errno != EWOULDBLOCK)
                        return ret;
  
-@@ -1184,7 +2029,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2061,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
                            (e.tv_usec - s.tv_usec) + 1;
        } while (poll_time <= polling_time);
  
@@ -1484,7 +1522,7 @@ index a060f66..c61d689 100644
        return ret;
  }
  
-@@ -1219,9 +2064,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2096,19 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
  }
  
@@ -1505,7 +1543,7 @@ index a060f66..c61d689 100644
  }
  
  static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2091,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2123,7 @@ static int rs_have_rdata(struct rsocket *rs)
  
  static int rs_conn_have_rdata(struct rsocket *rs)
  {
@@ -1514,7 +1552,7 @@ index a060f66..c61d689 100644
  }
  
  static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2100,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2132,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
               !(rs->state & rs_connected);
  }
  
@@ -1550,16 +1588,15 @@ index a060f66..c61d689 100644
 +      struct ds_header *hdr;
 +      int ret;
 +
-+ret = 0;
-+      printf("%s \n", __func__);
++//    printf("%s \n", __func__);
 +      if (!(rs->state & rs_readable))
 +              return ERR(EINVAL);
 +
 +      if (!rs_have_rdata(rs)) {
-+              printf("%s need rdata \n", __func__);
++//            printf("%s need rdata \n", __func__);
 +              ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
 +                                rs_have_rdata);
-+              printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
++//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
 +              if (ret)
 +                      return ret;
 +      }
@@ -1579,13 +1616,14 @@ index a060f66..c61d689 100644
 +                      rs->rmsg_head = 0;
 +      }
 +
++      printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
 +      return len;
 +}
 +
  static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
  {
        size_t left = len;
-@@ -1290,6 +2209,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2241,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        int ret;
  
        rs = idm_at(&idm, socket);
@@ -1599,7 +1637,7 @@ index a060f66..c61d689 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1339,7 +2265,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2297,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
  
@@ -1608,7 +1646,7 @@ index a060f66..c61d689 100644
  
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
-@@ -1348,8 +2274,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2306,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
  ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
                  struct sockaddr *src_addr, socklen_t *addrlen)
  {
@@ -1626,7 +1664,7 @@ index a060f66..c61d689 100644
        ret = rrecv(socket, buf, len, flags);
        if (ret > 0 && src_addr)
                rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2326,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2358,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
  
@@ -1643,7 +1681,7 @@ index a060f66..c61d689 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1447,10 +2382,90 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2414,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1658,12 +1696,14 @@ index a060f66..c61d689 100644
 +      struct ds_udp_header hdr;
 +      struct msghdr msg;
 +      struct iovec miov[8];
++      ssize_t ret;
 +
++//    printf("%s\n", __func__);
 +      if (iovcnt > 8)
 +              return ERR(ENOTSUP);
 +
 +      hdr.tag = htonl(DS_UDP_TAG);
-+      hdr.version = 1;
++      hdr.version = rs->conn_dest->qp->hdr.version;
 +      hdr.op = op;
 +      hdr.reserved = 0;
 +      hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
@@ -1685,18 +1725,24 @@ index a060f66..c61d689 100644
 +      msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
 +      msg.msg_iov = miov;
 +      msg.msg_iovlen = iovcnt + 1;
-+      return sendmsg(rs->udp_sock, &msg, flags);
++//    printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
++      ret = sendmsg(rs->udp_sock, &msg, flags);
++      printf("%s ret %d %s\n", __func__, ret, strerror(errno));
++      return ret > 0 ? ret - sizeof hdr : ret;
 +}
 +
 +static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
 +                         int flags, uint8_t op)
 +{
 +      struct iovec iov;
++      printf("%s\n", __func__);
 +      if (buf && len) {
++//            printf("%s have buffer\n", __func__);
 +              iov.iov_base = (void *) buf;
 +              iov.iov_len = len;
 +              return ds_sendv_udp(rs, &iov, 1, flags, op);
 +      } else {
++//            printf("%s no buffer\n", __func__);
 +              return ds_sendv_udp(rs, NULL, 0, flags, op);
 +      }
 +}
@@ -1708,6 +1754,7 @@ index a060f66..c61d689 100644
 +      uint64_t offset;
 +      int ret = 0;
 +
++      printf("%s\n", __func__);
 +      if (!rs->conn_dest->ah)
 +              return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
 +
@@ -1735,7 +1782,7 @@ index a060f66..c61d689 100644
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1464,6 +2479,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2520,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1749,7 +1796,7 @@ index a060f66..c61d689 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1485,7 +2507,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2548,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1758,7 +1805,7 @@ index a060f66..c61d689 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1538,10 +2560,27 @@ out:
+@@ -1538,10 +2601,39 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
@@ -1767,6 +1814,7 @@ index a060f66..c61d689 100644
 +      struct rsocket *rs;
 +      int ret;
 +
++      printf("%s\n", __func__);
 +      rs = idm_at(&idm, socket);
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
@@ -1774,14 +1822,25 @@ index a060f66..c61d689 100644
 +
 +              return rsend(socket, buf, len, flags);
 +      }
--      return rsend(socket, buf, len, flags);
++
++      if (rs->state == rs_init) {
++              ret = ds_init_ep(rs);
++              if (ret)
++                      return ret;
++      }
++
 +      fastlock_acquire(&rs->slock);
++      printf("%s check conn dest\n", __func__);
 +      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
++              printf("%s need conn dest\n", __func__);
 +              ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
 +              if (ret)
 +                      goto out;
 +      }
++      else
++              printf("%s connected\n", __func__);
+-      return rsend(socket, buf, len, flags);
 +      ret = dsend(rs, buf, len, flags);
 +out:
 +      fastlock_release(&rs->slock);
@@ -1789,7 +1848,7 @@ index a060f66..c61d689 100644
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2639,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2692,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1798,7 +1857,7 @@ index a060f66..c61d689 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1653,7 +2692,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2745,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1807,7 +1866,7 @@ index a060f66..c61d689 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2729,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2782,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
        int ret;
  
  check_cq:
@@ -1818,7 +1877,7 @@ index a060f66..c61d689 100644
                rs_process_cq(rs, nonblock, test);
  
                revents = 0;
-@@ -1707,6 +2746,16 @@ check_cq:
+@@ -1707,6 +2799,16 @@ check_cq:
                }
  
                return revents;
@@ -1835,7 +1894,7 @@ index a060f66..c61d689 100644
        }
  
        if (rs->state == rs_listening) {
-@@ -1766,11 +2815,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2868,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
                        if (fds[i].revents)
                                return 1;
  
@@ -1855,7 +1914,7 @@ index a060f66..c61d689 100644
                        rfds[i].events = POLLIN;
                } else {
                        rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2845,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2898,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
  
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
@@ -1867,7 +1926,7 @@ index a060f66..c61d689 100644
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3004,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3057,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1876,7 +1935,7 @@ index a060f66..c61d689 100644
                return 0;
        }
  
-@@ -1959,10 +3014,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3067,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -1890,7 +1949,7 @@ index a060f66..c61d689 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -1987,13 +3042,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3095,29 @@ int rshutdown(int socket, int how)
        return 0;
  }
  
@@ -1922,7 +1981,7 @@ index a060f66..c61d689 100644
  
        rs_free(rs);
        return 0;
-@@ -2018,8 +3089,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3142,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1937,7 +1996,7 @@ index a060f66..c61d689 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3102,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3155,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1952,7 +2011,7 @@ index a060f66..c61d689 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2040,18 +3119,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,18 +3172,26 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -1986,7 +2045,7 @@ index a060f66..c61d689 100644
                        opt_on = *(int *) optval;
                        break;
                case SO_RCVBUF:
-@@ -2101,9 +3188,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3241,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
@@ -2001,7 +2060,7 @@ index a060f66..c61d689 100644
                        opt_on = *(int *) optval;
                        break;
                default:
-@@ -2315,7 +3404,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3457,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -2010,7 +2069,7 @@ index a060f66..c61d689 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3438,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -2019,7 +2078,7 @@ index a060f66..c61d689 100644
        return offset;
  }
  
-@@ -2361,7 +3450,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3503,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -2028,7 +2087,7 @@ index a060f66..c61d689 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2382,7 +3471,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3524,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -2037,7 +2096,7 @@ index a060f66..c61d689 100644
        return ret;
  }
  
-@@ -2426,7 +3515,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3568,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -2046,7 +2105,7 @@ index a060f66..c61d689 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -2476,3 +3565,269 @@ out:
+@@ -2476,3 +3618,296 @@ out:
  
        return (ret && left == count) ? ret : count - left;
  }
@@ -2090,9 +2149,11 @@ index a060f66..c61d689 100644
 +      }
 +
 +      svc_rss[++svc_cnt] = rs;
++      printf("%s rs %p\n", __func__, rs);
 +      svc_fds[svc_cnt].fd = rs->udp_sock;
 +      svc_fds[svc_cnt].events = POLLIN;
 +      svc_fds[svc_cnt].revents = 0;
++      printf("add rs udp sock %d\n",rs->udp_sock);
 +      return 0;
 +}
 +
@@ -2116,6 +2177,7 @@ index a060f66..c61d689 100644
 +      struct rs_svc_msg msg;
 +
 +      read(svc_sock[1], &msg, sizeof msg);
++      printf("%s op %d\n",__func__, msg.op);
 +      switch (msg.op) {
 +      case RS_SVC_INSERT:
 +              msg.status = rs_svc_add_rs(msg.rs);
@@ -2127,6 +2189,7 @@ index a060f66..c61d689 100644
 +              msg.status = ENOTSUP;
 +              break;
 +      }
++      printf("%s status %d\n",__func__, msg.status);
 +      write(svc_sock[1], &msg, sizeof msg);
 +}
 +
@@ -2160,6 +2223,7 @@ index a060f66..c61d689 100644
 +      struct ibv_ah_attr attr;
 +      int ret;
 +
++      printf("%s\n",__func__);
 +      if (dest->ah) {
 +              fastlock_acquire(&rs->slock);
 +              ibv_destroy_ah(dest->ah);
@@ -2211,7 +2275,18 @@ index a060f66..c61d689 100644
 +static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
 +                              union socket_addr *addr)
 +{
-+      return (udp_hdr->tag == DS_UDP_TAG) &&
++printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
++      udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
++
++printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
++udp_hdr->tag == ntohl(DS_UDP_TAG),
++      udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
++        udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
++       udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
++        udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
++
++
++      return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
 +              ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
 +                udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
 +               (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
@@ -2226,6 +2301,7 @@ index a060f66..c61d689 100644
 +      struct ibv_sge sge;
 +      uint64_t offset;
 +
++      printf("%s\n",__func__);
 +      if (!ds_can_send(rs)) {
 +              if (ds_get_comp(rs, 0, ds_can_send))
 +                      return;
@@ -2254,7 +2330,9 @@ index a060f66..c61d689 100644
 +      socklen_t addrlen = sizeof addr;
 +      int len, ret;
 +
++      printf("%s\n",__func__);
 +      ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
++      printf("%s recvfrom %d\n",__func__, ret);
 +      if (ret < DS_UDP_IPV4_HDR_LEN)
 +              return;
 +
@@ -2262,10 +2340,12 @@ index a060f66..c61d689 100644
 +      if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
 +              return;
 +
++      printf("%s valid hdr\n",__func__);
 +      len = ret - udp_hdr->length;
 +      udp_hdr->tag = ntohl(udp_hdr->tag);
 +      udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
 +      ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
++      printf("%s ds_get_dest %d\n",__func__, ret);
 +      if (ret)
 +              return;
 +
@@ -2277,10 +2357,12 @@ index a060f66..c61d689 100644
 +      cur_dest = rs->conn_dest;
 +      if (udp_hdr->op == RS_OP_DATA) {
 +              rs->conn_dest = &dest->qp->dest;
++              printf("%s forwarding msg\n",__func__);
 +              rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
 +      }
 +
 +      rs->conn_dest = dest;
++      printf("%s sending resp\n",__func__);
 +      ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
 +      rs->conn_dest = cur_dest;
 +      fastlock_release(&rs->slock);
@@ -2291,6 +2373,7 @@ index a060f66..c61d689 100644
 +      struct rs_svc_msg msg;
 +      int i, ret;
 +
++      printf("%s\n",__func__);
 +      ret = rs_svc_grow_sets();
 +      if (ret) {
 +              msg.status = ret;
@@ -2301,10 +2384,13 @@ index a060f66..c61d689 100644
 +      svc_fds[0].fd = svc_sock[1];
 +      svc_fds[0].events = POLLIN;
 +      do {
++              printf("%s svc cnt %d\n",__func__, svc_cnt);
 +              for (i = 0; i <= svc_cnt; i++)
 +                      svc_fds[i].revents = 0;
 +
++              printf("%s poll\n",__func__);
 +              poll(svc_fds, svc_cnt + 1, -1);
++              printf("%s poll done\n",__func__);
 +              if (svc_fds[0].revents)
 +                      rs_svc_process_sock();
 +
@@ -2312,7 +2398,7 @@ index a060f66..c61d689 100644
 +                      if (svc_fds[i].revents)
 +                              rs_svc_process_rs(svc_rss[i]);
 +              }
-+      } while (svc_cnt > 1);
++      } while (svc_cnt >= 1);
 +
 +      return NULL;
 +}
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index c15e7fc..0000000
+++ /dev/null
@@ -1,676 +0,0 @@
-Bottom: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
-Top:    136936c0a82503ee0da9daccd8b948cd09e58b64
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-12-12 16:49:44 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/cma.c b/src/cma.c
-index 0f58966..ff9b426 100755
---- a/src/cma.c
-+++ b/src/cma.c
-@@ -2238,6 +2238,7 @@ int ucma_max_qpsize(struct rdma_cm_id *id)
-       if (id && id_priv->cma_dev) {
-               max_size = id_priv->cma_dev->max_qpsize;
-       } else {
-+              ucma_init();
-               for (i = 0; i < cma_dev_cnt; i++) {
-                       if (!max_size || max_size > cma_dev_array[i].max_qpsize)
-                               max_size = cma_dev_array[i].max_qpsize;
-diff --git a/src/rsocket.c b/src/rsocket.c
-index c61d689..6fa4c68 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -399,6 +399,7 @@ static int rs_add_to_svc(struct rsocket *rs)
-       msg.op = RS_SVC_INSERT;
-       msg.status = EINVAL;
-+      printf("%s rs %p\n", __func__, rs);
-       msg.rs = rs;
-       write(svc_sock[0], &msg, sizeof msg);
-       read(svc_sock[0], &msg, sizeof msg);
-@@ -602,7 +603,9 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
-               if (!ret && rs->state < rs_connected)
-                       ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
-       } else {
-+              printf("%s set nonblock\n", __func__);
-               ret = fcntl(rs->epfd, F_SETFL, arg);
-+              printf("%s fcntl %d\n", __func__, ret);
-               if (!ret && rs->qp_list) {
-                       qp = rs->qp_list;
-@@ -640,6 +643,8 @@ static void ds_set_qp_size(struct rsocket *rs)
- {
-       uint16_t max_size;
-+      printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+                      rs->rq_size, rs->rbuf_size);
-       max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
-       if (rs->sq_size > max_size)
-@@ -656,6 +661,8 @@ static void ds_set_qp_size(struct rsocket *rs)
-               rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
-       else
-               rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-+      printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
-+                      rs->rq_size, rs->rbuf_size);
- }
- static int rs_init_bufs(struct rsocket *rs)
-@@ -728,15 +735,18 @@ static int ds_init_bufs(struct ds_qp *qp)
- static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
- {
-       cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
-+      printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
-       if (!cm_id->recv_cq_channel)
-               return -1;
-       cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
-                                      cm_id, cm_id->recv_cq_channel, 0);
-+      printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
-       if (!cm_id->recv_cq)
-               goto err1;
-       if (rs->fd_flags & O_NONBLOCK) {
-+              printf("%s set nonblock\n", __func__);
-               if (rs_set_nonblocking(rs, O_NONBLOCK))
-                       goto err2;
-       }
-@@ -876,6 +886,8 @@ static void ds_free_qp(struct ds_qp *qp)
- static void ds_free(struct rsocket *rs)
- {
-+      struct ds_qp *qp;
-+
-       if (rs->state & (rs_readable | rs_writable))
-               rs_remove_from_svc(rs);
-@@ -888,12 +900,9 @@ static void ds_free(struct rsocket *rs)
-       if (rs->dmsg)
-               free(rs->dmsg);
--      if (rs->smsg_free)
--              free(rs->smsg_free);
--
--      while (rs->qp_list) {
--              ds_remove_qp(rs, rs->qp_list);
--              ds_free_qp(rs->qp_list);
-+      while ((qp = rs->qp_list)) {
-+              ds_remove_qp(rs, qp);
-+              ds_free_qp(qp);
-       }
-       if (rs->epfd >= 0)
-@@ -1016,6 +1025,40 @@ static int ds_init(struct rsocket *rs, int domain)
-       return 0;
- }
-+static int ds_init_ep(struct rsocket *rs)
-+{
-+      struct ds_smsg *msg;
-+      int i, ret;
-+
-+      ds_set_qp_size(rs);
-+
-+      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-+      if (!rs->sbuf)
-+              return ERR(ENOMEM);
-+
-+      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+      if (!rs->dmsg)
-+              return ERR(ENOMEM);
-+
-+      rs->sqe_avail = rs->sq_size;
-+      rs->rqe_avail = rs->rq_size;
-+
-+      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+      msg = rs->smsg_free;
-+      for (i = 0; i < rs->sq_size - 1; i++) {
-+              msg->next = (void *) msg + RS_SNDLOWAT;
-+              msg = msg->next;
-+      }
-+      msg->next = NULL;
-+
-+      ret = rs_add_to_svc(rs);
-+      if (ret)
-+              return ret;
-+
-+      rs->state = rs_readable | rs_writable;
-+      return 0;
-+}
-+
- int rsocket(int domain, int type, int protocol)
- {
-       struct rsocket *rs;
-@@ -1040,6 +1083,7 @@ int rsocket(int domain, int type, int protocol)
-               rs->cm_id->route.addr.src_addr.sa_family = domain;
-               index = rs->cm_id->channel->fd;
-       } else {
-+              printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
-               ret = ds_init(rs, domain);
-               if (ret)
-                       goto err;
-@@ -1069,12 +1113,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               if (!ret)
-                       rs->state = rs_bound;
-       } else {
--              ret = bind(rs->udp_sock, addr, addrlen);
--              if (!ret) {
--                      ret = rs_add_to_svc(rs);
--                      if (!ret)
--                              rs->state = rs_readable | rs_writable;
-+              if (rs->state == rs_init) {
-+                      ret = ds_init_ep(rs);
-+                      if (ret)
-+                              return ret;
-               }
-+              ret = bind(rs->udp_sock, addr, addrlen);
-       }
-       return ret;
- }
-@@ -1256,41 +1300,6 @@ connected:
-       return ret;
- }
--static int ds_init_ep(struct rsocket *rs)
--{
--      struct ds_smsg *msg;
--      int i, ret;
--
--      ds_set_qp_size(rs);
--
--      rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
--      if (!rs->sbuf)
--              return ERR(ENOMEM);
--
--      rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
--      if (!rs->dmsg)
--              return ERR(ENOMEM);
--
--      rs->sbuf_bytes_avail = rs->sbuf_size;
--      rs->sqe_avail = rs->sq_size;
--      rs->rqe_avail = rs->rq_size;
--
--      rs->smsg_free = (struct ds_smsg *) rs->sbuf;
--      msg = rs->smsg_free;
--      for (i = 0; i < rs->sq_size - 1; i++) {
--              msg->next = (void *) msg + i * RS_SNDLOWAT;
--              msg = msg->next;
--      }
--      msg->next = NULL;
--
--      ret = rs_add_to_svc(rs);
--      if (ret)
--              return ret;
--
--      rs->state = rs_readable | rs_writable;
--      return 0;
--}
--
- static int rs_any_addr(const union socket_addr *addr)
- {
-       if (addr->sa.sa_family == AF_INET) {
-@@ -1374,38 +1383,44 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
- }
- static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
--                      socklen_t addrlen, struct ds_qp **qp)
-+                      socklen_t addrlen, struct ds_qp **new_qp)
- {
-+      struct ds_qp *qp;
-       struct ibv_qp_init_attr qp_attr;
-       struct epoll_event event;
-       int i, ret;
--      *qp = calloc(1, sizeof(struct ds_qp));
--      if (!*qp)
-+printf("%s\n", __func__);
-+      qp = calloc(1, sizeof(*qp));
-+      if (!qp)
-               return ERR(ENOMEM);
--      (*qp)->rs = rs;
--      ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
-+      qp->rs = rs;
-+      ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
-+      printf("%s rdma_create_id %d\n", __func__, ret);
-       if (ret)
-               goto err;
--      ds_format_hdr(&(*qp)->hdr, src_addr);
--      ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
-+      ds_format_hdr(&qp->hdr, src_addr);
-+      ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
-+      printf("%s rdma_bind_addr %d\n", __func__, ret);
-       if (ret)
-               goto err;
--      ret = ds_init_bufs(*qp);
-+      ret = ds_init_bufs(qp);
-+      printf("%s ds_init_bufs %d\n", __func__, ret);
-       if (ret)
-               goto err;
--      ret = rs_create_cq(rs, (*qp)->cm_id);
-+      ret = rs_create_cq(rs, qp->cm_id);
-+      printf("%s rs_create_cq %d\n", __func__, ret);
-       if (ret)
-               goto err;
-       memset(&qp_attr, 0, sizeof qp_attr);
-       qp_attr.qp_context = qp;
--      qp_attr.send_cq = rs->cm_id->send_cq;
--      qp_attr.recv_cq = rs->cm_id->recv_cq;
-+      qp_attr.send_cq = qp->cm_id->send_cq;
-+      qp_attr.recv_cq = qp->cm_id->recv_cq;
-       qp_attr.qp_type = IBV_QPT_UD;
-       qp_attr.sq_sig_all = 1;
-       qp_attr.cap.max_send_wr = rs->sq_size;
-@@ -1413,31 +1428,35 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-       qp_attr.cap.max_send_sge = 2;
-       qp_attr.cap.max_recv_sge = 1;
-       qp_attr.cap.max_inline_data = rs->sq_inline;
--      ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
-+      ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
-+      printf("%s rdma_create_qp %d\n", __func__, ret);
-       if (ret)
-               goto err;
--      ret = ds_add_qp_dest(*qp, src_addr, addrlen);
-+      ret = ds_add_qp_dest(qp, src_addr, addrlen);
-+      printf("%s ds_add_qp_dest %d\n", __func__, ret);
-       if (ret)
-               goto err;
-       event.events = EPOLLIN;
--      event.data.ptr = *qp;
-+      event.data.ptr = qp;
-       ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
--                      (*qp)->cm_id->recv_cq_channel->fd, &event);
-+                      qp->cm_id->recv_cq_channel->fd, &event);
-+      printf("%s epoll_ctl %d\n", __func__, ret);
-       if (ret)
-               goto err;
-       for (i = 0; i < rs->rq_size; i++) {
--              ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
-+              ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
-               if (ret)
-                       goto err;
-       }
--      ds_insert_qp(rs, *qp);
-+      ds_insert_qp(rs, qp);
-+      *new_qp = qp;
-       return 0;
- err:
--      ds_free_qp(*qp);
-+      ds_free_qp(qp);
-       return ret;
- }
-@@ -1464,38 +1483,42 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
-       union socket_addr src_addr;
-       socklen_t src_len;
-       struct ds_qp *qp;
-+      struct ds_dest **tdest, *new_dest;
-       int ret = 0;
-+      printf("%s \n", __func__);
-       fastlock_acquire(&rs->map_lock);
--      dest = tfind(addr, &rs->dest_map, ds_compare_addr);
--      if (dest)
--              goto out;
--
--      if (rs->state == rs_init) {
--              ret = ds_init_ep(rs);
--              if (ret)
--                      goto out;
--      }
-+      tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+      printf("%s tfind %p\n", __func__, dest);
-+      if (tdest)
-+              goto found;
-       ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
-+      printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
-       if (ret)
-               goto out;
-       ret = ds_get_qp(rs, &src_addr, src_len, &qp);
-+      printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
-       if (ret)
-               goto out;
--      if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
--              *dest = calloc(1, sizeof(struct ds_dest));
--              if (!*dest) {
-+      tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
-+      if (!tdest) {
-+              printf("%s adding dest into map\n", __func__);
-+              new_dest = calloc(1, sizeof(*new_dest));
-+              if (!new_dest) {
-                       ret = ERR(ENOMEM);
-                       goto out;
-               }
--              memcpy(&(*dest)->addr, addr, addrlen);
--              (*dest)->qp = qp;
--              tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
-+              memcpy(&new_dest->addr, addr, addrlen);
-+              new_dest->qp = qp;
-+              tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
-       }
-+
-+found:
-+      *dest = *tdest;
- out:
-       fastlock_release(&rs->map_lock);
-       return ret;
-@@ -1511,10 +1534,19 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-               memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
-               ret = rs_do_connect(rs);
-       } else {
-+              printf("%s\n", __func__);
-+              if (rs->state == rs_init) {
-+                      ret = ds_init_ep(rs);
-+                      if (ret)
-+                              return ret;
-+              }
-+
-               fastlock_acquire(&rs->slock);
-               ret = connect(rs->udp_sock, addr, addrlen);
-+              printf("%s connect %d %s\n", __func__, ret, strerror(errno));
-               if (!ret)
-                       ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+              printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
-               fastlock_release(&rs->slock);
-       }
-       return ret;
-@@ -1983,14 +2015,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
-       do {
-               ds_poll_cqs(rs);
-               if (test(rs)) {
--                      printf("%s test succeeded\n", __func__);
-+//                    printf("%s test succeeded\n", __func__);
-                       ret = 0;
-                       break;
-               } else if (nonblock) {
-                       ret = ERR(EWOULDBLOCK);
--                      printf("%s nonblocking \n", __func__);
-+//                    printf("%s nonblocking \n", __func__);
-               } else if (!rs->cq_armed) {
--                      printf("%s req notify \n", __func__);
-+//                    printf("%s req notify \n", __func__);
-                       ds_req_notify_cqs(rs);
-                       rs->cq_armed = 1;
-               } else {
-@@ -1998,14 +2030,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
-                       fastlock_release(&rs->cq_lock);
-                       ret = ds_get_cq_event(rs);
--                      printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
-+//                    printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
-                       fastlock_release(&rs->cq_wait_lock);
-                       fastlock_acquire(&rs->cq_lock);
-               }
-       } while (!ret);
-       fastlock_release(&rs->cq_lock);
--      printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-       return ret;
- }
-@@ -2017,7 +2049,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-       do {
-               ret = ds_process_cqs(rs, 1, test);
--              printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-               if (!ret || nonblock || errno != EWOULDBLOCK)
-                       return ret;
-@@ -2132,16 +2164,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
-       struct ds_header *hdr;
-       int ret;
--ret = 0;
--      printf("%s \n", __func__);
-+//    printf("%s \n", __func__);
-       if (!(rs->state & rs_readable))
-               return ERR(EINVAL);
-       if (!rs_have_rdata(rs)) {
--              printf("%s need rdata \n", __func__);
-+//            printf("%s need rdata \n", __func__);
-               ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
-                                 rs_have_rdata);
--              printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
-+//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-               if (ret)
-                       return ret;
-       }
-@@ -2161,6 +2192,7 @@ ret = 0;
-                       rs->rmsg_head = 0;
-       }
-+      printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-       return len;
- }
-@@ -2392,12 +2424,14 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       struct ds_udp_header hdr;
-       struct msghdr msg;
-       struct iovec miov[8];
-+      ssize_t ret;
-+//    printf("%s\n", __func__);
-       if (iovcnt > 8)
-               return ERR(ENOTSUP);
-       hdr.tag = htonl(DS_UDP_TAG);
--      hdr.version = 1;
-+      hdr.version = rs->conn_dest->qp->hdr.version;
-       hdr.op = op;
-       hdr.reserved = 0;
-       hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-@@ -2419,18 +2453,24 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-       msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
-       msg.msg_iov = miov;
-       msg.msg_iovlen = iovcnt + 1;
--      return sendmsg(rs->udp_sock, &msg, flags);
-+//    printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
-+      ret = sendmsg(rs->udp_sock, &msg, flags);
-+      printf("%s ret %d %s\n", __func__, ret, strerror(errno));
-+      return ret > 0 ? ret - sizeof hdr : ret;
- }
- static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
-                          int flags, uint8_t op)
- {
-       struct iovec iov;
-+      printf("%s\n", __func__);
-       if (buf && len) {
-+//            printf("%s have buffer\n", __func__);
-               iov.iov_base = (void *) buf;
-               iov.iov_len = len;
-               return ds_sendv_udp(rs, &iov, 1, flags, op);
-       } else {
-+//            printf("%s no buffer\n", __func__);
-               return ds_sendv_udp(rs, NULL, 0, flags, op);
-       }
- }
-@@ -2442,6 +2482,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-       uint64_t offset;
-       int ret = 0;
-+      printf("%s\n", __func__);
-       if (!rs->conn_dest->ah)
-               return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
-@@ -2563,6 +2604,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-       struct rsocket *rs;
-       int ret;
-+      printf("%s\n", __func__);
-       rs = idm_at(&idm, socket);
-       if (rs->type == SOCK_STREAM) {
-               if (dest_addr || addrlen)
-@@ -2571,12 +2613,23 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-               return rsend(socket, buf, len, flags);
-       }
-+      if (rs->state == rs_init) {
-+              ret = ds_init_ep(rs);
-+              if (ret)
-+                      return ret;
-+      }
-+
-       fastlock_acquire(&rs->slock);
-+      printf("%s check conn dest\n", __func__);
-       if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+              printf("%s need conn dest\n", __func__);
-               ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
-               if (ret)
-                       goto out;
-       }
-+      else
-+              printf("%s connected\n", __func__);
-+
-       ret = dsend(rs, buf, len, flags);
- out:
-       fastlock_release(&rs->slock);
-@@ -3605,9 +3658,11 @@ static int rs_svc_add_rs(struct rsocket *rs)
-       }
-       svc_rss[++svc_cnt] = rs;
-+      printf("%s rs %p\n", __func__, rs);
-       svc_fds[svc_cnt].fd = rs->udp_sock;
-       svc_fds[svc_cnt].events = POLLIN;
-       svc_fds[svc_cnt].revents = 0;
-+      printf("add rs udp sock %d\n",rs->udp_sock);
-       return 0;
- }
-@@ -3631,6 +3686,7 @@ static void rs_svc_process_sock(void)
-       struct rs_svc_msg msg;
-       read(svc_sock[1], &msg, sizeof msg);
-+      printf("%s op %d\n",__func__, msg.op);
-       switch (msg.op) {
-       case RS_SVC_INSERT:
-               msg.status = rs_svc_add_rs(msg.rs);
-@@ -3642,6 +3698,7 @@ static void rs_svc_process_sock(void)
-               msg.status = ENOTSUP;
-               break;
-       }
-+      printf("%s status %d\n",__func__, msg.status);
-       write(svc_sock[1], &msg, sizeof msg);
- }
-@@ -3675,6 +3732,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
-       struct ibv_ah_attr attr;
-       int ret;
-+      printf("%s\n",__func__);
-       if (dest->ah) {
-               fastlock_acquire(&rs->slock);
-               ibv_destroy_ah(dest->ah);
-@@ -3726,7 +3784,18 @@ out:
- static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
-                               union socket_addr *addr)
- {
--      return (udp_hdr->tag == DS_UDP_TAG) &&
-+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
-+      udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
-+
-+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
-+udp_hdr->tag == ntohl(DS_UDP_TAG),
-+      udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
-+        udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
-+       udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
-+        udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
-+
-+
-+      return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
-               ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
-                 udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
-                (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
-@@ -3741,6 +3810,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
-       struct ibv_sge sge;
-       uint64_t offset;
-+      printf("%s\n",__func__);
-       if (!ds_can_send(rs)) {
-               if (ds_get_comp(rs, 0, ds_can_send))
-                       return;
-@@ -3769,7 +3839,9 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       socklen_t addrlen = sizeof addr;
-       int len, ret;
-+      printf("%s\n",__func__);
-       ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
-+      printf("%s recvfrom %d\n",__func__, ret);
-       if (ret < DS_UDP_IPV4_HDR_LEN)
-               return;
-@@ -3777,10 +3849,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
-               return;
-+      printf("%s valid hdr\n",__func__);
-       len = ret - udp_hdr->length;
-       udp_hdr->tag = ntohl(udp_hdr->tag);
-       udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
-       ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
-+      printf("%s ds_get_dest %d\n",__func__, ret);
-       if (ret)
-               return;
-@@ -3792,10 +3866,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       cur_dest = rs->conn_dest;
-       if (udp_hdr->op == RS_OP_DATA) {
-               rs->conn_dest = &dest->qp->dest;
-+              printf("%s forwarding msg\n",__func__);
-               rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
-       }
-       rs->conn_dest = dest;
-+      printf("%s sending resp\n",__func__);
-       ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
-       rs->conn_dest = cur_dest;
-       fastlock_release(&rs->slock);
-@@ -3806,6 +3882,7 @@ static void *rs_svc_run(void *arg)
-       struct rs_svc_msg msg;
-       int i, ret;
-+      printf("%s\n",__func__);
-       ret = rs_svc_grow_sets();
-       if (ret) {
-               msg.status = ret;
-@@ -3816,10 +3893,13 @@ static void *rs_svc_run(void *arg)
-       svc_fds[0].fd = svc_sock[1];
-       svc_fds[0].events = POLLIN;
-       do {
-+              printf("%s svc cnt %d\n",__func__, svc_cnt);
-               for (i = 0; i <= svc_cnt; i++)
-                       svc_fds[i].revents = 0;
-+              printf("%s poll\n",__func__);
-               poll(svc_fds, svc_cnt + 1, -1);
-+              printf("%s poll done\n",__func__);
-               if (svc_fds[0].revents)
-                       rs_svc_process_sock();
-@@ -3827,7 +3907,7 @@ static void *rs_svc_run(void *arg)
-                       if (svc_fds[i].revents)
-                               rs_svc_process_rs(svc_rss[i]);
-               }
--      } while (svc_cnt > 1);
-+      } while (svc_cnt >= 1);
-       return NULL;
- }