git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add keepalive logic
author Susan K. Coulter <markus@cj-fe2.lanl.gov>
Fri, 17 Jan 2014 22:31:42 +0000 (14:31 -0800)
committer Sean Hefty <sean.hefty@intel.com>
Fri, 17 Jan 2014 22:31:42 +0000 (14:31 -0800)
Actually send and receive keepalive messages if keepalive is
enabled on an rsocket.

Signed-off-by: Susan K. Coulter <markus@cj-fe2.lanl.gov>
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
examples/common.c
examples/rstream.c
src/rsocket.c

index 0b6428cbdd90d7f385fb2af7be7542d518fe12c6..09468ce6a03c9e7184fe7c1510f381f795d1349d 100644 (file)
@@ -161,5 +161,5 @@ int do_poll(struct pollfd *fds, int timeout)
                ret = rs_poll(fds, 1, timeout);
        } while (!ret);
 
-       return ret == 1 ? 0 : ret;
+       return ret == 1 ? (fds->revents & (POLLERR | POLLHUP)) : ret;
 }
index cf8471694bdabffb3c6e7d2cf3096a0f8ebfbeaf..27326c2ebc7fca39ced1757d5ac6ce0087a6a3a8 100644 (file)
@@ -90,6 +90,7 @@ static int transfer_count = 1000;
 static int buffer_size;
 static char test_name[10] = "custom";
 static char *port = "7471";
+static int keepalive;
 static char *dst_addr;
 static char *src_addr;
 static struct timeval start, end;
@@ -253,6 +254,28 @@ out:
        return ret;
 }
 
+static void set_keepalive(int rs)   /* -k: enable keepalive on rs and print what took effect */
+{
+       int optval;
+       socklen_t optlen = sizeof(optval);   /* size of the option value, not of the length variable */
+
+       optval = 1;
+       if (rs_setsockopt(rs, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen)) {
+               perror("rsetsockopt SO_KEEPALIVE");
+               return;
+       }
+
+       optval = keepalive;   /* idle time taken from the -k argument */
+       if (rs_setsockopt(rs, IPPROTO_TCP, TCP_KEEPIDLE, &optval, optlen))
+               perror("rsetsockopt TCP_KEEPIDLE");
+
+       if (!(rs_getsockopt(rs, SOL_SOCKET, SO_KEEPALIVE, &optval, &optlen)))
+               printf("Keepalive: %s\n", (optval ? "ON" : "OFF"));
+
+       if (!(rs_getsockopt(rs, IPPROTO_TCP, TCP_KEEPIDLE, &optval, &optlen)))
+               printf("  time: %i\n", optval);
+}
+
 static void set_options(int rs)
 {
        int val;
@@ -284,6 +307,9 @@ static void set_options(int rs)
                        rs_setsockopt(rs, SOL_RDMA, RDMA_INLINE, &val, sizeof val);
                }
        }
+
+       if (keepalive)
+               set_keepalive(rs);
 }
 
 static int server_listen(void)
@@ -571,7 +597,7 @@ int main(int argc, char **argv)
 
        ai_hints.ai_socktype = SOCK_STREAM;
        rai_hints.ai_port_space = RDMA_PS_TCP;
-       while ((op = getopt(argc, argv, "s:b:f:B:I:C:S:p:T:")) != -1) {
+       while ((op = getopt(argc, argv, "s:b:f:B:I:C:S:p:k:T:")) != -1) {
                switch (op) {
                case 's':
                        dst_addr = optarg;
@@ -612,6 +638,9 @@ int main(int argc, char **argv)
                case 'p':
                        port = optarg;
                        break;
+               case 'k':
+                       keepalive = atoi(optarg);
+                       break;
                case 'T':
                        if (!set_test_opt(optarg))
                                break;
@@ -627,6 +656,7 @@ int main(int argc, char **argv)
                        printf("\t[-C transfer_count]\n");
                        printf("\t[-S transfer_size or all]\n");
                        printf("\t[-p port_number]\n");
+                       printf("\t[-k keepalive_time]\n");
                        printf("\t[-T test_option]\n");
                        printf("\t    s|sockets - use standard tcp/ip sockets\n");
                        printf("\t    a|async - asynchronous operation (use poll)\n");
index c7a491ba0a4fe06c52c00eebc8184453d95849c4..de8c893458b07e1b8a15e4c0d9cb3088c372c4f4 100644 (file)
@@ -69,23 +69,43 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
 struct rsocket;
 
 enum {
-       RS_SVC_DGRAM = 1 << 0
+       RS_SVC_NOOP,             /* no action; the service replies with status 0 */
+       RS_SVC_ADD_DGRAM,        /* register an rsocket with the udp service */
+       RS_SVC_REM_DGRAM,        /* remove an rsocket from the udp service */
+       RS_SVC_ADD_KEEPALIVE,    /* register an rsocket with the tcp keepalive service */
+       RS_SVC_REM_KEEPALIVE,    /* remove an rsocket from the tcp keepalive service */
+       RS_SVC_MOD_KEEPALIVE     /* keepalive_time changed; recompute the rsocket's timeout */
 };
 
 struct rs_svc_msg {
-       uint32_t svcs;
+       uint32_t cmd;
        uint32_t status;
        struct rsocket *rs;
 };
 
-static pthread_t svc_id;
-static int svc_sock[2];
-static int svc_cnt;
-static int svc_size;
-static struct rsocket **svc_rss;
-static struct pollfd *svc_fds;
-static uint8_t svc_buf[RS_SNDLOWAT];
-static void *rs_svc_run(void *arg);
+struct rs_svc {
+       pthread_t id;             /* service thread (pthread_create/pthread_join) */
+       int sock[2];              /* socketpair: [0] used by rs_notify_svc(), [1] by the service thread */
+       int cnt;                  /* rsockets registered; they occupy indices 1..cnt */
+       int size;                 /* entries allocated in rss and contexts */
+       int context_size;         /* bytes per entry in contexts */
+       void *(*run)(void *svc);  /* thread entry point; receives this struct */
+       struct rsocket **rss;     /* rsocket pointers; start of the shared allocation */
+       void *contexts;           /* per-entry service data, stored after rss in the same allocation */
+};
+
+static struct pollfd *udp_svc_fds;
+static void *udp_svc_run(void *arg);
+static struct rs_svc udp_svc = {
+       .context_size = sizeof(*udp_svc_fds),
+       .run = udp_svc_run
+};
+static uint32_t *tcp_svc_timeouts;
+static void *tcp_svc_run(void *arg);
+static struct rs_svc tcp_svc = {
+       .context_size = sizeof(*tcp_svc_timeouts),
+       .run = tcp_svc_run
+};
 
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
@@ -218,12 +238,13 @@ enum rs_state {
        rs_error           =                0x2000,
 };
 
-#define RS_OPT_SWAP_SGL        (1 << 0)
+#define RS_OPT_SWAP_SGL   (1 << 0)
 /*
  * iWarp does not support RDMA write with immediate data.  For iWarp, we
  * transfer rsocket messages as inline sends.
  */
-#define RS_OPT_MSG_SEND        (1 << 1)
+#define RS_OPT_MSG_SEND   (1 << 1)
+#define RS_OPT_SVC_ACTIVE (1 << 2)
 
 union socket_addr {
        struct sockaddr         sa;
@@ -282,6 +303,7 @@ struct rsocket {
                struct {
                        struct rdma_cm_id *cm_id;
                        uint64_t          tcp_opts;
+                       unsigned int      keepalive_time;
 
                        int               ctrl_avail;
                        uint16_t          sseq_no;
@@ -324,7 +346,6 @@ struct rsocket {
                };
        };
 
-       int               svcs;
        int               opts;
        long              fd_flags;
        uint64_t          so_opts;
@@ -396,37 +417,37 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
        }
 }
 
-static int rs_modify_svcs(struct rsocket *rs, int svcs)
+static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
 {
        struct rs_svc_msg msg;
        int ret;
 
        pthread_mutex_lock(&mut);
-       if (!svc_cnt) {
-               ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
+       if (!svc->cnt) {
+               ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock);
                if (ret)
                        goto unlock;
 
-               ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+               ret = pthread_create(&svc->id, NULL, svc->run, svc);
                if (ret) {
                        ret = ERR(ret);
                        goto closepair;
                }
        }
 
-       msg.svcs = svcs;
+       msg.cmd = cmd;
        msg.status = EINVAL;
        msg.rs = rs;
-       write(svc_sock[0], &msg, sizeof msg);
-       read(svc_sock[0], &msg, sizeof msg);
+       write(svc->sock[0], &msg, sizeof msg);
+       read(svc->sock[0], &msg, sizeof msg);
        ret = rdma_seterrno(msg.status);
-       if (svc_cnt)
+       if (svc->cnt)
                goto unlock;
 
-       pthread_join(svc_id, NULL);
+       pthread_join(svc->id, NULL);
 closepair:
-       close(svc_sock[0]);
-       close(svc_sock[1]);
+       close(svc->sock[0]);
+       close(svc->sock[1]);
 unlock:
        pthread_mutex_unlock(&mut);
        return ret;
@@ -1064,7 +1085,7 @@ static int ds_init_ep(struct rsocket *rs)
        }
        msg->next = NULL;
 
-       ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
+       ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM);
        if (ret)
                return ret;
 
@@ -3114,6 +3135,9 @@ int rshutdown(int socket, int how)
        int ctrl, ret = 0;
 
        rs = idm_at(&idm, socket);
+       if (rs->opts & RS_OPT_SVC_ACTIVE)
+               rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+
        if (rs->fd_flags & O_NONBLOCK)
                rs_set_nonblocking(rs, 0);
 
@@ -3161,8 +3185,8 @@ out:
 
 static void ds_shutdown(struct rsocket *rs)
 {
-       if (rs->svcs)
-               rs_modify_svcs(rs, 0);
+       if (rs->opts & RS_OPT_SVC_ACTIVE)
+               rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM);
 
        if (rs->fd_flags & O_NONBLOCK)
                rs_set_nonblocking(rs, 0);
@@ -3230,6 +3254,32 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        }
 }
 
+static int rs_set_keepalive(struct rsocket *rs, int on)   /* add/remove rs from the keepalive service */
+{
+       FILE *f;
+       int ret;
+
+       if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) ||
+           (!on && !(rs->opts & RS_OPT_SVC_ACTIVE)))
+               return 0;   /* already in the requested state */
+
+       if (on) {
+               if (!rs->keepalive_time) {
+                       if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) {
+                               (void) fscanf(f, "%u", &rs->keepalive_time);
+                               fclose(f);
+                       }
+                       if (!rs->keepalive_time)   /* file missing, unparsable, or zero: a 0 interval would spin the service thread */
+                               rs->keepalive_time = 7200;
+               }
+               ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE);
+       } else {
+               ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+       }
+
+       return ret;
+}
+
 int rsetsockopt(int socket, int level, int optname,
                const void *optval, socklen_t optlen)
 {
@@ -3280,8 +3330,8 @@ int rsetsockopt(int socket, int level, int optname,
                        ret = 0;
                        break;
                case SO_KEEPALIVE:
-                       opt_on = *(int *) optval;
-                       ret = 0;
+                       ret = rs_set_keepalive(rs, *(int *) optval);
+                       opt_on = rs->opts & RS_OPT_SVC_ACTIVE;
                        break;
                case SO_OOBINLINE:
                        opt_on = *(int *) optval;
@@ -3294,6 +3344,19 @@ int rsetsockopt(int socket, int level, int optname,
        case IPPROTO_TCP:
                opts = &rs->tcp_opts;
                switch (optname) {
+               case TCP_KEEPCNT:
+               case TCP_KEEPINTVL:
+                       ret = 0;   /* N/A - we're using a reliable connection */
+                       break;
+               case TCP_KEEPIDLE:
+                       if (*(int *) optval <= 0) {
+                               ret = ERR(EINVAL);
+                               break;
+                       }
+                       rs->keepalive_time = *(int *) optval;
+                       ret = (rs->opts & RS_OPT_SVC_ACTIVE) ?
+                             rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0;
+                       break;
                case TCP_NODELAY:
                        opt_on = *(int *) optval;
                        ret = 0;
@@ -3416,6 +3479,14 @@ int rgetsockopt(int socket, int level, int optname,
                break;
        case IPPROTO_TCP:
                switch (optname) {
+               case TCP_KEEPCNT:
+               case TCP_KEEPINTVL:
+                       *((int *) optval) = 1;   /* N/A */
+                       break;
+               case TCP_KEEPIDLE:
+                       *((int *) optval) = (int) rs->keepalive_time;
+                       *optlen = sizeof(int);
+                       break;
                case TCP_NODELAY:
                        *((int *) optval) = !!(rs->tcp_opts & (1 << optname));
                        *optlen = sizeof(int);
@@ -3700,83 +3771,99 @@ out:
        return (ret && left == count) ? ret : count - left;
 }
 
-static int rs_svc_grow_sets(void)
+/****************************************************************************
+ * Service Processing Threads
+ ****************************************************************************/
+
+static int rs_svc_grow_sets(struct rs_svc *svc, int grow_size)   /* enlarge rss/contexts by grow_size entries; 0 or ENOMEM */
 {
        struct rsocket **rss;
-       struct pollfd *fds;
-       void *set;
+       void *set, *contexts;
 
-       set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
+       set = calloc(svc->size + grow_size, sizeof(*rss) + svc->context_size);
        if (!set)
                return ENOMEM;
 
-       svc_size += 2;
+       svc->size += grow_size;
        rss = set;
-       fds = set + sizeof(*rss) * svc_size;
-       if (svc_cnt) {
-               memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
-               memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
+       contexts = set + sizeof(*rss) * svc->size;
+       if (svc->cnt) {   /* slots 0..cnt are in use (0 is reserved), so copy cnt + 1 entries */
+               memcpy(rss, svc->rss, sizeof(*rss) * (svc->cnt + 1));
+               memcpy(contexts, svc->contexts, svc->context_size * (svc->cnt + 1));
        }
 
-       free(svc_rss);
-       free(svc_fds);
-       svc_rss = rss;
-       svc_fds = fds;
+       free(svc->rss);
+       svc->rss = rss;
+       svc->contexts = contexts;
        return 0;
 }
 
 /*
  * Index 0 is reserved for the service's communication socket.
  */
-static int rs_svc_add_rs(struct rsocket *rs)
+static int rs_svc_add_rs(struct rs_svc *svc, struct rsocket *rs)
 {
        int ret;
 
-       if (svc_cnt >= svc_size - 1) {
-               ret = rs_svc_grow_sets();
+       if (svc->cnt >= svc->size - 1) {
+               ret = rs_svc_grow_sets(svc, 4);
                if (ret)
                        return ret;
        }
 
-       svc_rss[++svc_cnt] = rs;
-       svc_fds[svc_cnt].fd = rs->udp_sock;
-       svc_fds[svc_cnt].events = POLLIN;
-       svc_fds[svc_cnt].revents = 0;
+       svc->rss[++svc->cnt] = rs;
        return 0;
 }
 
-static int rs_svc_rm_rs(struct rsocket *rs)
+static int rs_svc_rm_rs(struct rs_svc *svc, struct rsocket *rs)   /* unregister rs; 0 or EBADF if not found */
 {
        int i;
 
-       for (i = 1; i <= svc_cnt; i++) {
-               if (svc_rss[i] == rs) {
-                       svc_cnt--;
-                       svc_rss[i] = svc_rss[svc_cnt];
-                       svc_fds[i] = svc_fds[svc_cnt];
+       for (i = 1; i <= svc->cnt; i++) {
+               if (svc->rss[i] == rs) {
+                       svc->rss[i] = svc->rss[svc->cnt];   /* move the last entry (index cnt) into the hole */
+                       memmove(svc->contexts + i * svc->context_size,
+                               svc->contexts + svc->cnt * svc->context_size,
+                               svc->context_size);
+                       svc->cnt--;   /* shrink only after the last entry has been moved */
                        return 0;
                }
        }
        return EBADF;
 }
 
-static void rs_svc_process_sock(void)
+static void udp_svc_process_sock(struct rs_svc *svc)
 {
        struct rs_svc_msg msg;
 
-       read(svc_sock[1], &msg, sizeof msg);
-       if (msg.svcs & RS_SVC_DGRAM) {
-               msg.status = rs_svc_add_rs(msg.rs);
-       } else if (!msg.svcs) {
-               msg.status = rs_svc_rm_rs(msg.rs);
+       read(svc->sock[1], &msg, sizeof msg);
+       switch (msg.cmd) {
+       case RS_SVC_ADD_DGRAM:
+               msg.status = rs_svc_add_rs(svc, msg.rs);
+               if (!msg.status) {
+                       msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+                       udp_svc_fds = svc->contexts;
+                       udp_svc_fds[svc->cnt].fd = msg.rs->udp_sock;
+                       udp_svc_fds[svc->cnt].events = POLLIN;
+                       udp_svc_fds[svc->cnt].revents = 0;
+               }
+               break;
+       case RS_SVC_REM_DGRAM:
+               msg.status = rs_svc_rm_rs(svc, msg.rs);
+               if (!msg.status)
+                       msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+               break;
+       case RS_SVC_NOOP:
+               msg.status = 0;
+               break;
+       default:
+               break;
        }
 
-       if (!msg.status)
-               msg.rs->svcs = msg.svcs;
-       write(svc_sock[1], &msg, sizeof msg);
+       write(svc->sock[1], &msg, sizeof msg);
 }
 
-static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
 {
        union ibv_gid gid;
        int i;
@@ -3790,7 +3877,7 @@ static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
        return 0;
 }
 
-static uint8_t rs_svc_path_bits(struct ds_dest *dest)
+static uint8_t udp_svc_path_bits(struct ds_dest *dest)
 {
        struct ibv_port_attr attr;
 
@@ -3799,7 +3886,7 @@ static uint8_t rs_svc_path_bits(struct ds_dest *dest)
        return 0x7f;
 }
 
-static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
+static void udp_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
 {
        union socket_addr saddr;
        struct rdma_cm_id *id;
@@ -3836,13 +3923,13 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
                attr.is_global = 1;
                attr.grh.dgid = id->route.path_rec->dgid;
                attr.grh.flow_label = ntohl(id->route.path_rec->flow_label);
-               attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid);
+               attr.grh.sgid_index = udp_svc_sgid_index(dest, &id->route.path_rec->sgid);
                attr.grh.hop_limit = id->route.path_rec->hop_limit;
                attr.grh.traffic_class = id->route.path_rec->traffic_class;
        }
        attr.dlid = ntohs(id->route.path_rec->dlid);
        attr.sl = id->route.path_rec->sl;
-       attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest);
+       attr.src_path_bits = id->route.path_rec->slid & udp_svc_path_bits(dest);
        attr.static_rate = id->route.path_rec->rate;
        attr.port_num  = id->port_num;
 
@@ -3854,8 +3941,8 @@ out:
        rdma_destroy_id(id);
 }
 
-static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
-                               union socket_addr *addr)
+static int udp_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
+                                union socket_addr *addr)
 {
        return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
                ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
@@ -3864,8 +3951,8 @@ static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
                  udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
 }
 
-static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
-                          union socket_addr *src)
+static void udp_svc_forward(struct rsocket *rs, void *buf, size_t len,
+                           union socket_addr *src)
 {
        struct ds_header hdr;
        struct ds_smsg *msg;
@@ -3892,20 +3979,21 @@ static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
        ds_post_send(rs, &sge, offset);
 }
 
-static void rs_svc_process_rs(struct rsocket *rs)
+static void udp_svc_process_rs(struct rsocket *rs)
 {
+       static uint8_t buf[RS_SNDLOWAT];
        struct ds_dest *dest, *cur_dest;
        struct ds_udp_header *udp_hdr;
        union socket_addr addr;
        socklen_t addrlen = sizeof addr;
        int len, ret;
 
-       ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+       ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
        if (ret < DS_UDP_IPV4_HDR_LEN)
                return;
 
-       udp_hdr = (struct ds_udp_header *) svc_buf;
-       if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+       udp_hdr = (struct ds_udp_header *) buf;
+       if (!udp_svc_valid_udp_hdr(udp_hdr, &addr))
                return;
 
        len = ret - udp_hdr->length;
@@ -3925,46 +4013,167 @@ static void rs_svc_process_rs(struct rsocket *rs)
        }
 
        if (!dest->ah || (dest->qpn != udp_hdr->qpn))
-               rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+               udp_svc_create_ah(rs, dest, udp_hdr->qpn);
 
        /* to do: handle when dest local ip address doesn't match udp ip */
        if (udp_hdr->op == RS_OP_DATA) {
                fastlock_acquire(&rs->slock);
                cur_dest = rs->conn_dest;
                rs->conn_dest = &dest->qp->dest;
-               rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
+               udp_svc_forward(rs, buf + udp_hdr->length, len, &addr);
                rs->conn_dest = cur_dest;
                fastlock_release(&rs->slock);
        }
 }
 
-static void *rs_svc_run(void *arg)
+static void *udp_svc_run(void *arg)
 {
+       struct rs_svc *svc = arg;
        struct rs_svc_msg msg;
        int i, ret;
 
-       ret = rs_svc_grow_sets();
+       ret = rs_svc_grow_sets(svc, 4);
        if (ret) {
                msg.status = ret;
-               write(svc_sock[1], &msg, sizeof msg);
+               write(svc->sock[1], &msg, sizeof msg);
                return (void *) (uintptr_t) ret;
        }
 
-       svc_fds[0].fd = svc_sock[1];
-       svc_fds[0].events = POLLIN;
+       udp_svc_fds = svc->contexts;
+       udp_svc_fds[0].fd = svc->sock[1];
+       udp_svc_fds[0].events = POLLIN;
        do {
-               for (i = 0; i <= svc_cnt; i++)
-                       svc_fds[i].revents = 0;
+               for (i = 0; i <= svc->cnt; i++)
+                       udp_svc_fds[i].revents = 0;
+
+               poll(udp_svc_fds, svc->cnt + 1, -1);
+               if (udp_svc_fds[0].revents)
+                       udp_svc_process_sock(svc);
 
-               poll(svc_fds, svc_cnt + 1, -1);
-               if (svc_fds[0].revents)
-                       rs_svc_process_sock();
+               for (i = 1; i <= svc->cnt; i++) {
+                       if (udp_svc_fds[i].revents)
+                               udp_svc_process_rs(svc->rss[i]);
+               }
+       } while (svc->cnt >= 1);
+
+       return NULL;
+}
+
+/* Current time of day (gettimeofday) in whole seconds, truncated to 32 bits. */
+static uint32_t rs_get_time(void)
+{
+       struct timeval now;
+
+       memset(&now, 0, sizeof now);
+       gettimeofday(&now, NULL);
+       return (uint32_t) now.tv_sec;
+}
+
+/*
+ * Read one command from the keepalive service socket, apply it to the
+ * service's rsocket set, and write the message back with its status.
+ */
+static void tcp_svc_process_sock(struct rs_svc *svc)
+{
+       struct rs_svc_msg msg;
+       int i;
 
-               for (i = 1; i <= svc_cnt; i++) {
-                       if (svc_fds[i].revents)
-                               rs_svc_process_rs(svc_rss[i]);
+       read(svc->sock[1], &msg, sizeof msg);
+       switch (msg.cmd) {
+       case RS_SVC_ADD_KEEPALIVE:
+               msg.status = rs_svc_add_rs(svc, msg.rs);
+               if (!msg.status) {
+                       msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+                       tcp_svc_timeouts = svc->contexts;
+                       tcp_svc_timeouts[svc->cnt] = rs_get_time() +
+                                                    msg.rs->keepalive_time;
+               }
+               break;
+       case RS_SVC_REM_KEEPALIVE:
+               msg.status = rs_svc_rm_rs(svc, msg.rs);
+               if (!msg.status)
+                       msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+               break;
+       case RS_SVC_MOD_KEEPALIVE:
+               /* Re-arm the slot that belongs to msg.rs, not the last slot. */
+               msg.status = EBADF;
+               for (i = 1; i <= svc->cnt; i++) {
+                       if (svc->rss[i] == msg.rs) {
+                               tcp_svc_timeouts[i] = rs_get_time() +
+                                                     msg.rs->keepalive_time;
+                               msg.status = 0;
+                               break;
+                       }
+               }
+               break;
+       case RS_SVC_NOOP:
+               msg.status = 0;
+               break;
+       default:
+               break;
+       }
+       write(svc->sock[1], &msg, sizeof msg);
+}
+
+/*
+ * Send a credit update as the keep-alive message.  We may or may not have
+ * any credits, but if we do, then we require a minimum of 2 control credits
+ * for protocols that do not support RDMA write with immediate data.  There's
+ * no need to send a keep-alive message if we have any messages outstanding,
+ * and we start with a minimum of 2 credits.  For simplicity, we just check
+ * that both credits are available before sending the keep-alive.
+ */
+static void tcp_svc_send_keepalive(struct rsocket *rs)
+{
+       fastlock_acquire(&rs->cq_lock);
+       if ((rs->ctrl_avail > 1) && (rs->state & rs_connected))
+               rs_send_credits(rs);
+       fastlock_release(&rs->cq_lock);
+}
+
+/*
+ * Keepalive service thread.  Sleeps in poll() on the command socket until
+ * the earliest deadline, sends a keepalive on every rsocket whose deadline
+ * has passed and re-arms it, and exits once no rsockets remain registered.
+ */
+static void *tcp_svc_run(void *arg)
+{
+       struct rs_svc *svc = arg;
+       struct rs_svc_msg msg;
+       struct pollfd fds;
+       uint32_t now, next_timeout;
+       int i, ret, timeout;
+
+       ret = rs_svc_grow_sets(svc, 16);
+       if (ret) {
+               msg.status = ret;
+               write(svc->sock[1], &msg, sizeof msg);
+               return (void *) (uintptr_t) ret;
+       }
+
+       tcp_svc_timeouts = svc->contexts;
+       fds.fd = svc->sock[1];
+       fds.events = POLLIN;
+       timeout = -1;
+       do {
+               fds.revents = 0;   /* poll() may fail without writing revents */
+               poll(&fds, 1, timeout * 1000);
+               if (fds.revents)
+                       tcp_svc_process_sock(svc);
+
+               now = rs_get_time();
+               next_timeout = ~0;
+               for (i = 1; i <= svc->cnt; i++) {
+                       if (tcp_svc_timeouts[i] <= now) {
+                               tcp_svc_send_keepalive(svc->rss[i]);
+                               tcp_svc_timeouts[i] =
+                                       now + svc->rss[i]->keepalive_time;
+                       }
+                       if (tcp_svc_timeouts[i] < next_timeout)
+                               next_timeout = tcp_svc_timeouts[i];
                }
-       } while (svc_cnt >= 1);
+               timeout = (int) (next_timeout - now);
+       } while (svc->cnt >= 1);
 
        return NULL;
 }