From: Sean Hefty Date: Tue, 4 Dec 2012 07:40:39 +0000 (-0800) Subject: Refresh of dsocket X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=6c44ce5f0cff27447e4c87ca8a694914d0d6256e;p=~shefty%2Flibrdmacm.git Refresh of dsocket --- diff --git a/src/cma.c b/src/cma.c index 49b88a0e..0f589668 100755 --- a/src/cma.c +++ b/src/cma.c @@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id) return 0; } -static int ucma_addrlen(struct sockaddr *addr) +int ucma_addrlen(struct sockaddr *addr) { if (!addr) return 0; diff --git a/src/cma.h b/src/cma.h index 0a0370ee..7135a612 100644 --- a/src/cma.h +++ b/src/cma.h @@ -145,10 +145,12 @@ typedef struct { volatile int val; } atomic_t; #define atomic_set(v, s) ((v)->val = s) uint16_t ucma_get_port(struct sockaddr *addr); +int ucma_addrlen(struct sockaddr *addr); void ucma_set_sid(enum rdma_port_space ps, struct sockaddr *addr, struct sockaddr_ib *sib); int ucma_max_qpsize(struct rdma_cm_id *id); int ucma_complete(struct rdma_cm_id *id); + static inline int ERR(int err) { errno = err; diff --git a/src/rsocket.c b/src/rsocket.c index 63bb225c..31b946a6 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -84,6 +84,8 @@ static int svc_cnt; static int svc_size; static struct rsocket **svc_rss; static struct pollfd *svc_fds; +static uint8_t svc_buf[RS_SNDLOWAT]; +static int rs_svc_run(void *arg); static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; @@ -348,9 +350,7 @@ struct ds_udp_header { uint32_t qpn; /* lower 8-bits reserved */ union { uint32_t ipv4; - struct { - uint8_t addr[16]; - } ipv6; + uint8_t ipv6[16]; } addr; }; @@ -378,122 +378,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp) } } -static int rs_svc_grow_sets(void) -{ - struct rsocket **rss; - struct pollfd *fds; - void *set; - - set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds)); - if (!set) - return ENOMEM; - - svc_size += 2; - rss = set; - fds = set + sizeof(*rss) * svc_size; - if (svc_cnt) { - memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt); - memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt); - } - - free(svc_rss); - free(svc_fds); - svc_rss = rss; - svc_fds = fds; - return 0; -} - -/* - * Index 0 is reserved for the service's communication socket. - */ -static int rs_svc_add_rs(struct rsocket *rs) -{ - int ret; - - if (svc_cnt >= svc_size - 1) { - ret = rs_svc_grow_sets(); - if (ret) - return ret; - } - - svc_rss[++svc_cnt] = rs; - svc_fds[svc_cnt].fd = rs->udp_sock; - svc_fds[svc_cnt].events = POLLIN; - svc_fds[svc_cnt].revents = 0; - return 0; -} - -static int rs_svc_rm_rs(struct rsocket *rs) -{ - int i; - - for (i = 1; i <= svc_cnt; i++) { - if (svc_rss[i] == rs) { - svc_cnt--; - svc_rss[i] = svc_rss[svc_cnt]; - svc_fds[i] = svc_fds[svc_cnt]; - return 0; - } - } - return EBADF; -} - -static void rs_svc_process_sock(void) -{ - struct rs_svc_msg msg; - - read(svc_sock[1], &msg, sizeof msg); - switch (msg.op) { - case RS_SVC_INSERT: - msg.status = rs_svc_add_rs(msg.rs); - break; - case RS_SVC_REMOVE: - msg.status = rs_svc_rm_rs(msg.rs); - break; - default: - msg.status = ENOTSUP; - break; - } - write(svc_sock[1], &msg, sizeof msg); -} - -static void rs_svc_process_rs(struct rsocket *rs) -{ - -} - -static int rs_svc_run(void *arg) -{ - struct rs_svc_msg msg; - int i, ret; - - ret = rs_svc_grow_sets(); - if (ret) { - msg.status = ret; - write(svc_sock[1] &msg, sizeof msg); - return ret; - } - - svc_fds[0].fd = svc_sock[1]; - svc_fds[0].events = POLLIN; - do { - for (i = 0; i <= svc_cnt; i++) - svc_fds[i].revents = 0; - - poll(svc_fds, svc_cnt + 1, -1); - if (svc_fds[0].revents) - rs_svc_process_sock(); - - for (i = 1; i <= svc_cnt; i++) { - if (svc_fds[i].revents) - rs_svc_process_rs(svc_rss[i]); - } - } while (svc_cnt > 1); - - return 0; -} - -static int rs_svc_insert(struct rsocket *rs) +static int rs_add_to_svc(struct rsocket *rs) { struct rs_svc_msg msg; int ret; @@ -533,7 +418,7 @@ err1: return ret; } -static int rs_svc_remove(struct rsocket *rs) +static int rs_remove_from_svc(struct rsocket *rs) { struct rs_svc_msg msg; int ret; @@ -974,7 +859,7 @@ static void ds_free(struct rsocket *rs) struct ds_qp *qp; if (rs->state & (rs_readable | rs_writable)) - rs_svc_remove(rs); + rs_remove_from_svc(rs); if (rs->udp_sock >= 0) close(rs->udp_sock); @@ -1168,7 +1053,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) } else { ret = bind(rs->udp_sock, addr, addrlen); if (!ret) { - ret = rs_svc_insert(rs); + ret = rs_add_to_svc(rs); if (!ret) rs->state = rs_readable | rs_writable; } @@ -1380,7 +1265,7 @@ static int ds_init_ep(struct rsocket *rs) } msg->next = NULL; - ret = rs_svc_insert(rs); + ret = rs_add_to_svc(rs); if (ret) return ret; @@ -2499,12 +2384,13 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4; } else { hdr.length = DS_UDP_IPV6_HDR_LEN; - memcpy(hdr.addr.ipv6.addr, rs->conn_dest->qp->hdr.addr.ipv6.addr, 16); + memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16); } miov[0].iov_base = &hdr; miov[0].iov_len = sizeof hdr; - memcpy(&miov[1], iov, sizeof *iov * iovcnt); + if (iov && iovcnt) + memcpy(&miov[1], iov, sizeof *iov * iovcnt); memset(&msg, 0, sizeof msg); msg.msg_iov = miov; @@ -3656,3 +3542,256 @@ out: return (ret && left == count) ? ret : count - left; } + +static int rs_svc_grow_sets(void) +{ + struct rsocket **rss; + struct pollfd *fds; + void *set; + + set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds)); + if (!set) + return ENOMEM; + + svc_size += 2; + rss = set; + fds = set + sizeof(*rss) * svc_size; + if (svc_cnt) { + memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt); + memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt); + } + + free(svc_rss); + free(svc_fds); + svc_rss = rss; + svc_fds = fds; + return 0; +} + +/* + * Index 0 is reserved for the service's communication socket. + */ +static int rs_svc_add_rs(struct rsocket *rs) +{ + int ret; + + if (svc_cnt >= svc_size - 1) { + ret = rs_svc_grow_sets(); + if (ret) + return ret; + } + + svc_rss[++svc_cnt] = rs; + svc_fds[svc_cnt].fd = rs->udp_sock; + svc_fds[svc_cnt].events = POLLIN; + svc_fds[svc_cnt].revents = 0; + return 0; +} + +static int rs_svc_rm_rs(struct rsocket *rs) +{ + int i; + + for (i = 1; i <= svc_cnt; i++) { + if (svc_rss[i] == rs) { + svc_cnt--; + svc_rss[i] = svc_rss[svc_cnt]; + svc_fds[i] = svc_fds[svc_cnt]; + return 0; + } + } + return EBADF; +} + +static void rs_svc_process_sock(void) +{ + struct rs_svc_msg msg; + + read(svc_sock[1], &msg, sizeof msg); + switch (msg.op) { + case RS_SVC_INSERT: + msg.status = rs_svc_add_rs(msg.rs); + break; + case RS_SVC_REMOVE: + msg.status = rs_svc_rm_rs(msg.rs); + break; + default: + msg.status = ENOTSUP; + break; + } + write(svc_sock[1], &msg, sizeof msg); +} + +static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid) +{ + union ibv_gid gid; + int i, ret; + + for (i = 0; i < 16; i++) { + ret = ibv_query_gid(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, + i, &gid); + if (!memcmp(sgid, &gid, sizeof gid)) + return i; + } + return 0; +} + +static uint8_t rs_svc_path_bits(struct ds_dest *dest) +{ + struct ibv_port_attr attr; + + if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr)) + return (uint8_t) ((1 << attr.lmc) - 1)); + return 0x7f; +} + +static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn) +{ + struct socket_addr saddr; + struct rdma_cm_id *id; + struct ibv_ah_attr attr; + int ret; + + if (dest->ah) { + fastlock_acquire(&rs->slock); + ibv_destroy_ah(dest->ah); + dest->ah = NULL; + fastlock_release(&rs->slock); + } + + ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps); + if (ret) + return; + + memcpy(&saddr, rdma_get_local_addr(dest->qp->cm_id), + ucma_addrlen(rdma_get_local_addr(dest->qp->cm_id))); + if (saddr.sa.sa_family == AF_INET) + saddr.sin.sin_port = 0; + else + saddr.sin6.sin6_port = 0; + ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000); + if (ret) + goto out; + + ret = rdma_resolve_route(id, 2000); + if (ret) + goto out; + + memset(&attr, 0, sizeof attr); + if (id->route.path_rec->hop_limit) { + attr.is_global = 1; + attr.grh.dgid = id->route.path_rec->dgid; + attr.grh.flow_label = id->route.path_rec->flow_label; + attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid); + attr.grh.hop_limit = id->route.path_rec->hop_limit; + attr.grh.traffic_class = id->route.path_rec->traffic_class; + } + attr.dlid = id->route.path_rec->dlid; + attr.sl = id->route.path_rec->sl; + attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest); + attr.static_rate = id->route.path_rec->rate; + attr.port_num = id->port_num; + + fastlock_acquire(&rs->slock); + dest->qpn = qpn; + dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr); + fastlock_release(&rs->slock); +out: + rdma_destroy_id(id); +} + +static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header, + union socket_addr *addr) +{ + return (udp_hdr->tag == DS_UDP_TAG) && + ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET && + udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || + (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 && + udp_hdr->length == DS_UDP_IPV6_HDR_LEN)); +} + +static int rs_svc_comp_local_net(struct ds_dest *dest, + struct ds_udp_header *udp_hdr) +{ + return ((udp_hdr->version == 4) && + (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) && + (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) || + ((udp_hdr->version == 6) && + (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) && + (!memcmp(udp_hdr->addr.ipv6, + &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16))); +} + +static void rs_svc_process_rs(struct rsocket *rs) +{ + struct ds_dest *dest; + struct ds_udp_header *udp_hdr; + union socket_addr addr; + socklen_t addrlen = sizeof addr; + int len, ret; + + ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen); + if (ret < DS_UDP_IPV4_HDR_LEN) + return; + + udp_hdr = (struct ds_udp_header *) svc_buf; + if (!rs_svc_valid_udp_hdr(udp_hdr, &addr)) + return; + + len = ret - udp_hdr->length; + udp_hdr->tag = ntohl(udp_hdr->tag); + udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF; + ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); + if (ret) + return; + + if (!dest->ah || (dest->qpn != udp_hdr->qpn)) + rs_svc_create_ah(rs, dest, udp_hdr->qpn); + + if (!rs_svc_comp_local_net(dest, udp_hdr)) { + + ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); + if (!ret) { + if (!dest->ah) + rs_svc_create_ah(rs, dest, udp_hdr); + rs_send_udp_reply(rs, dest); + } + } + + if (udp_hdr->op == RS_OP_DATA) { + fastlock_acquire(&rs->slock); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + } +} + +static int rs_svc_run(void *arg) +{ + struct rs_svc_msg msg; + int i, ret; + + ret = rs_svc_grow_sets(); + if (ret) { + msg.status = ret; + write(svc_sock[1] &msg, sizeof msg); + return ret; + } + + svc_fds[0].fd = svc_sock[1]; + svc_fds[0].events = POLLIN; + do { + for (i = 0; i <= svc_cnt; i++) + svc_fds[i].revents = 0; + + poll(svc_fds, svc_cnt + 1, -1); + if (svc_fds[0].revents) + rs_svc_process_sock(); + + for (i = 1; i <= svc_cnt; i++) { + if (svc_fds[i].revents) + rs_svc_process_rs(svc_rss[i]); + } + } while (svc_cnt > 1); + + return 0; +}