From 39d8533b49387b416ce7c88b8f665fe1e3b784cc Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 12 Dec 2012 16:49:44 -0800 Subject: [PATCH] Refresh of dsocket --- src/cma.c | 1 + src/rsocket.c | 266 ++++++++++++++++++++++++++++++++------------------ 2 files changed, 174 insertions(+), 93 deletions(-) diff --git a/src/cma.c b/src/cma.c index 0f589668..ff9b426c 100755 --- a/src/cma.c +++ b/src/cma.c @@ -2238,6 +2238,7 @@ int ucma_max_qpsize(struct rdma_cm_id *id) if (id && id_priv->cma_dev) { max_size = id_priv->cma_dev->max_qpsize; } else { + ucma_init(); for (i = 0; i < cma_dev_cnt; i++) { if (!max_size || max_size > cma_dev_array[i].max_qpsize) max_size = cma_dev_array[i].max_qpsize; diff --git a/src/rsocket.c b/src/rsocket.c index c61d6893..6fa4c686 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -399,6 +399,7 @@ static int rs_add_to_svc(struct rsocket *rs) msg.op = RS_SVC_INSERT; msg.status = EINVAL; + printf("%s rs %p\n", __func__, rs); msg.rs = rs; write(svc_sock[0], &msg, sizeof msg); read(svc_sock[0], &msg, sizeof msg); @@ -602,7 +603,9 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg) if (!ret && rs->state < rs_connected) ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); } else { + printf("%s set nonblock\n", __func__); ret = fcntl(rs->epfd, F_SETFL, arg); + printf("%s fcntl %d\n", __func__, ret); if (!ret && rs->qp_list) { qp = rs->qp_list; @@ -640,6 +643,8 @@ static void ds_set_qp_size(struct rsocket *rs) { uint16_t max_size; + printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, + rs->rq_size, rs->rbuf_size); max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); if (rs->sq_size > max_size) @@ -656,6 +661,8 @@ static void ds_set_qp_size(struct rsocket *rs) rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; else rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; + printf("rsocket sq %d buf %d rq %d buf %d\n", rs->sq_size, rs->sbuf_size, + rs->rq_size, rs->rbuf_size); } static int rs_init_bufs(struct rsocket *rs) @@ -728,15 +735,18 @@ static int ds_init_bufs(struct ds_qp *qp) static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) { cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); + printf("%s create comp_channel %p\n", __func__, cm_id->recv_cq_channel); if (!cm_id->recv_cq_channel) return -1; cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, cm_id, cm_id->recv_cq_channel, 0); + printf("%s create cq %p size %d\n", __func__, cm_id->recv_cq, rs->sq_size + rs->rq_size); if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { + printf("%s set nonblock\n", __func__); if (rs_set_nonblocking(rs, O_NONBLOCK)) goto err2; } @@ -876,6 +886,8 @@ static void ds_free_qp(struct ds_qp *qp) static void ds_free(struct rsocket *rs) { + struct ds_qp *qp; + if (rs->state & (rs_readable | rs_writable)) rs_remove_from_svc(rs); @@ -888,12 +900,9 @@ static void ds_free(struct rsocket *rs) if (rs->dmsg) free(rs->dmsg); - if (rs->smsg_free) - free(rs->smsg_free); - - while (rs->qp_list) { - ds_remove_qp(rs, rs->qp_list); - ds_free_qp(rs->qp_list); + while ((qp = rs->qp_list)) { + ds_remove_qp(rs, qp); + ds_free_qp(qp); } if (rs->epfd >= 0) @@ -1016,6 +1025,40 @@ static int ds_init(struct rsocket *rs, int domain) return 0; } +static int ds_init_ep(struct rsocket *rs) +{ + struct ds_smsg *msg; + int i, ret; + + ds_set_qp_size(rs); + + rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); + if (!rs->sbuf) + return ERR(ENOMEM); + + rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); + if (!rs->dmsg) + return ERR(ENOMEM); + + rs->sqe_avail = rs->sq_size; + rs->rqe_avail = rs->rq_size; + + rs->smsg_free = (struct ds_smsg *) rs->sbuf; + msg = rs->smsg_free; + for (i = 0; i < rs->sq_size - 1; i++) { + msg->next = (void *) msg + RS_SNDLOWAT; + msg = msg->next; + } + msg->next = NULL; + + ret = rs_add_to_svc(rs); + if (ret) + return ret; + + rs->state = rs_readable | rs_writable; + return 0; +} + int rsocket(int domain, int type, int protocol) { struct rsocket *rs; @@ -1040,6 +1083,7 @@ int rsocket(int domain, int type, int protocol) rs->cm_id->route.addr.src_addr.sa_family = domain; index = rs->cm_id->channel->fd; } else { + printf("rsocket sq %d rq %d\n", rs->sq_size, rs->rq_size); ret = ds_init(rs, domain); if (ret) goto err; @@ -1069,12 +1113,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) if (!ret) rs->state = rs_bound; } else { - ret = bind(rs->udp_sock, addr, addrlen); - if (!ret) { - ret = rs_add_to_svc(rs); - if (!ret) - rs->state = rs_readable | rs_writable; + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) + return ret; } + ret = bind(rs->udp_sock, addr, addrlen); } return ret; } @@ -1256,41 +1300,6 @@ connected: return ret; } -static int ds_init_ep(struct rsocket *rs) -{ - struct ds_smsg *msg; - int i, ret; - - ds_set_qp_size(rs); - - rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); - if (!rs->sbuf) - return ERR(ENOMEM); - - rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); - if (!rs->dmsg) - return ERR(ENOMEM); - - rs->sbuf_bytes_avail = rs->sbuf_size; - rs->sqe_avail = rs->sq_size; - rs->rqe_avail = rs->rq_size; - - rs->smsg_free = (struct ds_smsg *) rs->sbuf; - msg = rs->smsg_free; - for (i = 0; i < rs->sq_size - 1; i++) { - msg->next = (void *) msg + i * RS_SNDLOWAT; - msg = msg->next; - } - msg->next = NULL; - - ret = rs_add_to_svc(rs); - if (ret) - return ret; - - rs->state = rs_readable | rs_writable; - return 0; -} - static int rs_any_addr(const union socket_addr *addr) { if (addr->sa.sa_family == AF_INET) { @@ -1374,38 +1383,44 @@ static int ds_add_qp_dest(struct ds_qp *qp, union socket_addr *addr, } static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, - socklen_t addrlen, struct ds_qp **qp) + socklen_t addrlen, struct ds_qp **new_qp) { + struct ds_qp *qp; struct ibv_qp_init_attr qp_attr; struct epoll_event event; int i, ret; - *qp = calloc(1, sizeof(struct ds_qp)); - if (!*qp) +printf("%s\n", __func__); + qp = calloc(1, sizeof(*qp)); + if (!qp) return ERR(ENOMEM); - (*qp)->rs = rs; - ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP); + qp->rs = rs; + ret = rdma_create_id(NULL, &qp->cm_id, qp, RDMA_PS_UDP); + printf("%s rdma_create_id %d\n", __func__, ret); if (ret) goto err; - ds_format_hdr(&(*qp)->hdr, src_addr); - ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa); + ds_format_hdr(&qp->hdr, src_addr); + ret = rdma_bind_addr(qp->cm_id, &src_addr->sa); + printf("%s rdma_bind_addr %d\n", __func__, ret); if (ret) goto err; - ret = ds_init_bufs(*qp); + ret = ds_init_bufs(qp); + printf("%s ds_init_bufs %d\n", __func__, ret); if (ret) goto err; - ret = rs_create_cq(rs, (*qp)->cm_id); + ret = rs_create_cq(rs, qp->cm_id); + printf("%s rs_create_cq %d\n", __func__, ret); if (ret) goto err; memset(&qp_attr, 0, sizeof qp_attr); qp_attr.qp_context = qp; - qp_attr.send_cq = rs->cm_id->send_cq; - qp_attr.recv_cq = rs->cm_id->recv_cq; + qp_attr.send_cq = qp->cm_id->send_cq; + qp_attr.recv_cq = qp->cm_id->recv_cq; qp_attr.qp_type = IBV_QPT_UD; qp_attr.sq_sig_all = 1; qp_attr.cap.max_send_wr = rs->sq_size; @@ -1413,31 +1428,35 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, qp_attr.cap.max_send_sge = 2; qp_attr.cap.max_recv_sge = 1; qp_attr.cap.max_inline_data = rs->sq_inline; - ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); + ret = rdma_create_qp(qp->cm_id, NULL, &qp_attr); + printf("%s rdma_create_qp %d\n", __func__, ret); if (ret) goto err; - ret = ds_add_qp_dest(*qp, src_addr, addrlen); + ret = ds_add_qp_dest(qp, src_addr, addrlen); + printf("%s ds_add_qp_dest %d\n", __func__, ret); if (ret) goto err; event.events = EPOLLIN; - event.data.ptr = *qp; + event.data.ptr = qp; ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, - (*qp)->cm_id->recv_cq_channel->fd, &event); + qp->cm_id->recv_cq_channel->fd, &event); + printf("%s epoll_ctl %d\n", __func__, ret); if (ret) goto err; for (i = 0; i < rs->rq_size; i++) { - ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); + ret = ds_post_recv(rs, qp, qp->rbuf + i * RS_SNDLOWAT); if (ret) goto err; } - ds_insert_qp(rs, *qp); + ds_insert_qp(rs, qp); + *new_qp = qp; return 0; err: - ds_free_qp(*qp); + ds_free_qp(qp); return ret; } @@ -1464,38 +1483,42 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, union socket_addr src_addr; socklen_t src_len; struct ds_qp *qp; + struct ds_dest **tdest, *new_dest; int ret = 0; + printf("%s \n", __func__); fastlock_acquire(&rs->map_lock); - dest = tfind(addr, &rs->dest_map, ds_compare_addr); - if (dest) - goto out; - - if (rs->state == rs_init) { - ret = ds_init_ep(rs); - if (ret) - goto out; - } + tdest = tfind(addr, &rs->dest_map, ds_compare_addr); + printf("%s tfind %p\n", __func__, dest); + if (tdest) + goto found; ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); + printf("%s ds_get_src_addr %d %s\n", __func__, ret, strerror(errno)); if (ret) goto out; ret = ds_get_qp(rs, &src_addr, src_len, &qp); + printf("%s ds_get_qp %d %s\n", __func__, ret, strerror(errno)); if (ret) goto out; - if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) { - *dest = calloc(1, sizeof(struct ds_dest)); - if (!*dest) { + tdest = tfind(addr, &rs->dest_map, ds_compare_addr); + if (!tdest) { + printf("%s adding dest into map\n", __func__); + new_dest = calloc(1, sizeof(*new_dest)); + if (!new_dest) { ret = ERR(ENOMEM); goto out; } - memcpy(&(*dest)->addr, addr, addrlen); - (*dest)->qp = qp; - tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr); + memcpy(&new_dest->addr, addr, addrlen); + new_dest->qp = qp; + tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr); } + +found: + *dest = *tdest; out: fastlock_release(&rs->map_lock); return ret; @@ -1511,10 +1534,19 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); ret = rs_do_connect(rs); } else { + printf("%s\n", __func__); + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) + return ret; + } + fastlock_acquire(&rs->slock); ret = connect(rs->udp_sock, addr, addrlen); + printf("%s connect %d %s\n", __func__, ret, strerror(errno)); if (!ret) ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + printf("%s ds_get_dest %d %s\n", __func__, ret, strerror(errno)); fastlock_release(&rs->slock); } return ret; @@ -1983,14 +2015,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r do { ds_poll_cqs(rs); if (test(rs)) { - printf("%s test succeeded\n", __func__); +// printf("%s test succeeded\n", __func__); ret = 0; break; } else if (nonblock) { ret = ERR(EWOULDBLOCK); - printf("%s nonblocking \n", __func__); +// printf("%s nonblocking \n", __func__); } else if (!rs->cq_armed) { - printf("%s req notify \n", __func__); +// printf("%s req notify \n", __func__); ds_req_notify_cqs(rs); rs->cq_armed = 1; } else { @@ -1998,14 +2030,14 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r fastlock_release(&rs->cq_lock); ret = ds_get_cq_event(rs); - printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); +// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno)); fastlock_release(&rs->cq_wait_lock); fastlock_acquire(&rs->cq_lock); } } while (!ret); fastlock_release(&rs->cq_lock); - printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); +// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); return ret; } @@ -2017,7 +2049,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc do { ret = ds_process_cqs(rs, 1, test); - printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); +// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); if (!ret || nonblock || errno != EWOULDBLOCK) return ret; @@ -2132,16 +2164,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, struct ds_header *hdr; int ret; -ret = 0; - printf("%s \n", __func__); +// printf("%s \n", __func__); if (!(rs->state & rs_readable)) return ERR(EINVAL); if (!rs_have_rdata(rs)) { - printf("%s need rdata \n", __func__); +// printf("%s need rdata \n", __func__); ret = ds_get_comp(rs, rs_nonblocking(rs, flags), rs_have_rdata); - printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno)); +// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); if (ret) return ret; } @@ -2161,6 +2192,7 @@ ret = 0; rs->rmsg_head = 0; } + printf("%s ret %d errno %s\n", __func__, ret, strerror(errno)); return len; } @@ -2392,12 +2424,14 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, struct ds_udp_header hdr; struct msghdr msg; struct iovec miov[8]; + ssize_t ret; +// printf("%s\n", __func__); if (iovcnt > 8) return ERR(ENOTSUP); hdr.tag = htonl(DS_UDP_TAG); - hdr.version = 1; + hdr.version = rs->conn_dest->qp->hdr.version; hdr.op = op; hdr.reserved = 0; hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF); @@ -2419,18 +2453,24 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); msg.msg_iov = miov; msg.msg_iovlen = iovcnt + 1; - return sendmsg(rs->udp_sock, &msg, flags); +// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen); + ret = sendmsg(rs->udp_sock, &msg, flags); + printf("%s ret %d %s\n", __func__, ret, strerror(errno)); + return ret > 0 ? ret - sizeof hdr : ret; } static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags, uint8_t op) { struct iovec iov; + printf("%s\n", __func__); if (buf && len) { +// printf("%s have buffer\n", __func__); iov.iov_base = (void *) buf; iov.iov_len = len; return ds_sendv_udp(rs, &iov, 1, flags, op); } else { +// printf("%s no buffer\n", __func__); return ds_sendv_udp(rs, NULL, 0, flags, op); } } @@ -2442,6 +2482,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) uint64_t offset; int ret = 0; + printf("%s\n", __func__); if (!rs->conn_dest->ah) return ds_send_udp(rs, buf, len, flags, RS_OP_DATA); @@ -2563,6 +2604,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, struct rsocket *rs; int ret; + printf("%s\n", __func__); rs = idm_at(&idm, socket); if (rs->type == SOCK_STREAM) { if (dest_addr || addrlen) @@ -2571,12 +2613,23 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, return rsend(socket, buf, len, flags); } + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) + return ret; + } + fastlock_acquire(&rs->slock); + printf("%s check conn dest\n", __func__); if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { + printf("%s need conn dest\n", __func__); ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); if (ret) goto out; } + else + printf("%s connected\n", __func__); + ret = dsend(rs, buf, len, flags); out: fastlock_release(&rs->slock); @@ -3605,9 +3658,11 @@ static int rs_svc_add_rs(struct rsocket *rs) } svc_rss[++svc_cnt] = rs; + printf("%s rs %p\n", __func__, rs); svc_fds[svc_cnt].fd = rs->udp_sock; svc_fds[svc_cnt].events = POLLIN; svc_fds[svc_cnt].revents = 0; + printf("add rs udp sock %d\n",rs->udp_sock); return 0; } @@ -3631,6 +3686,7 @@ static void rs_svc_process_sock(void) struct rs_svc_msg msg; read(svc_sock[1], &msg, sizeof msg); + printf("%s op %d\n",__func__, msg.op); switch (msg.op) { case RS_SVC_INSERT: msg.status = rs_svc_add_rs(msg.rs); @@ -3642,6 +3698,7 @@ static void rs_svc_process_sock(void) msg.status = ENOTSUP; break; } + printf("%s status %d\n",__func__, msg.status); write(svc_sock[1], &msg, sizeof msg); } @@ -3675,6 +3732,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t struct ibv_ah_attr attr; int ret; + printf("%s\n",__func__); if (dest->ah) { fastlock_acquire(&rs->slock); ibv_destroy_ah(dest->ah); @@ -3726,7 +3784,18 @@ out: static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, union socket_addr *addr) { - return (udp_hdr->tag == DS_UDP_TAG) && +printf("tag %x ver %d family %d (AF_INET %d) length %d\n", udp_hdr->tag, + udp_hdr->version, addr->sa.sa_family, AF_INET, udp_hdr->length); + +printf("tag %d ver %d fam %d len %d ver %d fam %d len %d\n", +udp_hdr->tag == ntohl(DS_UDP_TAG), + udp_hdr->version == 4, addr->sa.sa_family == AF_INET, + udp_hdr->length == DS_UDP_IPV4_HDR_LEN, + udp_hdr->version == 6, addr->sa.sa_family == AF_INET6, + udp_hdr->length == DS_UDP_IPV6_HDR_LEN); + + + return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 && @@ -3741,6 +3810,7 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, struct ibv_sge sge; uint64_t offset; + printf("%s\n",__func__); if (!ds_can_send(rs)) { if (ds_get_comp(rs, 0, ds_can_send)) return; @@ -3769,7 +3839,9 @@ static void rs_svc_process_rs(struct rsocket *rs) socklen_t addrlen = sizeof addr; int len, ret; + printf("%s\n",__func__); ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); + printf("%s recvfrom %d\n",__func__, ret); if (ret < DS_UDP_IPV4_HDR_LEN) return; @@ -3777,10 +3849,12 @@ static void rs_svc_process_rs(struct rsocket *rs) if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) return; + printf("%s valid hdr\n",__func__); len = ret - udp_hdr->length; udp_hdr->tag = ntohl(udp_hdr->tag); udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); + printf("%s ds_get_dest %d\n",__func__, ret); if (ret) return; @@ -3792,10 +3866,12 @@ static void rs_svc_process_rs(struct rsocket *rs) cur_dest = rs->conn_dest; if (udp_hdr->op == RS_OP_DATA) { rs->conn_dest = &dest->qp->dest; + printf("%s forwarding msg\n",__func__); rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); } rs->conn_dest = dest; + printf("%s sending resp\n",__func__); ds_send_udp(rs, svc_buf + udp_hdr->length, len, 0, RS_OP_CTRL); rs->conn_dest = cur_dest; fastlock_release(&rs->slock); @@ -3806,6 +3882,7 @@ static void *rs_svc_run(void *arg) struct rs_svc_msg msg; int i, ret; + printf("%s\n",__func__); ret = rs_svc_grow_sets(); if (ret) { msg.status = ret; @@ -3816,10 +3893,13 @@ static void *rs_svc_run(void *arg) svc_fds[0].fd = svc_sock[1]; svc_fds[0].events = POLLIN; do { + printf("%s svc cnt %d\n",__func__, svc_cnt); for (i = 0; i <= svc_cnt; i++) svc_fds[i].revents = 0; + printf("%s poll\n",__func__); poll(svc_fds, svc_cnt + 1, -1); + printf("%s poll done\n",__func__); if (svc_fds[0].revents) rs_svc_process_sock(); @@ -3827,7 +3907,7 @@ static void *rs_svc_run(void *arg) if (svc_fds[i].revents) rs_svc_process_rs(svc_rss[i]); } - } while (svc_cnt > 1); + } while (svc_cnt >= 1); return NULL; } -- 2.41.0