]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh
authorSean Hefty <sean.hefty@intel.com>
Tue, 4 Dec 2012 08:33:07 +0000 (00:33 -0800)
committerSean Hefty <sean.hefty@intel.com>
Tue, 4 Dec 2012 08:33:07 +0000 (00:33 -0800)
meta
patches/dsocket
patches/refresh-temp [deleted file]

diff --git a/meta b/meta
index dc8e104a24ba61b4702396374109ef4887758cf6..84c727936ce788b138a300c3c434d1bf1e406883 100644 (file)
--- a/meta
+++ b/meta
@@ -1,10 +1,9 @@
 Version: 1
-Previous: 5538681efe7d5e51c6d299764f45fe2a94c1992f
-Head: 66aea9f888c312fd13dcf1087fbfc120a85abb03
+Previous: d6d73c60c00fe983d9ca845b05cf1ae1d42b2507
+Head: ad58289e8917c3b926ccdf0d9b645f0e87b3bd87
 Applied:
   librdmacm-fixed-build-problem-: c6bfc1c5b15e6207188a97e8a5df0405cfd2587f
-  dsocket: dae5a20b3a78e30d9a69785bc0f37dc626f08425
-  refresh-temp: 66aea9f888c312fd13dcf1087fbfc120a85abb03
+  dsocket: ad58289e8917c3b926ccdf0d9b645f0e87b3bd87
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
index 047969acbc88e3cffba5cec003244071401a1f58..ec3a01820fb3ece1c5e28413a1750fab558f182c 100644 (file)
@@ -1,5 +1,5 @@
 Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top:    44e30115528e8117f25b73ebedc1c7f5ec687bb6
+Top:    ac38ccc22b5116e46418ba99f1924f1c35d49ffc
 Author: Sean Hefty <sean.hefty@intel.com>
 Date:   2012-11-09 10:26:38 -0800
 
@@ -112,7 +112,7 @@ index 0a0370e..7135a61 100644
  {
        errno = err;
 diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..31b946a 100644
+index a060f66..018af90 100644
 --- a/src/rsocket.c
 +++ b/src/rsocket.c
 @@ -47,6 +47,7 @@
@@ -373,12 +373,12 @@ index a060f66..31b946a 100644
 -      void             *target_buffer_list;
 -      volatile struct rs_sge    *target_sgl;
 -      struct rs_iomap  *target_iomap;
--
++#define DS_UDP_TAG 0x55555555
 -      uint32_t          rbuf_size;
 -      struct ibv_mr    *rmr;
 -      uint8_t           *rbuf;
-+#define DS_UDP_TAG 0x55555555
+-
 -      uint32_t          sbuf_size;
 -      struct ibv_mr    *smr;
 -      struct ibv_sge    ssgl[2];
@@ -626,9 +626,7 @@ index a060f66..31b946a 100644
  
 -static int rs_create_cq(struct rsocket *rs)
 +static int ds_init_bufs(struct ds_qp *qp)
- {
--      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
--      if (!rs->cm_id->recv_cq_channel)
++{
 +      qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
 +      if (!qp->rbuf)
 +              return ERR(ENOMEM);
@@ -645,7 +643,9 @@ index a060f66..31b946a 100644
 +}
 +
 +static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
-+{
+ {
+-      rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+-      if (!rs->cm_id->recv_cq_channel)
 +      cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
 +      if (!cm_id->recv_cq_channel)
                return -1;
@@ -857,11 +857,11 @@ index a060f66..31b946a 100644
 +              ret = ds_init(rs, domain);
 +              if (ret)
 +                      goto err;
-+
-+              index = rs->udp_sock;
-+      }
  
 -      ret = rs_insert(rs);
++              index = rs->udp_sock;
++      }
++
 +      ret = rs_insert(rs, index);
        if (ret < 0)
                goto err;
@@ -1227,47 +1227,11 @@ index a060f66..31b946a 100644
                                }
                                break;
                        case RS_OP_WRITE:
-@@ -1097,8 +1765,181 @@ static int rs_poll_cq(struct rsocket *rs)
-       return ret;
- }
--static int rs_get_cq_event(struct rsocket *rs)
-+static int rs_get_cq_event(struct rsocket *rs)
-+{
-+      struct ibv_cq *cq;
-+      void *context;
-+      int ret;
-+
-+      if (!rs->cq_armed)
-+              return 0;
-+
-+      ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
-+      if (!ret) {
-+              ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
-+              rs->cq_armed = 0;
-+      } else if (errno != EAGAIN) {
-+              rs->state = rs_error;
-+      }
-+
-+      return ret;
-+}
-+
-+/*
-+ * Although we serialize rsend and rrecv calls with respect to themselves,
-+ * both calls may run simultaneously and need to poll the CQ for completions.
-+ * We need to serialize access to the CQ, but rsend and rrecv need to
-+ * allow each other to make forward progress.
-+ *
-+ * For example, rsend may need to wait for credits from the remote side,
-+ * which could be stalled until the remote process calls rrecv.  This should
-+ * not block rrecv from receiving data from the remote side however.
-+ *
-+ * We handle this by using two locks.  The cq_lock protects against polling
-+ * the CQ and processing completions.  The cq_wait_lock serializes access to
-+ * waiting on the CQ.
-+ */
-+static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
-+{
+@@ -1133,46 +1801,205 @@ static int rs_get_cq_event(struct rsocket *rs)
+  */
+ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+-      int ret;
 +      int ret;
 +
 +      fastlock_acquire(&rs->cq_lock);
@@ -1404,51 +1368,33 @@ index a060f66..31b946a 100644
 +}
 +
 +static int ds_get_cq_event(struct rsocket *rs)
- {
++{
 +      struct epoll_event event;
 +      struct ds_qp *qp;
-       struct ibv_cq *cq;
-       void *context;
-       int ret;
-@@ -1106,73 +1947,59 @@ static int rs_get_cq_event(struct rsocket *rs)
-       if (!rs->cq_armed)
-               return 0;
++      struct ibv_cq *cq;
++      void *context;
++      int ret;
++
++      if (!rs->cq_armed)
++              return 0;
++
 +      ret = epoll_wait(rs->epfd, &event, 1, -1);
 +      if (ret <= 0)
 +              return ret;
 +
 +      qp = event.data.ptr;
-       ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
-       if (!ret) {
-               ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
++      ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
++      if (!ret) {
++              ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
 +              qp->cq_armed = 0;
-               rs->cq_armed = 0;
--      } else if (errno != EAGAIN) {
--              rs->state = rs_error;
-       }
-       return ret;
- }
--/*
-- * Although we serialize rsend and rrecv calls with respect to themselves,
-- * both calls may run simultaneously and need to poll the CQ for completions.
-- * We need to serialize access to the CQ, but rsend and rrecv need to
-- * allow each other to make forward progress.
-- *
-- * For example, rsend may need to wait for credits from the remote side,
-- * which could be stalled until the remote process calls rrecv.  This should
-- * not block rrecv from receiving data from the remote side however.
-- *
-- * We handle this by using two locks.  The cq_lock protects against polling
-- * the CQ and processing completions.  The cq_wait_lock serializes access to
-- * waiting on the CQ.
-- */
--static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
++              rs->cq_armed = 0;
++      }
++
++      return ret;
++}
++
 +static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
--      int ret;
++{
 +      int ret = 0;
  
        fastlock_acquire(&rs->cq_lock);
@@ -1658,7 +1604,7 @@ index a060f66..31b946a 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1447,10 +2359,89 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2359,91 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
  
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
@@ -1668,7 +1614,7 @@ index a060f66..31b946a 100644
  }
  
 +static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-+                          int iovcnt, int flags)
++                          int iovcnt, int flags, uint8_t op)
 +{
 +      struct ds_udp_header hdr;
 +      struct msghdr msg;
@@ -1697,6 +1643,8 @@ index a060f66..31b946a 100644
 +              memcpy(&miov[1], iov, sizeof *iov * iovcnt);
 +
 +      memset(&msg, 0, sizeof msg);
++      msg.msg_name = rs->conn_dest->addr;
++      msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
 +      msg.msg_iov = miov;
 +      msg.msg_iovlen = iovcnt + 1;
 +      return sendmsg(rs->udp_sock, msg, flags);
@@ -1749,7 +1697,7 @@ index a060f66..31b946a 100644
  /*
   * We overlap sending the data, by posting a small work request immediately,
   * then increasing the size of the send on each iteration.
-@@ -1464,6 +2455,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2457,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -1763,7 +1711,7 @@ index a060f66..31b946a 100644
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
-@@ -1485,7 +2483,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2485,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1772,7 +1720,7 @@ index a060f66..31b946a 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1538,10 +2536,26 @@ out:
+@@ -1538,10 +2538,26 @@ out:
  ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
  {
@@ -1784,11 +1732,11 @@ index a060f66..31b946a 100644
 +      if (rs->type == SOCK_STREAM) {
 +              if (dest_addr || addrlen)
 +                      return ERR(EISCONN);
-+
-+              return rsend(socket, buf, len, flags);
-+      }
  
 -      return rsend(socket, buf, len, flags);
++              return rsend(socket, buf, len, flags);
++      }
++
 +      fastlock_acquire(&rs->slock);
 +      if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
 +              ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
@@ -1802,7 +1750,7 @@ index a060f66..31b946a 100644
  }
  
  static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2614,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2616,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -1811,7 +1759,7 @@ index a060f66..31b946a 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -1653,7 +2667,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2669,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
  
@@ -1820,7 +1768,7 @@ index a060f66..31b946a 100644
  }
  
  ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2704,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2706,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
        int ret;
  
  check_cq:
@@ -1831,7 +1779,7 @@ index a060f66..31b946a 100644
                rs_process_cq(rs, nonblock, test);
  
                revents = 0;
-@@ -1707,6 +2721,16 @@ check_cq:
+@@ -1707,6 +2723,16 @@ check_cq:
                }
  
                return revents;
@@ -1848,7 +1796,7 @@ index a060f66..31b946a 100644
        }
  
        if (rs->state == rs_listening) {
-@@ -1766,11 +2790,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2792,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
                        if (fds[i].revents)
                                return 1;
  
@@ -1868,7 +1816,7 @@ index a060f66..31b946a 100644
                        rfds[i].events = POLLIN;
                } else {
                        rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2820,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2822,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
  
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
@@ -1880,7 +1828,7 @@ index a060f66..31b946a 100644
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
-@@ -1949,7 +2979,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +2981,7 @@ int rshutdown(int socket, int how)
  
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
@@ -1889,7 +1837,7 @@ index a060f66..31b946a 100644
                return 0;
        }
  
-@@ -1959,10 +2989,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +2991,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
@@ -1903,7 +1851,7 @@ index a060f66..31b946a 100644
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
-@@ -1987,13 +3017,31 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3019,31 @@ int rshutdown(int socket, int how)
        return 0;
  }
  
@@ -1937,7 +1885,7 @@ index a060f66..31b946a 100644
  
        rs_free(rs);
        return 0;
-@@ -2018,8 +3066,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3068,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1952,7 +1900,7 @@ index a060f66..31b946a 100644
  }
  
  int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3079,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3081,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
  
        rs = idm_at(&idm, socket);
@@ -1967,7 +1915,7 @@ index a060f66..31b946a 100644
  }
  
  int rsetsockopt(int socket, int level, int optname,
-@@ -2040,18 +3096,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,18 +3098,26 @@ int rsetsockopt(int socket, int level, int optname,
  
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
@@ -2001,7 +1949,7 @@ index a060f66..31b946a 100644
                        opt_on = *(int *) optval;
                        break;
                case SO_RCVBUF:
-@@ -2101,9 +3165,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3167,11 @@ int rsetsockopt(int socket, int level, int optname,
                opts = &rs->ipv6_opts;
                switch (optname) {
                case IPV6_V6ONLY:
@@ -2016,7 +1964,7 @@ index a060f66..31b946a 100644
                        opt_on = *(int *) optval;
                        break;
                default:
-@@ -2315,7 +3381,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3383,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
  
@@ -2025,7 +1973,7 @@ index a060f66..31b946a 100644
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3415,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3417,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
  out:
@@ -2034,7 +1982,7 @@ index a060f66..31b946a 100644
        return offset;
  }
  
-@@ -2361,7 +3427,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3429,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
  
        rs = idm_at(&idm, socket);
@@ -2043,7 +1991,7 @@ index a060f66..31b946a 100644
  
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
-@@ -2382,7 +3448,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3450,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
  out:
@@ -2052,7 +2000,7 @@ index a060f66..31b946a 100644
        return ret;
  }
  
-@@ -2426,7 +3492,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3494,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
@@ -2061,7 +2009,7 @@ index a060f66..31b946a 100644
                                ret = ERR(ECONNRESET);
                                break;
                        }
-@@ -2476,3 +3542,256 @@ out:
+@@ -2476,3 +3544,269 @@ out:
  
        return (ret && left == count) ? ret : count - left;
  }
@@ -2233,21 +2181,37 @@ index a060f66..31b946a 100644
 +                udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
 +}
 +
-+static int rs_svc_comp_local_net(struct ds_dest *dest,
-+                               struct ds_udp_header *udp_hdr)
++static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
++                         union socket_addr *src)
 +{
-+      return ((udp_hdr->version == 4) &&
-+              (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
-+              (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) ||
-+             ((udp_hdr->version == 6) &&
-+              (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
-+              (!memcmp(udp_hdr->addr.ipv6,
-+                       &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
++      struct ds_header hdr;
++      struct ds_smsg *msg;
++      struct ibv_sge sge;
++      uint64_t offset;
++
++      if (!ds_can_send(rs)) {
++              if (ds_get_comp(rs, 0, ds_can_send))
++                      return;
++      }
++
++      msg = rs->smsg_free;
++      rs->smsg_free = msg->next;
++      rs->sqe_avail--;
++
++      ds_format_hdr(&hdr, src);
++      memcpy((void *) msg, &hdr, hdr.length);
++      memcpy((void *) msg + hdr.length, buf, len);
++      sge.addr = (uintptr_t) msg;
++      sge.length = hdr.length + len;
++      sge.lkey = rs->smr->lkey;
++      offset = (uint8_t *) msg - rs->sbuf;
++
++      ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
 +}
 +
 +static void rs_svc_process_rs(struct rsocket *rs)
 +{
-+      struct ds_dest *dest;
++      struct ds_dest *dest, *cur_dest;
 +      struct ds_udp_header *udp_hdr;
 +      union socket_addr addr;
 +      socklen_t addrlen = sizeof addr;
@@ -2271,21 +2235,18 @@ index a060f66..31b946a 100644
 +      if (!dest->ah || (dest->qpn != udp_hdr->qpn))
 +              rs_svc_create_ah(rs, dest, udp_hdr->qpn);
 +
-+      if (!rs_svc_comp_local_net(dest, udp_hdr)) {
-+
-+              ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
-+              if (!ret) {
-+                      if (!dest->ah)
-+                              rs_svc_create_ah(rs, dest, udp_hdr);
-+                      rs_send_udp_reply(rs, dest);
-+              }
-+      }
-+
++      /* to do: handle when dest local ip address doesn't match udp ip */
++      fastlock_acquire(&rs->slock);
++      cur_dest = rs->conn_dest;
 +      if (udp_hdr->op == RS_OP_DATA) {
-+              fastlock_acquire(&rs->slock);
-+              ret = dsend(rs, buf, len, flags);
-+              fastlock_release(&rs->slock);
++              rs->conn_dest = &dest->qp->dest;
++              rs_svc_forward(rs, buf, len, addr);
 +      }
++
++      rs->conn_dest = dest;
++      ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
++      rs->conn_dest = cur_dest;
++      fastlock_release(&rs->slock);
 +}
 +
 +static int rs_svc_run(void *arg)
diff --git a/patches/refresh-temp b/patches/refresh-temp
deleted file mode 100644 (file)
index 73933be..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-Bottom: 44e30115528e8117f25b73ebedc1c7f5ec687bb6
-Top:    ac38ccc22b5116e46418ba99f1924f1c35d49ffc
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-12-04 00:33:07 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 31b946a..018af90 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -2364,7 +2364,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
- }
- static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
--                          int iovcnt, int flags)
-+                          int iovcnt, int flags, uint8_t op)
- {
-       struct ds_udp_header hdr;
-       struct msghdr msg;
-@@ -2393,6 +2393,8 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-               memcpy(&miov[1], iov, sizeof *iov * iovcnt);
-       memset(&msg, 0, sizeof msg);
-+      msg.msg_name = rs->conn_dest->addr;
-+      msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
-       msg.msg_iov = miov;
-       msg.msg_iovlen = iovcnt + 1;
-       return sendmsg(rs->udp_sock, msg, flags);
-@@ -3710,21 +3712,37 @@ static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header,
-                 udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
- }
--static int rs_svc_comp_local_net(struct ds_dest *dest,
--                               struct ds_udp_header *udp_hdr)
-+static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
-+                         union socket_addr *src)
- {
--      return ((udp_hdr->version == 4) &&
--              (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
--              (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) ||
--             ((udp_hdr->version == 6) &&
--              (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
--              (!memcmp(udp_hdr->addr.ipv6,
--                       &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
-+      struct ds_header hdr;
-+      struct ds_smsg *msg;
-+      struct ibv_sge sge;
-+      uint64_t offset;
-+
-+      if (!ds_can_send(rs)) {
-+              if (ds_get_comp(rs, 0, ds_can_send))
-+                      return;
-+      }
-+
-+      msg = rs->smsg_free;
-+      rs->smsg_free = msg->next;
-+      rs->sqe_avail--;
-+
-+      ds_format_hdr(&hdr, src);
-+      memcpy((void *) msg, &hdr, hdr.length);
-+      memcpy((void *) msg + hdr.length, buf, len);
-+      sge.addr = (uintptr_t) msg;
-+      sge.length = hdr.length + len;
-+      sge.lkey = rs->smr->lkey;
-+      offset = (uint8_t *) msg - rs->sbuf;
-+
-+      ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
- }
- static void rs_svc_process_rs(struct rsocket *rs)
- {
--      struct ds_dest *dest;
-+      struct ds_dest *dest, *cur_dest;
-       struct ds_udp_header *udp_hdr;
-       union socket_addr addr;
-       socklen_t addrlen = sizeof addr;
-@@ -3748,21 +3766,18 @@ static void rs_svc_process_rs(struct rsocket *rs)
-       if (!dest->ah || (dest->qpn != udp_hdr->qpn))
-               rs_svc_create_ah(rs, dest, udp_hdr->qpn);
--      if (!rs_svc_comp_local_net(dest, udp_hdr)) {
--
--              ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
--              if (!ret) {
--                      if (!dest->ah)
--                              rs_svc_create_ah(rs, dest, udp_hdr);
--                      rs_send_udp_reply(rs, dest);
--              }
--      }
--
-+      /* to do: handle when dest local ip address doesn't match udp ip */
-+      fastlock_acquire(&rs->slock);
-+      cur_dest = rs->conn_dest;
-       if (udp_hdr->op == RS_OP_DATA) {
--              fastlock_acquire(&rs->slock);
--              ret = dsend(rs, buf, len, flags);
--              fastlock_release(&rs->slock);
-+              rs->conn_dest = &dest->qp->dest;
-+              rs_svc_forward(rs, buf, len, addr);
-       }
-+
-+      rs->conn_dest = dest;
-+      ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
-+      rs->conn_dest = cur_dest;
-+      fastlock_release(&rs->slock);
- }
- static int rs_svc_run(void *arg)