struct rsocket;
enum {
- RS_SVC_DGRAM = 1 << 0
+ RS_SVC_NOOP,
+ RS_SVC_ADD_DGRAM,
+ RS_SVC_REM_DGRAM,
+ RS_SVC_ADD_KEEPALIVE,
+ RS_SVC_REM_KEEPALIVE,
+ RS_SVC_MOD_KEEPALIVE
};
struct rs_svc_msg {
- uint32_t svcs;
+ uint32_t cmd;
uint32_t status;
struct rsocket *rs;
};
-static pthread_t svc_id;
-static int svc_sock[2];
-static int svc_cnt;
-static int svc_size;
-static struct rsocket **svc_rss;
-static struct pollfd *svc_fds;
-static uint8_t svc_buf[RS_SNDLOWAT];
-static void *rs_svc_run(void *arg);
+struct rs_svc {
+ pthread_t id;
+ int sock[2];
+ int cnt;
+ int size;
+ int context_size;
+ void *(*run)(void *svc);
+ struct rsocket **rss;
+ void *contexts;
+};
+
+static struct pollfd *udp_svc_fds;
+static void *udp_svc_run(void *arg);
+static struct rs_svc udp_svc = {
+ .context_size = sizeof(*udp_svc_fds),
+ .run = udp_svc_run
+};
+static uint32_t *tcp_svc_timeouts;
+static void *tcp_svc_run(void *arg);
+static struct rs_svc tcp_svc = {
+ .context_size = sizeof(*tcp_svc_timeouts),
+ .run = tcp_svc_run
+};
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
rs_error = 0x2000,
};
-#define RS_OPT_SWAP_SGL (1 << 0)
+#define RS_OPT_SWAP_SGL (1 << 0)
/*
* iWarp does not support RDMA write with immediate data. For iWarp, we
* transfer rsocket messages as inline sends.
*/
-#define RS_OPT_MSG_SEND (1 << 1)
+#define RS_OPT_MSG_SEND (1 << 1)
+#define RS_OPT_SVC_ACTIVE (1 << 2)
union socket_addr {
struct sockaddr sa;
struct {
struct rdma_cm_id *cm_id;
uint64_t tcp_opts;
+ unsigned int keepalive_time;
int ctrl_avail;
uint16_t sseq_no;
};
};
- int svcs;
int opts;
long fd_flags;
uint64_t so_opts;
}
}
-static int rs_modify_svcs(struct rsocket *rs, int svcs)
+static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
{
struct rs_svc_msg msg;
int ret;
pthread_mutex_lock(&mut);
- if (!svc_cnt) {
- ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
+ if (!svc->cnt) {
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc->sock);
if (ret)
goto unlock;
- ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+ ret = pthread_create(&svc->id, NULL, svc->run, svc);
if (ret) {
ret = ERR(ret);
goto closepair;
}
}
- msg.svcs = svcs;
+ msg.cmd = cmd;
msg.status = EINVAL;
msg.rs = rs;
- write(svc_sock[0], &msg, sizeof msg);
- read(svc_sock[0], &msg, sizeof msg);
+ write(svc->sock[0], &msg, sizeof msg);
+ read(svc->sock[0], &msg, sizeof msg);
ret = rdma_seterrno(msg.status);
- if (svc_cnt)
+ if (svc->cnt)
goto unlock;
- pthread_join(svc_id, NULL);
+ pthread_join(svc->id, NULL);
closepair:
- close(svc_sock[0]);
- close(svc_sock[1]);
+ close(svc->sock[0]);
+ close(svc->sock[1]);
unlock:
pthread_mutex_unlock(&mut);
return ret;
}
msg->next = NULL;
- ret = rs_modify_svcs(rs, RS_SVC_DGRAM);
+ ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM);
if (ret)
return ret;
int ctrl, ret = 0;
rs = idm_at(&idm, socket);
+ if (rs->opts & RS_OPT_SVC_ACTIVE)
+ rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+
if (rs->fd_flags & O_NONBLOCK)
rs_set_nonblocking(rs, 0);
static void ds_shutdown(struct rsocket *rs)
{
- if (rs->svcs)
- rs_modify_svcs(rs, 0);
+ if (rs->opts & RS_OPT_SVC_ACTIVE)
+ rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM);
if (rs->fd_flags & O_NONBLOCK)
rs_set_nonblocking(rs, 0);
}
}
+static int rs_set_keepalive(struct rsocket *rs, int on)
+{
+ FILE *f;
+ int ret;
+
+ if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) ||
+ (!on && !(rs->opts & RS_OPT_SVC_ACTIVE)))
+ return 0;
+
+ if (on) {
+ if (!rs->keepalive_time) {
+ if ((f = fopen("/proc/sys/net/ipv4/tcp_keepalive_time", "r"))) {
+ (void) fscanf(f, "%u", &rs->keepalive_time);
+ fclose(f);
+ } else {
+ rs->keepalive_time = 7200;
+ }
+ }
+ ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE);
+ } else {
+ ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+ }
+
+ return ret;
+}
+
int rsetsockopt(int socket, int level, int optname,
const void *optval, socklen_t optlen)
{
ret = 0;
break;
case SO_KEEPALIVE:
- opt_on = *(int *) optval;
- ret = 0;
+ ret = rs_set_keepalive(rs, *(int *) optval);
+ opt_on = rs->opts & RS_OPT_SVC_ACTIVE;
break;
case SO_OOBINLINE:
opt_on = *(int *) optval;
case IPPROTO_TCP:
opts = &rs->tcp_opts;
switch (optname) {
+ case TCP_KEEPCNT:
+ case TCP_KEEPINTVL:
+ ret = 0; /* N/A - we're using a reliable connection */
+ break;
+ case TCP_KEEPIDLE:
+ if (*(int *) optval <= 0) {
+ ret = ERR(EINVAL);
+ break;
+ }
+ rs->keepalive_time = *(int *) optval;
+ ret = (rs->opts & RS_OPT_SVC_ACTIVE) ?
+ rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0;
+ break;
case TCP_NODELAY:
opt_on = *(int *) optval;
ret = 0;
break;
case IPPROTO_TCP:
switch (optname) {
+ case TCP_KEEPCNT:
+ case TCP_KEEPINTVL:
+ *((int *) optval) = 1; /* N/A */
+ break;
+ case TCP_KEEPIDLE:
+ *((int *) optval) = (int) rs->keepalive_time;
+ *optlen = sizeof(int);
+ break;
case TCP_NODELAY:
*((int *) optval) = !!(rs->tcp_opts & (1 << optname));
*optlen = sizeof(int);
return (ret && left == count) ? ret : count - left;
}
-static int rs_svc_grow_sets(void)
+/****************************************************************************
+ * Service Processing Threads
+ ****************************************************************************/
+
+static int rs_svc_grow_sets(struct rs_svc *svc, int grow_size)
{
struct rsocket **rss;
- struct pollfd *fds;
- void *set;
+ void *set, *contexts;
- set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
+ set = calloc(svc->size + grow_size, sizeof(*rss) + svc->context_size);
if (!set)
return ENOMEM;
- svc_size += 2;
+ svc->size += grow_size;
rss = set;
- fds = set + sizeof(*rss) * svc_size;
- if (svc_cnt) {
- memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
- memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
+ contexts = set + sizeof(*rss) * svc->size;
+ if (svc->cnt) {
+ memcpy(rss, svc->rss, sizeof(*rss) * svc->cnt);
+ memcpy(contexts, svc->contexts, svc->context_size * svc->cnt);
}
- free(svc_rss);
- free(svc_fds);
- svc_rss = rss;
- svc_fds = fds;
+ free(svc->rss);
+ svc->rss = rss;
+ svc->contexts = contexts;
return 0;
}
/*
* Index 0 is reserved for the service's communication socket.
*/
-static int rs_svc_add_rs(struct rsocket *rs)
+static int rs_svc_add_rs(struct rs_svc *svc, struct rsocket *rs)
{
int ret;
- if (svc_cnt >= svc_size - 1) {
- ret = rs_svc_grow_sets();
+ if (svc->cnt >= svc->size - 1) {
+ ret = rs_svc_grow_sets(svc, 4);
if (ret)
return ret;
}
- svc_rss[++svc_cnt] = rs;
- svc_fds[svc_cnt].fd = rs->udp_sock;
- svc_fds[svc_cnt].events = POLLIN;
- svc_fds[svc_cnt].revents = 0;
+ svc->rss[++svc->cnt] = rs;
return 0;
}
-static int rs_svc_rm_rs(struct rsocket *rs)
+static int rs_svc_rm_rs(struct rs_svc *svc, struct rsocket *rs)
{
int i;
- for (i = 1; i <= svc_cnt; i++) {
- if (svc_rss[i] == rs) {
- svc_cnt--;
- svc_rss[i] = svc_rss[svc_cnt];
- svc_fds[i] = svc_fds[svc_cnt];
+ for (i = 1; i <= svc->cnt; i++) {
+ if (svc->rss[i] == rs) {
+ svc->cnt--;
+ svc->rss[i] = svc->rss[svc->cnt];
+ memcpy(svc->contexts + i * svc->context_size,
+ svc->contexts + svc->cnt * svc->context_size,
+ svc->context_size);
return 0;
}
}
return EBADF;
}
-static void rs_svc_process_sock(void)
+static void udp_svc_process_sock(struct rs_svc *svc)
{
struct rs_svc_msg msg;
- read(svc_sock[1], &msg, sizeof msg);
- if (msg.svcs & RS_SVC_DGRAM) {
- msg.status = rs_svc_add_rs(msg.rs);
- } else if (!msg.svcs) {
- msg.status = rs_svc_rm_rs(msg.rs);
+ read(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_DGRAM:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+ if (!msg.status) {
+ msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+ udp_svc_fds = svc->contexts;
+ udp_svc_fds[svc->cnt].fd = msg.rs->udp_sock;
+ udp_svc_fds[svc->cnt].events = POLLIN;
+ udp_svc_fds[svc->cnt].revents = 0;
+ }
+ break;
+ case RS_SVC_REM_DGRAM:
+ msg.status = rs_svc_rm_rs(svc, msg.rs);
+ if (!msg.status)
+ msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+ break;
+ case RS_SVC_NOOP:
+ msg.status = 0;
+ break;
+ default:
+ break;
}
- if (!msg.status)
- msg.rs->svcs = msg.svcs;
- write(svc_sock[1], &msg, sizeof msg);
+ write(svc->sock[1], &msg, sizeof msg);
}
-static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+static uint8_t udp_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
{
union ibv_gid gid;
int i;
return 0;
}
-static uint8_t rs_svc_path_bits(struct ds_dest *dest)
+static uint8_t udp_svc_path_bits(struct ds_dest *dest)
{
struct ibv_port_attr attr;
return 0x7f;
}
-static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
+static void udp_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
{
union socket_addr saddr;
struct rdma_cm_id *id;
attr.is_global = 1;
attr.grh.dgid = id->route.path_rec->dgid;
attr.grh.flow_label = ntohl(id->route.path_rec->flow_label);
- attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid);
+ attr.grh.sgid_index = udp_svc_sgid_index(dest, &id->route.path_rec->sgid);
attr.grh.hop_limit = id->route.path_rec->hop_limit;
attr.grh.traffic_class = id->route.path_rec->traffic_class;
}
attr.dlid = ntohs(id->route.path_rec->dlid);
attr.sl = id->route.path_rec->sl;
- attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest);
+ attr.src_path_bits = id->route.path_rec->slid & udp_svc_path_bits(dest);
attr.static_rate = id->route.path_rec->rate;
attr.port_num = id->port_num;
rdma_destroy_id(id);
}
-static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
- union socket_addr *addr)
+static int udp_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
+ union socket_addr *addr)
{
return (udp_hdr->tag == ntohl(DS_UDP_TAG)) &&
((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
}
-static void rs_svc_forward(struct rsocket *rs, void *buf, size_t len,
- union socket_addr *src)
+static void udp_svc_forward(struct rsocket *rs, void *buf, size_t len,
+ union socket_addr *src)
{
struct ds_header hdr;
struct ds_smsg *msg;
ds_post_send(rs, &sge, offset);
}
-static void rs_svc_process_rs(struct rsocket *rs)
+static void udp_svc_process_rs(struct rsocket *rs)
{
+ static uint8_t buf[RS_SNDLOWAT];
struct ds_dest *dest, *cur_dest;
struct ds_udp_header *udp_hdr;
union socket_addr addr;
socklen_t addrlen = sizeof addr;
int len, ret;
- ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+ ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen);
if (ret < DS_UDP_IPV4_HDR_LEN)
return;
- udp_hdr = (struct ds_udp_header *) svc_buf;
- if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+ udp_hdr = (struct ds_udp_header *) buf;
+ if (!udp_svc_valid_udp_hdr(udp_hdr, &addr))
return;
len = ret - udp_hdr->length;
}
if (!dest->ah || (dest->qpn != udp_hdr->qpn))
- rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+ udp_svc_create_ah(rs, dest, udp_hdr->qpn);
/* to do: handle when dest local ip address doesn't match udp ip */
if (udp_hdr->op == RS_OP_DATA) {
fastlock_acquire(&rs->slock);
cur_dest = rs->conn_dest;
rs->conn_dest = &dest->qp->dest;
- rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
+ udp_svc_forward(rs, buf + udp_hdr->length, len, &addr);
rs->conn_dest = cur_dest;
fastlock_release(&rs->slock);
}
}
-static void *rs_svc_run(void *arg)
+static void *udp_svc_run(void *arg)
{
+ struct rs_svc *svc = arg;
struct rs_svc_msg msg;
int i, ret;
- ret = rs_svc_grow_sets();
+ ret = rs_svc_grow_sets(svc, 4);
if (ret) {
msg.status = ret;
- write(svc_sock[1], &msg, sizeof msg);
+ write(svc->sock[1], &msg, sizeof msg);
return (void *) (uintptr_t) ret;
}
- svc_fds[0].fd = svc_sock[1];
- svc_fds[0].events = POLLIN;
+ udp_svc_fds = svc->contexts;
+ udp_svc_fds[0].fd = svc->sock[1];
+ udp_svc_fds[0].events = POLLIN;
do {
- for (i = 0; i <= svc_cnt; i++)
- svc_fds[i].revents = 0;
+ for (i = 0; i <= svc->cnt; i++)
+ udp_svc_fds[i].revents = 0;
+
+ poll(udp_svc_fds, svc->cnt + 1, -1);
+ if (udp_svc_fds[0].revents)
+ udp_svc_process_sock(svc);
- poll(svc_fds, svc_cnt + 1, -1);
- if (svc_fds[0].revents)
- rs_svc_process_sock();
+ for (i = 1; i <= svc->cnt; i++) {
+ if (udp_svc_fds[i].revents)
+ udp_svc_process_rs(svc->rss[i]);
+ }
+ } while (svc->cnt >= 1);
+
+ return NULL;
+}
+
+static uint32_t rs_get_time(void)
+{
+ struct timeval now;
+
+ memset(&now, 0, sizeof now);
+ gettimeofday(&now, NULL);
+ return (uint32_t) now.tv_sec;
+}
+
+static void tcp_svc_process_sock(struct rs_svc *svc)
+{
+ struct rs_svc_msg msg;
- for (i = 1; i <= svc_cnt; i++) {
- if (svc_fds[i].revents)
- rs_svc_process_rs(svc_rss[i]);
+ read(svc->sock[1], &msg, sizeof msg);
+ switch (msg.cmd) {
+ case RS_SVC_ADD_KEEPALIVE:
+ msg.status = rs_svc_add_rs(svc, msg.rs);
+ if (!msg.status) {
+ msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+ tcp_svc_timeouts = svc->contexts;
+ tcp_svc_timeouts[svc->cnt] = rs_get_time() +
+ msg.rs->keepalive_time;
+ }
+ break;
+ case RS_SVC_REM_KEEPALIVE:
+ msg.status = rs_svc_rm_rs(svc, msg.rs);
+ if (!msg.status)
+ msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+ break;
+ case RS_SVC_MOD_KEEPALIVE:
+ tcp_svc_timeouts[svc->cnt] = rs_get_time() + msg.rs->keepalive_time;
+ msg.status = 0;
+ break;
+ case RS_SVC_NOOP:
+ msg.status = 0;
+ break;
+ default:
+ break;
+ }
+ write(svc->sock[1], &msg, sizeof msg);
+}
+
+/*
+ * Send a credit update as the keep-alive message. We may or may not have
+ * any credits, but if we do, then we require a minimum of 2 control credits
+ * for protocols that do not support RDMA write with immediate data. There's
+ * no need to send a keep-alive message if we have any messages outstanding,
+ * and we start with a minimum of 2 credits. For simplicity, we just check
+ * that both credits are available before sending the keep-alive.
+ */
+static void tcp_svc_send_keepalive(struct rsocket *rs)
+{
+ fastlock_acquire(&rs->cq_lock);
+ if ((rs->ctrl_avail > 1) && (rs->state & rs_connected))
+ rs_send_credits(rs);
+ fastlock_release(&rs->cq_lock);
+}
+
+static void *tcp_svc_run(void *arg)
+{
+ struct rs_svc *svc = arg;
+ struct rs_svc_msg msg;
+ struct pollfd fds;
+ uint32_t now, next_timeout;
+ int i, ret, timeout;
+
+ ret = rs_svc_grow_sets(svc, 16);
+ if (ret) {
+ msg.status = ret;
+ write(svc->sock[1], &msg, sizeof msg);
+ return (void *) (uintptr_t) ret;
+ }
+
+ tcp_svc_timeouts = svc->contexts;
+ fds.fd = svc->sock[1];
+ fds.events = POLLIN;
+ timeout = -1;
+ do {
+ poll(&fds, 1, timeout * 1000);
+ if (fds.revents)
+ tcp_svc_process_sock(svc);
+
+ now = rs_get_time();
+ next_timeout = ~0;
+ for (i = 1; i <= svc->cnt; i++) {
+ if (tcp_svc_timeouts[i] <= now) {
+ tcp_svc_send_keepalive(svc->rss[i]);
+ tcp_svc_timeouts[i] =
+ now + svc->rss[i]->keepalive_time;
+ }
+ if (tcp_svc_timeouts[i] < next_timeout)
+ next_timeout = tcp_svc_timeouts[i];
}
- } while (svc_cnt >= 1);
+ timeout = (int) (next_timeout - now);
+ } while (svc->cnt >= 1);
return NULL;
}