static int svc_size;
static struct rsocket **svc_rss;
static struct pollfd *svc_fds;
+static uint8_t svc_buf[RS_SNDLOWAT];
+static int rs_svc_run(void *arg);
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
uint32_t qpn; /* lower 8-bits reserved */
union {
uint32_t ipv4;
- struct {
- uint8_t addr[16];
- } ipv6;
+ uint8_t ipv6[16];
} addr;
};
}
}
-static int rs_svc_grow_sets(void)
-{
- struct rsocket **rss;
- struct pollfd *fds;
- void *set;
-
- set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
- if (!set)
- return ENOMEM;
-
- svc_size += 2;
- rss = set;
- fds = set + sizeof(*rss) * svc_size;
- if (svc_cnt) {
- memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
- memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
- }
-
- free(svc_rss);
- free(svc_fds);
- svc_rss = rss;
- svc_fds = fds;
- return 0;
-}
-
-/*
- * Index 0 is reserved for the service's communication socket.
- */
-static int rs_svc_add_rs(struct rsocket *rs)
-{
- int ret;
-
- if (svc_cnt >= svc_size - 1) {
- ret = rs_svc_grow_sets();
- if (ret)
- return ret;
- }
-
- svc_rss[++svc_cnt] = rs;
- svc_fds[svc_cnt].fd = rs->udp_sock;
- svc_fds[svc_cnt].events = POLLIN;
- svc_fds[svc_cnt].revents = 0;
- return 0;
-}
-
-static int rs_svc_rm_rs(struct rsocket *rs)
-{
- int i;
-
- for (i = 1; i <= svc_cnt; i++) {
- if (svc_rss[i] == rs) {
- svc_cnt--;
- svc_rss[i] = svc_rss[svc_cnt];
- svc_fds[i] = svc_fds[svc_cnt];
- return 0;
- }
- }
- return EBADF;
-}
-
-static void rs_svc_process_sock(void)
-{
- struct rs_svc_msg msg;
-
- read(svc_sock[1], &msg, sizeof msg);
- switch (msg.op) {
- case RS_SVC_INSERT:
- msg.status = rs_svc_add_rs(msg.rs);
- break;
- case RS_SVC_REMOVE:
- msg.status = rs_svc_rm_rs(msg.rs);
- break;
- default:
- msg.status = ENOTSUP;
- break;
- }
- write(svc_sock[1], &msg, sizeof msg);
-}
-
-static void rs_svc_process_rs(struct rsocket *rs)
-{
-
-}
-
-static int rs_svc_run(void *arg)
-{
- struct rs_svc_msg msg;
- int i, ret;
-
- ret = rs_svc_grow_sets();
- if (ret) {
- msg.status = ret;
- write(svc_sock[1] &msg, sizeof msg);
- return ret;
- }
-
- svc_fds[0].fd = svc_sock[1];
- svc_fds[0].events = POLLIN;
- do {
- for (i = 0; i <= svc_cnt; i++)
- svc_fds[i].revents = 0;
-
- poll(svc_fds, svc_cnt + 1, -1);
- if (svc_fds[0].revents)
- rs_svc_process_sock();
-
- for (i = 1; i <= svc_cnt; i++) {
- if (svc_fds[i].revents)
- rs_svc_process_rs(svc_rss[i]);
- }
- } while (svc_cnt > 1);
-
- return 0;
-}
-
-static int rs_svc_insert(struct rsocket *rs)
+static int rs_add_to_svc(struct rsocket *rs)
{
struct rs_svc_msg msg;
int ret;
return ret;
}
-static int rs_svc_remove(struct rsocket *rs)
+static int rs_remove_from_svc(struct rsocket *rs)
{
struct rs_svc_msg msg;
int ret;
struct ds_qp *qp;
if (rs->state & (rs_readable | rs_writable))
- rs_svc_remove(rs);
+ rs_remove_from_svc(rs);
if (rs->udp_sock >= 0)
close(rs->udp_sock);
} else {
ret = bind(rs->udp_sock, addr, addrlen);
if (!ret) {
- ret = rs_svc_insert(rs);
+ ret = rs_add_to_svc(rs);
if (!ret)
rs->state = rs_readable | rs_writable;
}
}
msg->next = NULL;
- ret = rs_svc_insert(rs);
+ ret = rs_add_to_svc(rs);
if (ret)
return ret;
hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
} else {
hdr.length = DS_UDP_IPV6_HDR_LEN;
- memcpy(hdr.addr.ipv6.addr, rs->conn_dest->qp->hdr.addr.ipv6.addr, 16);
+ memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16);
}
miov[0].iov_base = &hdr;
miov[0].iov_len = sizeof hdr;
- memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+ if (iov && iovcnt)
+ memcpy(&miov[1], iov, sizeof *iov * iovcnt);
memset(&msg, 0, sizeof msg);
msg.msg_iov = miov;
return (ret && left == count) ? ret : count - left;
}
+
+static int rs_svc_grow_sets(void)
+{
+ struct rsocket **rss;
+ struct pollfd *fds;
+ void *set;
+
+ set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
+ if (!set)
+ return ENOMEM;
+
+ svc_size += 2;
+ rss = set;
+ fds = set + sizeof(*rss) * svc_size;
+ if (svc_cnt) {
+ memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
+ memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
+ }
+
+ free(svc_rss);
+ free(svc_fds);
+ svc_rss = rss;
+ svc_fds = fds;
+ return 0;
+}
+
+/*
+ * Index 0 is reserved for the service's communication socket.
+ */
+static int rs_svc_add_rs(struct rsocket *rs)
+{
+ int ret;
+
+ if (svc_cnt >= svc_size - 1) {
+ ret = rs_svc_grow_sets();
+ if (ret)
+ return ret;
+ }
+
+ svc_rss[++svc_cnt] = rs;
+ svc_fds[svc_cnt].fd = rs->udp_sock;
+ svc_fds[svc_cnt].events = POLLIN;
+ svc_fds[svc_cnt].revents = 0;
+ return 0;
+}
+
+static int rs_svc_rm_rs(struct rsocket *rs)
+{
+ int i;
+
+ for (i = 1; i <= svc_cnt; i++) {
+ if (svc_rss[i] == rs) {
+ svc_cnt--;
+ svc_rss[i] = svc_rss[svc_cnt];
+ svc_fds[i] = svc_fds[svc_cnt];
+ return 0;
+ }
+ }
+ return EBADF;
+}
+
+static void rs_svc_process_sock(void)
+{
+ struct rs_svc_msg msg;
+
+ read(svc_sock[1], &msg, sizeof msg);
+ switch (msg.op) {
+ case RS_SVC_INSERT:
+ msg.status = rs_svc_add_rs(msg.rs);
+ break;
+ case RS_SVC_REMOVE:
+ msg.status = rs_svc_rm_rs(msg.rs);
+ break;
+ default:
+ msg.status = ENOTSUP;
+ break;
+ }
+ write(svc_sock[1], &msg, sizeof msg);
+}
+
+static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+{
+ union ibv_gid gid;
+ int i, ret;
+
+ for (i = 0; i < 16; i++) {
+ ret = ibv_query_gid(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num,
+ i, &gid);
+ if (!memcmp(sgid, &gid, sizeof gid))
+ return i;
+ }
+ return 0;
+}
+
+static uint8_t rs_svc_path_bits(struct ds_dest *dest)
+{
+ struct ibv_port_attr attr;
+
+ if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
+ return (uint8_t) ((1 << attr.lmc) - 1));
+ return 0x7f;
+}
+
+static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
+{
+ struct socket_addr saddr;
+ struct rdma_cm_id *id;
+ struct ibv_ah_attr attr;
+ int ret;
+
+ if (dest->ah) {
+ fastlock_acquire(&rs->slock);
+ ibv_destroy_ah(dest->ah);
+ dest->ah = NULL;
+ fastlock_release(&rs->slock);
+ }
+
+ ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps);
+ if (ret)
+ return;
+
+ memcpy(&saddr, rdma_get_local_addr(dest->qp->cm_id),
+ ucma_addrlen(rdma_get_local_addr(dest->qp->cm_id)));
+ if (saddr.sa.sa_family == AF_INET)
+ saddr.sin.sin_port = 0;
+ else
+ saddr.sin6.sin6_port = 0;
+ ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000);
+ if (ret)
+ goto out;
+
+ ret = rdma_resolve_route(id, 2000);
+ if (ret)
+ goto out;
+
+ memset(&attr, 0, sizeof attr);
+ if (id->route.path_rec->hop_limit) {
+ attr.is_global = 1;
+ attr.grh.dgid = id->route.path_rec->dgid;
+ attr.grh.flow_label = id->route.path_rec->flow_label;
+ attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid);
+ attr.grh.hop_limit = id->route.path_rec->hop_limit;
+ attr.grh.traffic_class = id->route.path_rec->traffic_class;
+ }
+ attr.dlid = id->route.path_rec->dlid;
+ attr.sl = id->route.path_rec->sl;
+ attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest);
+ attr.static_rate = id->route.path_rec->rate;
+ attr.port_num = id->port_num;
+
+ fastlock_acquire(&rs->slock);
+ dest->qpn = qpn;
+ dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
+ fastlock_release(&rs->slock);
+out:
+ rdma_destroy_id(id);
+}
+
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header,
+ union socket_addr *addr)
+{
+ return (udp_hdr->tag == DS_UDP_TAG) &&
+ ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET &&
+ udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
+ (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 &&
+ udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
+}
+
+static int rs_svc_comp_local_net(struct ds_dest *dest,
+ struct ds_udp_header *udp_hdr)
+{
+ return ((udp_hdr->version == 4) &&
+ (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
+ (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) ||
+ ((udp_hdr->version == 6) &&
+ (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
+ (!memcmp(udp_hdr->addr.ipv6,
+ &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
+}
+
+static void rs_svc_process_rs(struct rsocket *rs)
+{
+ struct ds_dest *dest;
+ struct ds_udp_header *udp_hdr;
+ union socket_addr addr;
+ socklen_t addrlen = sizeof addr;
+ int len, ret;
+
+ ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+ if (ret < DS_UDP_IPV4_HDR_LEN)
+ return;
+
+ udp_hdr = (struct ds_udp_header *) svc_buf;
+ if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+ return;
+
+ len = ret - udp_hdr->length;
+ udp_hdr->tag = ntohl(udp_hdr->tag);
+ udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
+ ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+ if (ret)
+ return;
+
+ if (!dest->ah || (dest->qpn != udp_hdr->qpn))
+ rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+
+ if (!rs_svc_comp_local_net(dest, udp_hdr)) {
+
+ ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+ if (!ret) {
+ if (!dest->ah)
+ rs_svc_create_ah(rs, dest, udp_hdr);
+ rs_send_udp_reply(rs, dest);
+ }
+ }
+
+ if (udp_hdr->op == RS_OP_DATA) {
+ fastlock_acquire(&rs->slock);
+ ret = dsend(rs, buf, len, flags);
+ fastlock_release(&rs->slock);
+ }
+}
+
+static int rs_svc_run(void *arg)
+{
+ struct rs_svc_msg msg;
+ int i, ret;
+
+ ret = rs_svc_grow_sets();
+ if (ret) {
+ msg.status = ret;
+ write(svc_sock[1] &msg, sizeof msg);
+ return ret;
+ }
+
+ svc_fds[0].fd = svc_sock[1];
+ svc_fds[0].events = POLLIN;
+ do {
+ for (i = 0; i <= svc_cnt; i++)
+ svc_fds[i].revents = 0;
+
+ poll(svc_fds, svc_cnt + 1, -1);
+ if (svc_fds[0].revents)
+ rs_svc_process_sock();
+
+ for (i = 1; i <= svc_cnt; i++) {
+ if (svc_fds[i].revents)
+ rs_svc_process_rs(svc_rss[i]);
+ }
+ } while (svc_cnt > 1);
+
+ return 0;
+}