]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
refresh (create temporary patch)
authorSean Hefty <sean.hefty@intel.com>
Mon, 17 Dec 2012 21:41:21 +0000 (13:41 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 17 Dec 2012 21:41:21 +0000 (13:41 -0800)
meta
patches/refresh-temp [new file with mode: 0644]

diff --git a/meta b/meta
index e88c212f06159bae1af52a3b96a495390789fff3..9c4fe862097e0e5a382f3ddc809c709dfff9afce 100644 (file)
--- a/meta
+++ b/meta
@@ -1,8 +1,9 @@
 Version: 1
-Previous: 95f8b8706a0c8c03133bf2f27aeb4860b1835bde
-Head: 9a45fe716b7886d96ac080929ecba4eeb1f12d0f
+Previous: 42a80d310cd3e2086e62e4a02d65302a705b5031
+Head: 5ecf5331f42951a4a7ead43657894630be857283
 Applied:
   dsocket: 9a45fe716b7886d96ac080929ecba4eeb1f12d0f
+  refresh-temp: 5ecf5331f42951a4a7ead43657894630be857283
 Unapplied:
   udpong: a003dda8e0d2f6ea134932bdd8bf5a5f29b7bfce
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
diff --git a/patches/refresh-temp b/patches/refresh-temp
new file mode 100644 (file)
index 0000000..0f6cf00
--- /dev/null
@@ -0,0 +1,594 @@
+Bottom: f8258dbc93f14acdb67621e51c1696fca92c841a
+Top:    89a1bd159135ab5aae205202e62cdc91b9ab3b0d
+Author: Sean Hefty <sean.hefty@intel.com>
+Date:   2012-12-17 13:41:21 -0800
+
+Refresh of dsocket
+
+---
+
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 4631b1d..219aa4a 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -66,23 +66,14 @@
+ static struct index_map idm;
+ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+-enum {
+-      RS_SVC_INSERT,
+-      RS_SVC_REMOVE
+-};
+-
+ struct rsocket;
+-
+-#define PRINTADDR(a) \
+-printf("%s port %x ip %x\n", __func__, \
+-      ((struct sockaddr_in *)a)->sin_port, \
+-      ((struct sockaddr_in *)a)->sin_addr.s_addr)
+-
+-
++enum {
++      RS_SVC_DGRAM = 1 << 0
++};
+ struct rs_svc_msg {
+-      uint32_t op;
++      uint32_t svcs;
+       uint32_t status;
+       struct rsocket *rs;
+ };
+@@ -318,6 +309,7 @@ struct rsocket {
+               };
+       };
++      int               svcs;
+       int               opts;
+       long              fd_flags;
+       uint64_t          so_opts;
+@@ -387,7 +379,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
+       }
+ }
+-static int rs_add_to_svc(struct rsocket *rs)
++static int rs_modify_svcs(struct rsocket *rs, int svcs)
+ {
+       struct rs_svc_msg msg;
+       int ret;
+@@ -396,58 +388,60 @@ static int rs_add_to_svc(struct rsocket *rs)
+       if (!svc_cnt) {
+               ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
+               if (ret)
+-                      goto err1;
++                      goto unlock;
+               ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+               if (ret) {
+                       ret = ERR(ret);
+-                      goto err2;
++                      goto closepair;
+               }
+       }
+-      msg.op = RS_SVC_INSERT;
++      msg.svcs = svcs;
+       msg.status = EINVAL;
+       msg.rs = rs;
+       write(svc_sock[0], &msg, sizeof msg);
+       read(svc_sock[0], &msg, sizeof msg);
+       ret = rdma_seterrno(msg.status);
+-      if (ret && !svc_cnt)
+-              goto err3;
++      if (svc_cnt)
++              goto unlock;
++//    if (ret && !svc_cnt)
++//            goto join;
++//
++//    pthread_mutex_unlock(&mut);
++//    return ret;
+-      pthread_mutex_unlock(&mut);
+-      return ret;
+-
+-err3:
+       pthread_join(svc_id, NULL);
+-err2:
++closepair:
+       close(svc_sock[0]);
+       close(svc_sock[1]);
+-err1:
++unlock:
+       pthread_mutex_unlock(&mut);
+       return ret;
+ }
+-static int rs_remove_from_svc(struct rsocket *rs)
+-{
+-      struct rs_svc_msg msg;
+-      int ret;
+-
+-      pthread_mutex_lock(&mut);
+-      msg.op = RS_SVC_REMOVE;
+-      msg.status = EINVAL;
+-      msg.rs = rs;
+-      write(svc_sock[0], &msg, sizeof msg);
+-      read(svc_sock[0], &msg, sizeof msg);
+-      ret = ERR(msg.status);
+-      if (!svc_cnt) {
+-              pthread_join(svc_id, NULL);
+-              close(svc_sock[0]);
+-              close(svc_sock[1]);
+-      }
+-
+-      pthread_mutex_unlock(&mut);
+-      return ret;
+-}
++//static void rs_remove_from_svc(struct rsocket *rs)
++//{
++//    struct rs_svc_msg msg;
++//    int ret;
++//
++//    pthread_mutex_lock(&mut);
++//    if (svc_cnt) {
++//            msg.op = RS_SVC_REMOVE;
++//            msg.status = EINVAL;
++//            msg.rs = rs;
++//            write(svc_sock[0], &msg, sizeof msg);
++//            read(svc_sock[0], &msg, sizeof msg);
++//    }
++//
++//    if (!svc_cnt) {
++//            pthread_join(svc_id, NULL);
++//            close(svc_sock[0]);
++//            close(svc_sock[1]);
++//    }
++//
++//    pthread_mutex_unlock(&mut);
++//}
+ static int ds_compare_addr(const void *dst1, const void *dst2)
+ {
+@@ -459,7 +453,6 @@ static int ds_compare_addr(const void *dst1, const void *dst2)
+       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
+-//    printf("%s len %d sizeof sin %d memcmp %d\n", __func__, len, sizeof(struct sockaddr_in), memcmp(dst1,dst2,len));
+       return memcmp(dst1, dst2, len);
+ }
+@@ -893,9 +886,6 @@ static void ds_free(struct rsocket *rs)
+ {
+       struct ds_qp *qp;
+-      if (rs->state & (rs_readable | rs_writable))
+-              rs_remove_from_svc(rs);
+-
+       if (rs->udp_sock >= 0)
+               close(rs->udp_sock);
+@@ -1056,7 +1046,7 @@ static int ds_init_ep(struct rsocket *rs)
+       }
+       msg->next = NULL;
+-      ret = rs_add_to_svc(rs);
++      ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
+       if (ret)
+               return ret;
+@@ -1322,10 +1312,8 @@ static int ds_get_src_addr(struct rsocket *rs,
+       int sock, ret;
+       uint16_t port;
+-//    printf("dest: "); PRINTADDR(dest_addr);
+-      *src_len = sizeof src_addr;
++      *src_len = sizeof *src_addr;
+       ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
+-//    printf("src: "); PRINTADDR(src_addr);
+       if (ret || !rs_any_addr(src_addr))
+               return ret;
+@@ -1338,10 +1326,9 @@ static int ds_get_src_addr(struct rsocket *rs,
+       if (ret)
+               goto out;
+-      *src_len = sizeof src_addr;
++      *src_len = sizeof *src_addr;
+       ret = getsockname(sock, &src_addr->sa, src_len);
+       src_addr->sin.sin_port = port;
+-//    printf("selected src: ");
+ out:
+       close(sock);
+       return ret;
+@@ -1350,7 +1337,6 @@ out:
+ static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
+ {
+       if (addr->sa.sa_family == AF_INET) {
+-//            PRINTADDR(addr);
+               hdr->version = 4;
+               hdr->length = DS_IPV4_HDR_LEN;
+               hdr->port = addr->sin.sin_port;
+@@ -1371,7 +1357,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
+       struct ibv_ah_attr attr;
+       int ret;
+-//    printf("%s\n", __func__);
+       memcpy(&qp->dest.addr, addr, addrlen);
+       qp->dest.qp = qp;
+       qp->dest.qpn = qp->cm_id->qp->qp_num;
+@@ -1384,8 +1369,6 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
+       attr.dlid = port_attr.lid;
+       attr.port_num = qp->cm_id->port_num;
+       qp->dest.ah = ibv_create_ah(qp->cm_id->pd, &attr);
+-//    printf("%s ah %p lid %x port %d qpn %x\n", __func__, qp->dest.ah, attr.dlid,
+-//            attr.port_num, qp->dest.qpn);
+       if (!qp->dest.ah)
+               return ERR(ENOMEM);
+@@ -1401,7 +1384,6 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+       struct epoll_event event;
+       int i, ret;
+-//    PRINTADDR(src_addr);
+       qp = calloc(1, sizeof(*qp));
+       if (!qp)
+               return ERR(ENOMEM);
+@@ -1490,14 +1472,12 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
+       struct ds_dest **tdest, *new_dest;
+       int ret = 0;
+-//    PRINTADDR(addr);
+       fastlock_acquire(&rs->map_lock);
+       tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+       if (tdest)
+               goto found;
+       ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+-//    printf("get src: "); PRINTADDR(&src_addr);
+       if (ret)
+               goto out;
+@@ -1542,7 +1522,6 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+               }
+               fastlock_acquire(&rs->slock);
+-//            PRINTADDR(addr);
+               ret = connect(rs->udp_sock, addr, addrlen);
+               if (!ret)
+                       ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+@@ -1604,8 +1583,6 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
+       wr.wr.ud.ah = rs->conn_dest->ah;
+       wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+       wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
+-//    printf("%s ah %p qpn %x\n", __func__, rs->conn_dest->ah,
+-//            rs->conn_dest->qpn);
+       return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+ }
+@@ -1935,7 +1912,6 @@ static void ds_poll_cqs(struct rsocket *rs)
+                       if (ds_wr_is_recv(wc.wr_id)) {
+                               if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
+                                   ds_valid_recv(qp, &wc)) {
+-//                                    printf("%s recv over QP\n", __func__);
+                                       rs->rqe_avail--;
+                                       rmsg = &rs->dmsg[rs->rmsg_tail];
+                                       rmsg->qp = qp;
+@@ -1944,13 +1920,11 @@ static void ds_poll_cqs(struct rsocket *rs)
+                                       if (++rs->rmsg_tail == rs->rq_size + 1)
+                                               rs->rmsg_tail = 0;
+                               } else {
+-//                                    printf("%s invalid recv\n", __func__);
+                                       ds_post_recv(rs, qp, ds_wr_offset(wc.wr_id));
+                               }
+                       } else {
+                               smsg = (struct ds_smsg *)
+                                      (rs->sbuf + ds_wr_offset(wc.wr_id));
+-//                            printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
+                               smsg->next = rs->smsg_free;
+                               rs->smsg_free = smsg;
+                               rs->sqe_avail++;
+@@ -1993,9 +1967,7 @@ static int ds_get_cq_event(struct rsocket *rs)
+       if (!rs->cq_armed)
+               return 0;
+-//    printf("wait for epoll event\n");
+       ret = epoll_wait(rs->epfd, &event, 1, -1);
+-//    printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
+       if (ret <= 0)
+               return ret;
+@@ -2012,46 +1984,35 @@ static int ds_get_cq_event(struct rsocket *rs)
+ static int rs_have_rdata(struct rsocket *rs);
+ static int ds_can_send(struct rsocket *rs);
++static int rs_poll_all(struct rsocket *rs);
++static int ds_all_sends_done(struct rsocket *rs);
+ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+       int ret = 0;
+-      if (test == rs_have_rdata)
+-              printf("%s test rs_have_rdata\n", __func__);
+-      else if (test == ds_can_send)
+-              printf("%s test ds_can_send\n", __func__);
+-      else
+-              printf("%s test ?\n", __func__);
+-
+       fastlock_acquire(&rs->cq_lock);
+       do {
+               ds_poll_cqs(rs);
+               if (test(rs)) {
+-                      printf("%s test succeeded\n", __func__);
+                       ret = 0;
+                       break;
+               } else if (nonblock) {
+                       ret = ERR(EWOULDBLOCK);
+-                      printf("%s nonblocking \n", __func__);
+               } else if (!rs->cq_armed) {
+-                      printf("%s req notify \n", __func__);
+                       ds_req_notify_cqs(rs);
+                       rs->cq_armed = 1;
+               } else {
+                       fastlock_acquire(&rs->cq_wait_lock);
+                       fastlock_release(&rs->cq_lock);
+-                      printf("%s wait for event \n", __func__);
+                       ret = ds_get_cq_event(rs);
+-                      printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
+                       fastlock_release(&rs->cq_wait_lock);
+                       fastlock_acquire(&rs->cq_lock);
+               }
+       } while (!ret);
+       fastlock_release(&rs->cq_lock);
+-//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+       return ret;
+ }
+@@ -2063,7 +2024,6 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+       do {
+               ret = ds_process_cqs(rs, 1, test);
+-//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+               if (!ret || nonblock || errno != EWOULDBLOCK)
+                       return ret;
+@@ -2178,15 +2138,12 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
+       struct ds_header *hdr;
+       int ret;
+-//    printf("%s \n", __func__);
+       if (!(rs->state & rs_readable))
+               return ERR(EINVAL);
+       if (!rs_have_rdata(rs)) {
+-//            printf("%s need rdata \n", __func__);
+               ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+                                 rs_have_rdata);
+-//            printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+               if (ret)
+                       return ret;
+       }
+@@ -2198,10 +2155,7 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
+       memcpy(buf, (void *) hdr + hdr->length, len);
+       if (addrlen)
+-{
+               ds_set_src(src_addr, addrlen, hdr);
+-//PRINTADDR(src_addr);
+-}
+       if (!(flags & MSG_PEEK)) {
+               ds_post_recv(rs, rmsg->qp, rmsg->offset);
+@@ -2210,7 +2164,6 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
+               rs->rqe_avail++;
+       }
+-//    printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+       return len;
+ }
+@@ -2444,7 +2397,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+       struct iovec miov[8];
+       ssize_t ret;
+-//    printf("%s\n", __func__);
+       if (iovcnt > 8)
+               return ERR(ENOTSUP);
+@@ -2471,7 +2423,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+       msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
+       msg.msg_iov = miov;
+       msg.msg_iovlen = iovcnt + 1;
+-//    printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+       ret = sendmsg(rs->udp_sock, &msg, flags);
+       return ret > 0 ? ret - hdr.length : ret;
+ }
+@@ -2480,14 +2431,11 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
+                          int flags, uint8_t op)
+ {
+       struct iovec iov;
+-//    printf("%s\n", __func__);
+       if (buf && len) {
+-//            printf("%s have buffer\n", __func__);
+               iov.iov_base = (void *) buf;
+               iov.iov_len = len;
+               return ds_sendv_udp(rs, &iov, 1, flags, op);
+       } else {
+-//            printf("%s no buffer\n", __func__);
+               return ds_sendv_udp(rs, NULL, 0, flags, op);
+       }
+ }
+@@ -2499,14 +2447,11 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+       uint64_t offset;
+       int ret = 0;
+-//    printf("%s\n", __func__);
+       if (!rs->conn_dest->ah)
+               return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
+       if (!ds_can_send(rs)) {
+-              printf("can't send\n");
+               ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
+-              printf("ds_get_comp %d\n", ret);
+               if (ret)
+                       return ret;
+       }
+@@ -2522,9 +2467,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+       sge.lkey = rs->conn_dest->qp->smr->lkey;
+       offset = (uint8_t *) msg - rs->sbuf;
+-//    printf("%s - sending over QP\n", __func__);
+       ret = ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
+-//    printf("%s - ds_post_send %d %s\n", __func__, ret, strerror(errno));
+       return ret ? ret : len;
+ }
+@@ -2625,8 +2568,6 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
+       struct rsocket *rs;
+       int ret;
+-//    PRINTADDR(dest_addr);
+-//    printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
+       rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_STREAM) {
+               if (dest_addr || addrlen)
+@@ -2642,28 +2583,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
+       }
+       fastlock_acquire(&rs->slock);
+-//    printf("%s - checking conn dest %p\n", __func__, rs->conn_dest);
+-//    printf("dest addr: af %d", dest_addr->sa_family); PRINTADDR(dest_addr);
+-//    if (rs->conn_dest) {
+-//            int i;
+-//            printf("conn addr: af %d", rs->conn_dest->addr.sa.sa_family); PRINTADDR(&rs->conn_dest->addr);
+-//            for (i=0;i<16;i++)
+-//                    printf("%x", ((uint8_t *)dest_addr)[i]);
+-//            printf("\n");
+-//            for (i=0;i<16;i++)
+-//                    printf("%x", ((uint8_t *)&rs->conn_dest->addr)[i]);
+-//            printf("\n");
+-//    }
+       if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
+-//            printf("%s - getting conn dest\n", __func__);
+               ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
+-//            printf("%s - get conn dest %d %s\n", __func__, ret, strerror(errno));
+               if (ret)
+                       goto out;
+       }
+       ret = dsend(rs, buf, len, flags);
+-//    printf("%s - dsend %d %s\n", __func__, ret, strerror(errno));
+ out:
+       fastlock_release(&rs->slock);
+       return ret;
+@@ -3130,6 +3056,9 @@ int rshutdown(int socket, int how)
+ static void ds_shutdown(struct rsocket *rs)
+ {
++      if (rs->svcs)
++              rs_modify_svcs(rs, 0);
++
+       if (rs->fd_flags & O_NONBLOCK)
+               rs_set_nonblocking(rs, 0);
+@@ -3718,17 +3647,14 @@ static void rs_svc_process_sock(void)
+       struct rs_svc_msg msg;
+       read(svc_sock[1], &msg, sizeof msg);
+-      switch (msg.op) {
+-      case RS_SVC_INSERT:
++      if (msg.svcs & RS_SVC_DGRAM) {
+               msg.status = rs_svc_add_rs(msg.rs);
+-              break;
+-      case RS_SVC_REMOVE:
++      } else if (!msg.svcs) {
+               msg.status = rs_svc_rm_rs(msg.rs);
+-              break;
+-      default:
+-              msg.status = ENOTSUP;
+-              break;
+       }
++
++      if (!msg.status)
++              msg.rs->svcs = msg.svcs;
+       write(svc_sock[1], &msg, sizeof msg);
+ }
+@@ -3828,7 +3754,6 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
+       struct ibv_sge sge;
+       uint64_t offset;
+-//    PRINTADDR(src);
+       if (!ds_can_send(rs)) {
+               if (ds_get_comp(rs, 0, ds_can_send))
+                       return;
+@@ -3839,18 +3764,13 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
+       rs->sqe_avail--;
+       ds_format_hdr(&hdr, src);
+-//    printf("%s hdr ver %d length %d port %x\n", __func__, hdr.version,
+-//                    hdr.length, hdr.port);
+       memcpy((void *) msg, &hdr, hdr.length);
+       memcpy((void *) msg + hdr.length, buf, len);
+-//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)buf));
+       sge.addr = (uintptr_t) msg;
+       sge.length = hdr.length + len;
+       sge.lkey = rs->conn_dest->qp->smr->lkey;
+       offset = (uint8_t *) msg - rs->sbuf;
+-//    printf("%s ver %d length %d port %x\n", __func__, ((struct ds_header *) msg)->version,
+-//                    ((struct ds_header *) msg)->length, ((struct ds_header *) msg)->port);
+       ds_post_send(rs, &sge, ds_send_wr_id(offset, sge.length));
+ }
+@@ -3863,8 +3783,6 @@ static void rs_svc_process_rs(struct rsocket *rs)
+       int len, ret;
+       ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+-//    PRINTADDR(&addr);
+-//    printf("%s received data 0x%x\n", __func__, *((uint32_t*)&svc_buf[8]));
+       if (ret < DS_UDP_IPV4_HDR_LEN)
+               return;
+@@ -3879,22 +3797,27 @@ static void rs_svc_process_rs(struct rsocket *rs)
+       if (ret)
+               return;
++      if (udp_hdr->op == RS_OP_DATA) {
++              fastlock_acquire(&rs->slock);
++              cur_dest = rs->conn_dest;
++              rs->conn_dest = dest;
++              ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
++              rs->conn_dest = cur_dest;
++              fastlock_release(&rs->slock);
++      }
++
+       if (!dest->ah || (dest->qpn != udp_hdr->qpn))
+               rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+       /* to do: handle when dest local ip address doesn't match udp ip */
+-      if (udp_hdr->op != RS_OP_DATA)
+-              return;
+-
+-      fastlock_acquire(&rs->slock);
+-      cur_dest = rs->conn_dest;
+-      rs->conn_dest = &dest->qp->dest;
+-      rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
+-
+-      rs->conn_dest = dest;
+-      ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL);
+-      rs->conn_dest = cur_dest;
+-      fastlock_release(&rs->slock);
++      if (udp_hdr->op == RS_OP_DATA) {
++              fastlock_acquire(&rs->slock);
++              cur_dest = rs->conn_dest;
++              rs->conn_dest = &dest->qp->dest;
++              rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
++              rs->conn_dest = cur_dest;
++              fastlock_release(&rs->slock);
++      }
+ }
+ static void *rs_svc_run(void *arg)