From df7ecde0da9df4af5d8bc3e1ca472e2e5ec9095b Mon Sep 17 00:00:00 2001 From: "Susan K. Coulter" Date: Fri, 17 Jan 2014 14:31:42 -0800 Subject: [PATCH] rsocket: Add keepalive logic Actually send and receive keepalive messages if keepalive is enabled on an rsocket. Signed-off-by: Susan K. Coulter Signed-off-by: Sean Hefty --- examples/common.c | 2 +- examples/rstream.c | 32 +++- src/rsocket.c | 375 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 314 insertions(+), 95 deletions(-) diff --git a/examples/common.c b/examples/common.c index 0b6428cb..09468ce6 100644 --- a/examples/common.c +++ b/examples/common.c @@ -161,5 +161,5 @@ int do_poll(struct pollfd *fds, int timeout) ret = rs_poll(fds, 1, timeout); } while (!ret); - return ret == 1 ? 0 : ret; + return ret == 1 ? (fds->revents & (POLLERR | POLLHUP)) : ret; } diff --git a/examples/rstream.c b/examples/rstream.c index cf847169..27326c2e 100644 --- a/examples/rstream.c +++ b/examples/rstream.c @@ -90,6 +90,7 @@ static int transfer_count = 1000; static int buffer_size; static char test_name[10] = "custom"; static char *port = "7471"; +static int keepalive; static char *dst_addr; static char *src_addr; static struct timeval start, end; @@ -253,6 +254,28 @@ out: return ret; } +static void set_keepalive(int rs) +{ + int optval; + socklen_t optlen = sizeof(optlen); + + optval = 1; + if (rs_setsockopt(rs, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen)) { + perror("rsetsockopt SO_KEEPALIVE"); + return; + } + + optval = keepalive; + if (rs_setsockopt(rs, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen)) + perror("rsetsockopt TCP_KEEPIDLE"); + + if (!(rs_getsockopt(rs, SOL_SOCKET, SO_KEEPALIVE, &optval, &optlen))) + printf("Keepalive: %s\n", (optval ? "ON" : "OFF")); + + if (!(rs_getsockopt(rs, IPPROTO_TCP, TCP_KEEPIDLE, &optval, &optlen))) + printf(" time: %i\n", optval); +} + static void set_options(int rs) { int val; @@ -284,6 +307,9 @@ static void set_options(int rs) rs_setsockopt(rs, SOL_RDMA, RDMA_INLINE, &val, sizeof val); } } + + if (keepalive) + set_keepalive(rs); } static int server_listen(void) @@ -571,7 +597,7 @@ int main(int argc, char **argv) ai_hints.ai_socktype = SOCK_STREAM; rai_hints.ai_port_space = RDMA_PS_TCP; - while ((op = getopt(argc, argv, "s:b:f:B:I:C:S:p:T:")) != -1) { + while ((op = getopt(argc, argv, "s:b:f:B:I:C:S:p:k:T:")) != -1) { switch (op) { case 's': dst_addr = optarg; @@ -612,6 +638,9 @@ int main(int argc, char **argv) case 'p': port = optarg; break; + case 'k': + keepalive = atoi(optarg); + break; case 'T': if (!set_test_opt(optarg)) break; @@ -627,6 +656,7 @@ int main(int argc, char **argv) printf("\t[-C transfer_count]\n"); printf("\t[-S transfer_size or all]\n"); printf("\t[-p port_number]\n"); + printf("\t[-k keepalive_time]\n"); printf("\t[-T test_option]\n"); printf("\t s|sockets - use standard tcp/ip sockets\n"); printf("\t a|async - asynchronous operation (use poll)\n"); diff --git a/src/rsocket.c b/src/rsocket.c index c7a491ba..de8c8934 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -69,23 +69,43 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; struct rsocket; enum { - RS_SVC_DGRAM = 1 << 0 + RS_SVC_NOOP, + RS_SVC_ADD_DGRAM, + RS_SVC_REM_DGRAM, + RS_SVC_ADD_KEEPALIVE, + RS_SVC_REM_KEEPALIVE, + RS_SVC_MOD_KEEPALIVE }; struct rs_svc_msg { - uint32_t svcs; + uint32_t cmd; uint32_t status; struct rsocket *rs; }; -static pthread_t svc_id; -static int svc_sock[2]; -static int svc_cnt; -static int svc_size; -static struct rsocket **svc_rss; -static struct pollfd *svc_fds; -static uint8_t svc_buf[RS_SNDLOWAT]; -static void *rs_svc_run(void *arg); +struct rs_svc { + pthread_t id; + int sock[2]; + int cnt; + int size; + int context_size; + void *(*run)(void *svc); + struct rsocket **rss; + void *contexts; +}; + +static struct pollfd *udp_svc_fds; +static void *udp_svc_run(void *arg); +static struct rs_svc udp_svc = { + .context_size = sizeof(*udp_svc_fds), + .run = udp_svc_run +}; +static uint32_t *tcp_svc_timeouts; +static void *tcp_svc_run(void *arg); +static struct rs_svc tcp_svc = { + .context_size = sizeof(*tcp_svc_timeouts), + .run = tcp_svc_run +}; static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; @@ -218,12 +238,13 @@ enum rs_state { rs_error = 0x2000, }; -#define RS_OPT_SWAP_SGL (1 << 0) +#define RS_OPT_SWAP_SGL (1 << 0) /* * iWarp does not support RDMA write with immediate data. For iWarp, we * transfer rsocket messages as inline sends. */ -#define RS_OPT_MSG_SEND (1 << 1) +#define RS_OPT_MSG_SEND (1 << 1) +#define RS_OPT_SVC_ACTIVE (1 << 2) union socket_addr { struct sockaddr sa; @@ -282,6 +303,7 @@ struct rsocket { struct { struct rdma_cm_id *cm_id; uint64_t tcp_opts; + unsigned int keepalive_time; int ctrl_avail; uint16_t sseq_no; @@ -324,7 +346,6 @@ struct rsocket { }; }; - int svcs; int opts; long fd_flags; uint64_t so_opts; @@ -396,37 +417,37 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp) } } -static int rs_modify_svcs(struct rsocket *rs, int svcs) +static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd) { struct rs_svc_msg msg; int ret; pthread_mutex_lock(&mut); - if (!svc_cnt) { - ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock); + if (!svc->cnt) { + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock); if (ret) goto unlock; - ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL); + ret = pthread_create(&svc->id, NULL, svc->run, svc); if (ret) { ret = ERR(ret); goto closepair; } } - msg.svcs = svcs; + msg.cmd = cmd; msg.status = EINVAL; msg.rs = rs; - write(svc_sock[0], &msg, sizeof msg); - read(svc_sock[0], &msg, sizeof msg); + write(svc->sock[0], &msg, sizeof msg); + read(svc->sock[0], &msg, sizeof msg); ret = rdma_seterrno(msg.status); - if (svc_cnt) + if (svc->cnt) goto unlock; - pthread_join(svc_id, NULL); + pthread_join(svc->id, NULL); closepair: - close(svc_sock[0]); - close(svc_sock[1]); + close(svc->sock[0]); + close(svc->sock[1]); unlock: pthread_mutex_unlock(&mut); return ret; @@ -1064,7 +1085,7 @@ static int ds_init_ep(struct rsocket *rs) } msg->next = NULL; - ret = rs_modify_svcs(rs, RS_SVC_DGRAM); + ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM); if (ret) return ret; @@ -3114,6 +3135,9 @@ int rshutdown(int socket, int how) int ctrl, ret = 0; rs = idm_at(&idm, socket); + if (rs->opts & RS_OPT_SVC_ACTIVE) + rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); + if (rs->fd_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); @@ -3161,8 +3185,8 @@ out: static void ds_shutdown(struct rsocket *rs) { - if (rs->svcs) - rs_modify_svcs(rs, 0); + if (rs->opts & RS_OPT_SVC_ACTIVE) + rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM); if (rs->fd_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); @@ -3230,6 +3254,32 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) } } +static int rs_set_keepalive(struct rsocket *rs, int on) +{ + FILE *f; + int ret; + + if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) || + (!on && !(rs->opts & RS_OPT_SVC_ACTIVE))) + return 0; + + if (on) { + if (!rs->keepalive_time) { + if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) { + (void) fscanf(f, "%u", &rs->keepalive_time); + fclose(f); + } else { + rs->keepalive_time = 7200; + } + } + ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE); + } else { + ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); + } + + return ret; +} + int rsetsockopt(int socket, int level, int optname, const void *optval, socklen_t optlen) { @@ -3280,8 +3330,8 @@ int rsetsockopt(int socket, int level, int optname, ret = 0; break; case SO_KEEPALIVE: - opt_on = *(int *) optval; - ret = 0; + ret = rs_set_keepalive(rs, *(int *) optval); + opt_on = rs->opts & RS_OPT_SVC_ACTIVE; break; case SO_OOBINLINE: opt_on = *(int *) optval; @@ -3294,6 +3344,19 @@ int rsetsockopt(int socket, int level, int optname, case IPPROTO_TCP: opts = &rs->tcp_opts; switch (optname) { + case TCP_KEEPCNT: + case TCP_KEEPINTVL: + ret = 0; /* N/A - we're using a reliable connection */ + break; + case TCP_KEEPIDLE: + if (*(int *) optval <= 0) { + ret = ERR(EINVAL); + break; + } + rs->keepalive_time = *(int *) optval; + ret = (rs->opts & RS_OPT_SVC_ACTIVE) ? + rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0; + break; case TCP_NODELAY: opt_on = *(int *) optval; ret = 0; @@ -3416,6 +3479,14 @@ int rgetsockopt(int socket, int level, int optname, break; case IPPROTO_TCP: switch (optname) { + case TCP_KEEPCNT: + case TCP_KEEPINTVL: + *((int *) optval) = 1; /* N/A */ + break; + case TCP_KEEPIDLE: + *((int *) optval) = (int) rs->keepalive_time; + *optlen = sizeof(int); + break; case TCP_NODELAY: *((int *) optval) = !!(rs->tcp_opts & (1 << optname)); *optlen = sizeof(int); @@ -3700,83 +3771,99 @@ out: return (ret && left == count) ? ret : count - left; } -static int rs_svc_grow_sets(void) +/**************************************************************************** + * Service Processing Threads + ****************************************************************************/ + +static int rs_svc_grow_sets(struct rs_svc *svc, int grow_size) { struct rsocket **rss; - struct pollfd *fds; - void *set; + void *set, *contexts; - set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds)); + set = calloc(svc->size + grow_size, sizeof(*rss) + svc->context_size); if (!set) return ENOMEM; - svc_size += 2; + svc->size += grow_size; rss = set; - fds = set + sizeof(*rss) * svc_size; - if (svc_cnt) { - memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt); - memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt); + contexts = set + sizeof(*rss) * svc->size; + if (svc->cnt) { + memcpy(rss, svc->rss, sizeof(*rss) * svc->cnt); + memcpy(contexts, svc->contexts, svc->context_size * svc->cnt); } - free(svc_rss); - free(svc_fds); - svc_rss = rss; - svc_fds = fds; + free(svc->rss); + svc->rss = rss; + svc->contexts = contexts; return 0; } /* * Index 0 is reserved for the service's communication socket. */ -static int rs_svc_add_rs(struct rsocket *rs) +static int rs_svc_add_rs(struct rs_svc *svc, struct rsocket *rs) { int ret; - if (svc_cnt >= svc_size - 1) { - ret = rs_svc_grow_sets(); + if (svc->cnt >= svc->size - 1) { + ret = rs_svc_grow_sets(svc, 4); if (ret) return ret; } - svc_rss[++svc_cnt] = rs; - svc_fds[svc_cnt].fd = rs->udp_sock; - svc_fds[svc_cnt].events = POLLIN; - svc_fds[svc_cnt].revents = 0; + svc->rss[++svc->cnt] = rs; return 0; } -static int rs_svc_rm_rs(struct rsocket *rs) +static int rs_svc_rm_rs(struct rs_svc *svc, struct rsocket *rs) { int i; - for (i = 1; i <= svc_cnt; i++) { - if (svc_rss[i] == rs) { - svc_cnt--; - svc_rss[i] = svc_rss[svc_cnt]; - svc_fds[i] = svc_fds[svc_cnt]; + for (i = 1; i <= svc->cnt; i++) { + if (svc->rss[i] == rs) { + svc->cnt--; + svc->rss[i] = svc->rss[svc->cnt]; + memcpy(svc->contexts + i * svc->context_size, + svc->contexts + svc->cnt * svc->context_size, + svc->context_size); return 0; } } return EBADF; } -static void rs_svc_process_sock(void) +static void udp_svc_process_sock(struct rs_svc *svc) { struct rs_svc_msg msg; - read(svc_sock[1], &msg, sizeof msg); - if (msg.svcs & RS_SVC_DGRAM) { - msg.status = rs_svc_add_rs(msg.rs); - } else if (!msg.svcs) { - msg.status = rs_svc_rm_rs(msg.rs); + read(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_DGRAM: + msg.status = rs_svc_add_rs(svc, msg.rs); + if (!msg.status) { + msg.rs->opts |= RS_OPT_SVC_ACTIVE; + udp_svc_fds = svc->contexts; + udp_svc_fds[svc->cnt].fd = msg.rs->udp_sock; + udp_svc_fds[svc->cnt].events = POLLIN; + udp_svc_fds[svc->cnt].revents = 0; + } + break; + case RS_SVC_REM_DGRAM: + msg.status = rs_svc_rm_rs(svc, msg.rs); + if (!msg.status) + msg.rs->opts &= ~RS_OPT_SVC_ACTIVE; + break; + case RS_SVC_NOOP: + msg.status = 0; + break; + default: + break; } - if (!msg.status) - msg.rs->svcs = msg.svcs; - write(svc_sock[1], &msg, sizeof msg); + write(svc->sock[1], &msg, sizeof msg); } -static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) +static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) { union ibv_gid gid; int i; @@ -3790,7 +3877,7 @@ static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) return 0; } -static uint8_t rs_svc_path_bits(struct ds_dest *dest) +static uint8_t udp_svc_path_bits(struct ds_dest *dest) { struct ibv_port_attr attr; @@ -3799,7 +3886,7 @@ static uint8_t rs_svc_path_bits(struct ds_dest *dest) return 0x7f; } -static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn) +static void udp_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn) { union socket_addr saddr; struct rdma_cm_id *id; @@ -3836,13 +3923,13 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t attr.is_global = 1; attr.grh.dgid = id->route.path_rec->dgid; attr.grh.flow_label = ntohl(id->route.path_rec->flow_label); - attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid); + attr.grh.sgid_index = udp_svc_sgid_index(dest, &id->route.path_rec->sgid); attr.grh.hop_limit = id->route.path_rec->hop_limit; attr.grh.traffic_class = id->route.path_rec->traffic_class; } attr.dlid = ntohs(id->route.path_rec->dlid); attr.sl = id->route.path_rec->sl; - attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest); + attr.src_path_bits = id->route.path_rec->slid & udp_svc_path_bits(dest); attr.static_rate = id->route.path_rec->rate; attr.port_num = id->port_num; @@ -3854,8 +3941,8 @@ out: rdma_destroy_id(id); } -static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, - union socket_addr *addr) +static int udp_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, + union socket_addr *addr) { return (udp_hdr->tag == ntohl(DS_UDP_TAG)) && ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && @@ -3864,8 +3951,8 @@ static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, udp_hdr->length == DS_UDP_IPV6_HDR_LEN)); } -static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, - union socket_addr *src) +static void udp_svc_forward(struct rsocket *rs, void *buf, size_t len, + union socket_addr *src) { struct ds_header hdr; struct ds_smsg *msg; @@ -3892,20 +3979,21 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len, ds_post_send(rs, &sge, offset); } -static void rs_svc_process_rs(struct rsocket *rs) +static void udp_svc_process_rs(struct rsocket *rs) { + static uint8_t buf[RS_SNDLOWAT]; struct ds_dest *dest, *cur_dest; struct ds_udp_header *udp_hdr; union socket_addr addr; socklen_t addrlen = sizeof addr; int len, ret; - ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); + ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); if (ret < DS_UDP_IPV4_HDR_LEN) return; - udp_hdr = (struct ds_udp_header *) svc_buf; - if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) + udp_hdr = (struct ds_udp_header *) buf; + if (!udp_svc_valid_udp_hdr(udp_hdr, &addr)) return; len = ret - udp_hdr->length; @@ -3925,46 +4013,147 @@ static void rs_svc_process_rs(struct rsocket *rs) } if (!dest->ah || (dest->qpn != udp_hdr->qpn)) - rs_svc_create_ah(rs, dest, udp_hdr->qpn); + udp_svc_create_ah(rs, dest, udp_hdr->qpn); /* to do: handle when dest local ip address doesn't match udp ip */ if (udp_hdr->op == RS_OP_DATA) { fastlock_acquire(&rs->slock); cur_dest = rs->conn_dest; rs->conn_dest = &dest->qp->dest; - rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); + udp_svc_forward(rs, buf + udp_hdr->length, len, &addr); rs->conn_dest = cur_dest; fastlock_release(&rs->slock); } } -static void *rs_svc_run(void *arg) +static void *udp_svc_run(void *arg) { + struct rs_svc *svc = arg; struct rs_svc_msg msg; int i, ret; - ret = rs_svc_grow_sets(); + ret = rs_svc_grow_sets(svc, 4); if (ret) { msg.status = ret; - write(svc_sock[1], &msg, sizeof msg); + write(svc->sock[1], &msg, sizeof msg); return (void *) (uintptr_t) ret; } - svc_fds[0].fd = svc_sock[1]; - svc_fds[0].events = POLLIN; + udp_svc_fds = svc->contexts; + udp_svc_fds[0].fd = svc->sock[1]; + udp_svc_fds[0].events = POLLIN; do { - for (i = 0; i <= svc_cnt; i++) - svc_fds[i].revents = 0; + for (i = 0; i <= svc->cnt; i++) + udp_svc_fds[i].revents = 0; + + poll(udp_svc_fds, svc->cnt + 1, -1); + if (udp_svc_fds[0].revents) + udp_svc_process_sock(svc); - poll(svc_fds, svc_cnt + 1, -1); - if (svc_fds[0].revents) - rs_svc_process_sock(); + for (i = 1; i <= svc->cnt; i++) { + if (udp_svc_fds[i].revents) + udp_svc_process_rs(svc->rss[i]); + } + } while (svc->cnt >= 1); + + return NULL; +} + +static uint32_t rs_get_time(void) +{ + struct timeval now; + + memset(&now, 0, sizeof now); + gettimeofday(&now, NULL); + return (uint32_t) now.tv_sec; +} + +static void tcp_svc_process_sock(struct rs_svc *svc) +{ + struct rs_svc_msg msg; - for (i = 1; i <= svc_cnt; i++) { - if (svc_fds[i].revents) - rs_svc_process_rs(svc_rss[i]); + read(svc->sock[1], &msg, sizeof msg); + switch (msg.cmd) { + case RS_SVC_ADD_KEEPALIVE: + msg.status = rs_svc_add_rs(svc, msg.rs); + if (!msg.status) { + msg.rs->opts |= RS_OPT_SVC_ACTIVE; + tcp_svc_timeouts = svc->contexts; + tcp_svc_timeouts[svc->cnt] = rs_get_time() + + msg.rs->keepalive_time; + } + break; + case RS_SVC_REM_KEEPALIVE: + msg.status = rs_svc_rm_rs(svc, msg.rs); + if (!msg.status) + msg.rs->opts &= ~RS_OPT_SVC_ACTIVE; + break; + case RS_SVC_MOD_KEEPALIVE: + tcp_svc_timeouts[svc->cnt] = rs_get_time() + msg.rs->keepalive_time; + msg.status = 0; + break; + case RS_SVC_NOOP: + msg.status = 0; + break; + default: + break; + } + write(svc->sock[1], &msg, sizeof msg); +} + +/* + * Send a credit update as the keep-alive message. We may or may not have + * any credits, but if we do, then we require a minimum of 2 control credits + * for protocols that do not support RDMA write with immediate data. There's + * no need to send a keep-alive message if we have any messages outstanding, + * and we start with a minimum of 2 credits. For simplicity, we just check + * that both credits are available before sending the keep-alive. + */ +static void tcp_svc_send_keepalive(struct rsocket *rs) +{ + fastlock_acquire(&rs->cq_lock); + if ((rs->ctrl_avail > 1) && (rs->state & rs_connected)) + rs_send_credits(rs); + fastlock_release(&rs->cq_lock); +} + +static void *tcp_svc_run(void *arg) +{ + struct rs_svc *svc = arg; + struct rs_svc_msg msg; + struct pollfd fds; + uint32_t now, next_timeout; + int i, ret, timeout; + + ret = rs_svc_grow_sets(svc, 16); + if (ret) { + msg.status = ret; + write(svc->sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; + } + + tcp_svc_timeouts = svc->contexts; + fds.fd = svc->sock[1]; + fds.events = POLLIN; + timeout = -1; + do { + poll(&fds, 1, timeout * 1000); + if (fds.revents) + tcp_svc_process_sock(svc); + + now = rs_get_time(); + next_timeout = ~0; + for (i = 1; i <= svc->cnt; i++) { + if (tcp_svc_timeouts[i] <= now) { + tcp_svc_send_keepalive(svc->rss[i]); + tcp_svc_timeouts[i] = + now + svc->rss[i]->keepalive_time; + } + if (tcp_svc_timeouts[i] < next_timeout) + next_timeout = tcp_svc_timeouts[i]; } - } while (svc_cnt >= 1); + timeout = (int) (next_timeout - now); + } while (svc->cnt >= 1); return NULL; } -- 2.46.0