]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of dsocket
authorSean Hefty <sean.hefty@intel.com>
Thu, 13 Dec 2012 00:49:44 +0000 (16:49 -0800)
committerSean Hefty <sean.hefty@intel.com>
Thu, 13 Dec 2012 00:49:44 +0000 (16:49 -0800)
src/cma.c
src/rsocket.c

index 0f589668bce186759cbc2b40875356cdb6eb91e9..ff9b426ccee3d0a840ae593da7473ec3a37c7618 100755 (executable)
--- a/src/cma.c
+++ b/src/cma.c
@@ -2238,6 +2238,7 @@ int ucma_max_qpsize(struct rdma_cm_id *id)
        if (id && id_priv->cma_dev) {
                max_size = id_priv->cma_dev->max_qpsize;
        } else {
+               ucma_init();
                for (i = 0; i < cma_dev_cnt; i++) {
                        if (!max_size || max_size > cma_dev_array[i].max_qpsize)
                                max_size = cma_dev_array[i].max_qpsize;
index c61d68937608504bc394827584010b384658b0e3..6fa4c6868b22db0ea0432bb35b9b40ae741ad7fd 100644 (file)
@@ -399,6 +399,7 @@ static int rs_add_to_svc(struct rsocket *rs)
 
        msg.op = RS_SVC_INSERT;
        msg.status = EINVAL;
+       printf("%s rs %p\n", __func__, rs);
        msg.rs = rs;
        write(svc_sock[0], &msg, sizeof msg);
        read(svc_sock[0], &msg, sizeof msg);
@@ -602,7 +603,9 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
                if (!ret && rs->state < rs_connected)
                        ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
        } else {
+               printf("%s set nonblock\n", __func__);
                ret = fcntl(rs->epfd, F_SETFL, arg);
+               printf("%s fcntl %d\n", __func__, ret);
 
                if (!ret && rs->qp_list) {
                        qp = rs->qp_list;
@@ -640,6 +643,8 @@ static void ds_set_qp_size(struct rsocket *rs)
 {
        uint16_t max_size;
 
+       printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
+                       rs->rq_size, rs->rbuf_size);
        max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
 
        if (rs->sq_size > max_size)
@@ -656,6 +661,8 @@ static void ds_set_qp_size(struct rsocket *rs)
                rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
        else
                rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
+       printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size,
+                       rs->rq_size, rs->rbuf_size);
 }
 
 static int rs_init_bufs(struct rsocket *rs)
@@ -728,15 +735,18 @@ static int ds_init_bufs(struct ds_qp *qp)
 static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
 {
        cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+       printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel);
        if (!cm_id->recv_cq_channel)
                return -1;
 
        cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
                                       cm_id, cm_id->recv_cq_channel, 0);
+       printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size);
        if (!cm_id->recv_cq)
                goto err1;
 
        if (rs->fd_flags & O_NONBLOCK) {
+               printf("%s set nonblock\n", __func__);
                if (rs_set_nonblocking(rs, O_NONBLOCK))
                        goto err2;
        }
@@ -876,6 +886,8 @@ static void ds_free_qp(struct ds_qp *qp)
 
 static void ds_free(struct rsocket *rs)
 {
+       struct ds_qp *qp;
+
        if (rs->state & (rs_readable | rs_writable))
                rs_remove_from_svc(rs);
 
@@ -888,12 +900,9 @@ static void ds_free(struct rsocket *rs)
        if (rs->dmsg)
                free(rs->dmsg);
 
-       if (rs->smsg_free)
-               free(rs->smsg_free);
-
-       while (rs->qp_list) {
-               ds_remove_qp(rs, rs->qp_list);
-               ds_free_qp(rs->qp_list);
+       while ((qp = rs->qp_list)) {
+               ds_remove_qp(rs, qp);
+               ds_free_qp(qp);
        }
 
        if (rs->epfd >= 0)
@@ -1016,6 +1025,40 @@ static int ds_init(struct rsocket *rs, int domain)
        return 0;
 }
 
+static int ds_init_ep(struct rsocket *rs)
+{
+       struct ds_smsg *msg;
+       int i, ret;
+
+       ds_set_qp_size(rs);
+
+       rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
+       if (!rs->sbuf)
+               return ERR(ENOMEM);
+
+       rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
+       if (!rs->dmsg)
+               return ERR(ENOMEM);
+
+       rs->sqe_avail = rs->sq_size;
+       rs->rqe_avail = rs->rq_size;
+
+       rs->smsg_free = (struct ds_smsg *) rs->sbuf;
+       msg = rs->smsg_free;
+       for (i = 0; i < rs->sq_size - 1; i++) {
+               msg->next = (void *) msg + RS_SNDLOWAT;
+               msg = msg->next;
+       }
+       msg->next = NULL;
+
+       ret = rs_add_to_svc(rs);
+       if (ret)
+               return ret;
+
+       rs->state = rs_readable | rs_writable;
+       return 0;
+}
+
 int rsocket(int domain, int type, int protocol)
 {
        struct rsocket *rs;
@@ -1040,6 +1083,7 @@ int rsocket(int domain, int type, int protocol)
                rs->cm_id->route.addr.src_addr.sa_family = domain;
                index = rs->cm_id->channel->fd;
        } else {
+               printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size);
                ret = ds_init(rs, domain);
                if (ret)
                        goto err;
@@ -1069,12 +1113,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
                if (!ret)
                        rs->state = rs_bound;
        } else {
-               ret = bind(rs->udp_sock, addr, addrlen);
-               if (!ret) {
-                       ret = rs_add_to_svc(rs);
-                       if (!ret)
-                               rs->state = rs_readable | rs_writable;
+               if (rs->state == rs_init) {
+                       ret = ds_init_ep(rs);
+                       if (ret)
+                               return ret;
                }
+               ret = bind(rs->udp_sock, addr, addrlen);
        }
        return ret;
 }
@@ -1256,41 +1300,6 @@ connected:
        return ret;
 }
 
-static int ds_init_ep(struct rsocket *rs)
-{
-       struct ds_smsg *msg;
-       int i, ret;
-
-       ds_set_qp_size(rs);
-
-       rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
-       if (!rs->sbuf)
-               return ERR(ENOMEM);
-
-       rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-       if (!rs->dmsg)
-               return ERR(ENOMEM);
-
-       rs->sbuf_bytes_avail = rs->sbuf_size;
-       rs->sqe_avail = rs->sq_size;
-       rs->rqe_avail = rs->rq_size;
-
-       rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-       msg = rs->smsg_free;
-       for (i = 0; i < rs->sq_size - 1; i++) {
-               msg->next = (void *) msg + i * RS_SNDLOWAT;
-               msg = msg->next;
-       }
-       msg->next = NULL;
-
-       ret = rs_add_to_svc(rs);
-       if (ret)
-               return ret;
-
-       rs->state = rs_readable | rs_writable;
-       return 0;
-}
-
 static int rs_any_addr(const union socket_addr *addr)
 {
        if (addr->sa.sa_family == AF_INET) {
@@ -1374,38 +1383,44 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr,
 }
 
 static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
-                       socklen_t addrlen, struct ds_qp **qp)
+                       socklen_t addrlen, struct ds_qp **new_qp)
 {
+       struct ds_qp *qp;
        struct ibv_qp_init_attr qp_attr;
        struct epoll_event event;
        int i, ret;
 
-       *qp = calloc(1, sizeof(struct ds_qp));
-       if (!*qp)
+printf("%s\n", __func__);
+       qp = calloc(1, sizeof(*qp));
+       if (!qp)
                return ERR(ENOMEM);
 
-       (*qp)->rs = rs;
-       ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
+       qp->rs = rs;
+       ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP);
+       printf("%s rdma_create_id %d\n", __func__, ret);
        if (ret)
                goto err;
 
-       ds_format_hdr(&(*qp)->hdr, src_addr);
-       ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
+       ds_format_hdr(&qp->hdr, src_addr);
+       ret = rdma_bind_addr(qp->cm_id, &src_addr->sa);
+       printf("%s rdma_bind_addr %d\n", __func__, ret);
        if (ret)
                goto err;
 
-       ret = ds_init_bufs(*qp);
+       ret = ds_init_bufs(qp);
+       printf("%s ds_init_bufs %d\n", __func__, ret);
        if (ret)
                goto err;
 
-       ret = rs_create_cq(rs, (*qp)->cm_id);
+       ret = rs_create_cq(rs, qp->cm_id);
+       printf("%s rs_create_cq %d\n", __func__, ret);
        if (ret)
                goto err;
 
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.qp_context = qp;
-       qp_attr.send_cq = rs->cm_id->send_cq;
-       qp_attr.recv_cq = rs->cm_id->recv_cq;
+       qp_attr.send_cq = qp->cm_id->send_cq;
+       qp_attr.recv_cq = qp->cm_id->recv_cq;
        qp_attr.qp_type = IBV_QPT_UD;
        qp_attr.sq_sig_all = 1;
        qp_attr.cap.max_send_wr = rs->sq_size;
@@ -1413,31 +1428,35 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
        qp_attr.cap.max_send_sge = 2;
        qp_attr.cap.max_recv_sge = 1;
        qp_attr.cap.max_inline_data = rs->sq_inline;
-       ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
+       ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr);
+       printf("%s rdma_create_qp %d\n", __func__, ret);
        if (ret)
                goto err;
 
-       ret = ds_add_qp_dest(*qp, src_addr, addrlen);
+       ret = ds_add_qp_dest(qp, src_addr, addrlen);
+       printf("%s ds_add_qp_dest %d\n", __func__, ret);
        if (ret)
                goto err;
 
        event.events = EPOLLIN;
-       event.data.ptr = *qp;
+       event.data.ptr = qp;
        ret = epoll_ctl(rs->epfd,  EPOLL_CTL_ADD,
-                       (*qp)->cm_id->recv_cq_channel->fd, &event);
+                       qp->cm_id->recv_cq_channel->fd, &event);
+       printf("%s epoll_ctl %d\n", __func__, ret);
        if (ret)
                goto err;
 
        for (i = 0; i < rs->rq_size; i++) {
-               ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
+               ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT);
                if (ret)
                        goto err;
        }
 
-       ds_insert_qp(rs, *qp);
+       ds_insert_qp(rs, qp);
+       *new_qp = qp;
        return 0;
 err:
-       ds_free_qp(*qp);
+       ds_free_qp(qp);
        return ret;
 }
 
@@ -1464,38 +1483,42 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
        union socket_addr src_addr;
        socklen_t src_len;
        struct ds_qp *qp;
+       struct ds_dest **tdest, *new_dest;
        int ret = 0;
 
+       printf("%s \n", __func__);
        fastlock_acquire(&rs->map_lock);
-       dest = tfind(addr, &rs->dest_map, ds_compare_addr);
-       if (dest)
-               goto out;
-
-       if (rs->state == rs_init) {
-               ret = ds_init_ep(rs);
-               if (ret)
-                       goto out;
-       }
+       tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+       printf("%s tfind %p\n", __func__, dest);
+       if (tdest)
+               goto found;
 
        ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+       printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno));
        if (ret)
                goto out;
 
        ret = ds_get_qp(rs, &src_addr, src_len, &qp);
+       printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno));
        if (ret)
                goto out;
 
-       if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
-               *dest = calloc(1, sizeof(struct ds_dest));
-               if (!*dest) {
+       tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+       if (!tdest) {
+               printf("%s adding dest into map\n", __func__);
+               new_dest = calloc(1, sizeof(*new_dest));
+               if (!new_dest) {
                        ret = ERR(ENOMEM);
                        goto out;
                }
 
-               memcpy(&(*dest)->addr, addr, addrlen);
-               (*dest)->qp = qp;
-               tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
+               memcpy(&new_dest->addr, addr, addrlen);
+               new_dest->qp = qp;
+               tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr);
        }
+
+found:
+       *dest = *tdest;
 out:
        fastlock_release(&rs->map_lock);
        return ret;
@@ -1511,10 +1534,19 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
                memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
                ret = rs_do_connect(rs);
        } else {
+               printf("%s\n", __func__);
+               if (rs->state == rs_init) {
+                       ret = ds_init_ep(rs);
+                       if (ret)
+                               return ret;
+               }
+
                fastlock_acquire(&rs->slock);
                ret = connect(rs->udp_sock, addr, addrlen);
+               printf("%s connect %d %s\n", __func__, ret, strerror(errno));
                if (!ret)
                        ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+               printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno));
                fastlock_release(&rs->slock);
        }
        return ret;
@@ -1983,14 +2015,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
        do {
                ds_poll_cqs(rs);
                if (test(rs)) {
-                       printf("%s test succeeded\n", __func__);
+//                     printf("%s test succeeded\n", __func__);
                        ret = 0;
                        break;
                } else if (nonblock) {
                        ret = ERR(EWOULDBLOCK);
-                       printf("%s nonblocking \n", __func__);
+//                     printf("%s nonblocking \n", __func__);
                } else if (!rs->cq_armed) {
-                       printf("%s req notify \n", __func__);
+//                     printf("%s req notify \n", __func__);
                        ds_req_notify_cqs(rs);
                        rs->cq_armed = 1;
                } else {
@@ -1998,14 +2030,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
                        fastlock_release(&rs->cq_lock);
 
                        ret = ds_get_cq_event(rs);
-                       printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
+//                     printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
                        fastlock_release(&rs->cq_wait_lock);
                        fastlock_acquire(&rs->cq_lock);
                }
        } while (!ret);
 
        fastlock_release(&rs->cq_lock);
-       printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+//     printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
        return ret;
 }
 
@@ -2017,7 +2049,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
 
        do {
                ret = ds_process_cqs(rs, 1, test);
-               printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+//             printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
                if (!ret || nonblock || errno != EWOULDBLOCK)
                        return ret;
 
@@ -2132,16 +2164,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
        struct ds_header *hdr;
        int ret;
 
-ret = 0;
-       printf("%s \n", __func__);
+//     printf("%s \n", __func__);
        if (!(rs->state & rs_readable))
                return ERR(EINVAL);
 
        if (!rs_have_rdata(rs)) {
-               printf("%s need rdata \n", __func__);
+//             printf("%s need rdata \n", __func__);
                ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
                                  rs_have_rdata);
-               printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
+//             printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
                if (ret)
                        return ret;
        }
@@ -2161,6 +2192,7 @@ ret = 0;
                        rs->rmsg_head = 0;
        }
 
+       printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
        return len;
 }
 
@@ -2392,12 +2424,14 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
        struct ds_udp_header hdr;
        struct msghdr msg;
        struct iovec miov[8];
+       ssize_t ret;
 
+//     printf("%s\n", __func__);
        if (iovcnt > 8)
                return ERR(ENOTSUP);
 
        hdr.tag = htonl(DS_UDP_TAG);
-       hdr.version = 1;
+       hdr.version = rs->conn_dest->qp->hdr.version;
        hdr.op = op;
        hdr.reserved = 0;
        hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
@@ -2419,18 +2453,24 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
        msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
        msg.msg_iov = miov;
        msg.msg_iovlen = iovcnt + 1;
-       return sendmsg(rs->udp_sock, &msg, flags);
+//     printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+       ret = sendmsg(rs->udp_sock, &msg, flags);
+       printf("%s ret %d %s\n", __func__, ret, strerror(errno));
+       return ret > 0 ? ret - sizeof hdr : ret;
 }
 
 static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
                           int flags, uint8_t op)
 {
        struct iovec iov;
+       printf("%s\n", __func__);
        if (buf && len) {
+//             printf("%s have buffer\n", __func__);
                iov.iov_base = (void *) buf;
                iov.iov_len = len;
                return ds_sendv_udp(rs, &iov, 1, flags, op);
        } else {
+//             printf("%s no buffer\n", __func__);
                return ds_sendv_udp(rs, NULL, 0, flags, op);
        }
 }
@@ -2442,6 +2482,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
        uint64_t offset;
        int ret = 0;
 
+       printf("%s\n", __func__);
        if (!rs->conn_dest->ah)
                return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
 
@@ -2563,6 +2604,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
        struct rsocket *rs;
        int ret;
 
+       printf("%s\n", __func__);
        rs = idm_at(&idm, socket);
        if (rs->type == SOCK_STREAM) {
                if (dest_addr || addrlen)
@@ -2571,12 +2613,23 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                return rsend(socket, buf, len, flags);
        }
 
+       if (rs->state == rs_init) {
+               ret = ds_init_ep(rs);
+               if (ret)
+                       return ret;
+       }
+
        fastlock_acquire(&rs->slock);
+       printf("%s check conn dest\n", __func__);
        if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
+               printf("%s need conn dest\n", __func__);
                ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
                if (ret)
                        goto out;
        }
+       else
+               printf("%s connected\n", __func__);
+
        ret = dsend(rs, buf, len, flags);
 out:
        fastlock_release(&rs->slock);
@@ -3605,9 +3658,11 @@ static int rs_svc_add_rs(struct rsocket *rs)
        }
 
        svc_rss[++svc_cnt] = rs;
+       printf("%s rs %p\n", __func__, rs);
        svc_fds[svc_cnt].fd = rs->udp_sock;
        svc_fds[svc_cnt].events = POLLIN;
        svc_fds[svc_cnt].revents = 0;
+       printf("add rs udp sock %d\n",rs->udp_sock);
        return 0;
 }
 
@@ -3631,6 +3686,7 @@ static void rs_svc_process_sock(void)
        struct rs_svc_msg msg;
 
        read(svc_sock[1], &msg, sizeof msg);
+       printf("%s op %d\n",__func__, msg.op);
        switch (msg.op) {
        case RS_SVC_INSERT:
                msg.status = rs_svc_add_rs(msg.rs);
@@ -3642,6 +3698,7 @@ static void rs_svc_process_sock(void)
                msg.status = ENOTSUP;
                break;
        }
+       printf("%s status %d\n",__func__, msg.status);
        write(svc_sock[1], &msg, sizeof msg);
 }
 
@@ -3675,6 +3732,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
        struct ibv_ah_attr attr;
        int ret;
 
+       printf("%s\n",__func__);
        if (dest->ah) {
                fastlock_acquire(&rs->slock);
                ibv_destroy_ah(dest->ah);
@@ -3726,7 +3784,18 @@ out:
 static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
                                union socket_addr *addr)
 {
-       return (udp_hdr->tag == DS_UDP_TAG) &&
+printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag,
+       udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length);
+
+printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n",
+udp_hdr->tag == ntohl(DS_UDP_TAG),
+       udp_hdr->version == 4, addr->sa.sa_family == AF_INET,
+         udp_hdr->length == DS_UDP_IPV4_HDR_LEN,
+        udp_hdr->version == 6, addr->sa.sa_family == AF_INET6,
+         udp_hdr->length == DS_UDP_IPV6_HDR_LEN);
+
+
+       return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
                ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
                  udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
                 (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
@@ -3741,6 +3810,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
        struct ibv_sge sge;
        uint64_t offset;
 
+       printf("%s\n",__func__);
        if (!ds_can_send(rs)) {
                if (ds_get_comp(rs, 0, ds_can_send))
                        return;
@@ -3769,7 +3839,9 @@ static void rs_svc_process_rs(struct rsocket *rs)
        socklen_t addrlen = sizeof addr;
        int len, ret;
 
+       printf("%s\n",__func__);
        ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+       printf("%s recvfrom %d\n",__func__, ret);
        if (ret < DS_UDP_IPV4_HDR_LEN)
                return;
 
@@ -3777,10 +3849,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
        if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
                return;
 
+       printf("%s valid hdr\n",__func__);
        len = ret - udp_hdr->length;
        udp_hdr->tag = ntohl(udp_hdr->tag);
        udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
        ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+       printf("%s ds_get_dest %d\n",__func__, ret);
        if (ret)
                return;
 
@@ -3792,10 +3866,12 @@ static void rs_svc_process_rs(struct rsocket *rs)
        cur_dest = rs->conn_dest;
        if (udp_hdr->op == RS_OP_DATA) {
                rs->conn_dest = &dest->qp->dest;
+               printf("%s forwarding msg\n",__func__);
                rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
        }
 
        rs->conn_dest = dest;
+       printf("%s sending resp\n",__func__);
        ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL);
        rs->conn_dest = cur_dest;
        fastlock_release(&rs->slock);
@@ -3806,6 +3882,7 @@ static void *rs_svc_run(void *arg)
        struct rs_svc_msg msg;
        int i, ret;
 
+       printf("%s\n",__func__);
        ret = rs_svc_grow_sets();
        if (ret) {
                msg.status = ret;
@@ -3816,10 +3893,13 @@ static void *rs_svc_run(void *arg)
        svc_fds[0].fd = svc_sock[1];
        svc_fds[0].events = POLLIN;
        do {
+               printf("%s svc cnt %d\n",__func__, svc_cnt);
                for (i = 0; i <= svc_cnt; i++)
                        svc_fds[i].revents = 0;
 
+               printf("%s poll\n",__func__);
                poll(svc_fds, svc_cnt + 1, -1);
+               printf("%s poll done\n",__func__);
                if (svc_fds[0].revents)
                        rs_svc_process_sock();
 
@@ -3827,7 +3907,7 @@ static void *rs_svc_run(void *arg)
                        if (svc_fds[i].revents)
                                rs_svc_process_rs(svc_rss[i]);
                }
-       } while (svc_cnt > 1);
+       } while (svc_cnt >= 1);
 
        return NULL;
 }