--- /dev/null
+Bottom: 8e7e91c54177aff763fd2a822f66f4299671cd81
+Top: 44e30115528e8117f25b73ebedc1c7f5ec687bb6
+Author: Sean Hefty <sean.hefty@intel.com>
+Date: 2012-12-03 23:40:39 -0800
+
+Refresh of dsocket
+
+---
+
+diff --git a/src/cma.c b/src/cma.c
+index 49b88a0..0f58966 100755
+--- a/src/cma.c
++++ b/src/cma.c
+@@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id)
+ return 0;
+ }
+
+-static int ucma_addrlen(struct sockaddr *addr)
++int ucma_addrlen(struct sockaddr *addr)
+ {
+ if (!addr)
+ return 0;
+diff --git a/src/cma.h b/src/cma.h
+index 0a0370e..7135a61 100644
+--- a/src/cma.h
++++ b/src/cma.h
+@@ -145,10 +145,12 @@ typedef struct { volatile int val; } atomic_t;
+ #define atomic_set(v, s) ((v)->val = s)
+
+ uint16_t ucma_get_port(struct sockaddr *addr);
++int ucma_addrlen(struct sockaddr *addr);
+ void ucma_set_sid(enum rdma_port_space ps, struct sockaddr *addr,
+ struct sockaddr_ib *sib);
+ int ucma_max_qpsize(struct rdma_cm_id *id);
+ int ucma_complete(struct rdma_cm_id *id);
++
+ static inline int ERR(int err)
+ {
+ errno = err;
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 63bb225..31b946a 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -84,6 +84,8 @@ static int svc_cnt;
+ static int svc_size;
+ static struct rsocket **svc_rss;
+ static struct pollfd *svc_fds;
++static uint8_t svc_buf[RS_SNDLOWAT];
++static int rs_svc_run(void *arg);
+
+ static uint16_t def_iomap_size = 0;
+ static uint16_t def_inline = 64;
+@@ -348,9 +350,7 @@ struct ds_udp_header {
+ uint32_t qpn; /* lower 8-bits reserved */
+ union {
+ uint32_t ipv4;
+- struct {
+- uint8_t addr[16];
+- } ipv6;
++ uint8_t ipv6[16];
+ } addr;
+ };
+
+@@ -378,122 +378,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
+ }
+ }
+
+-static int rs_svc_grow_sets(void)
+-{
+- struct rsocket **rss;
+- struct pollfd *fds;
+- void *set;
+-
+- set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
+- if (!set)
+- return ENOMEM;
+-
+- svc_size += 2;
+- rss = set;
+- fds = set + sizeof(*rss) * svc_size;
+- if (svc_cnt) {
+- memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
+- memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
+- }
+-
+- free(svc_rss);
+- free(svc_fds);
+- svc_rss = rss;
+- svc_fds = fds;
+- return 0;
+-}
+-
+-/*
+- * Index 0 is reserved for the service's communication socket.
+- */
+-static int rs_svc_add_rs(struct rsocket *rs)
+-{
+- int ret;
+-
+- if (svc_cnt >= svc_size - 1) {
+- ret = rs_svc_grow_sets();
+- if (ret)
+- return ret;
+- }
+-
+- svc_rss[++svc_cnt] = rs;
+- svc_fds[svc_cnt].fd = rs->udp_sock;
+- svc_fds[svc_cnt].events = POLLIN;
+- svc_fds[svc_cnt].revents = 0;
+- return 0;
+-}
+-
+-static int rs_svc_rm_rs(struct rsocket *rs)
+-{
+- int i;
+-
+- for (i = 1; i <= svc_cnt; i++) {
+- if (svc_rss[i] == rs) {
+- svc_cnt--;
+- svc_rss[i] = svc_rss[svc_cnt];
+- svc_fds[i] = svc_fds[svc_cnt];
+- return 0;
+- }
+- }
+- return EBADF;
+-}
+-
+-static void rs_svc_process_sock(void)
+-{
+- struct rs_svc_msg msg;
+-
+- read(svc_sock[1], &msg, sizeof msg);
+- switch (msg.op) {
+- case RS_SVC_INSERT:
+- msg.status = rs_svc_add_rs(msg.rs);
+- break;
+- case RS_SVC_REMOVE:
+- msg.status = rs_svc_rm_rs(msg.rs);
+- break;
+- default:
+- msg.status = ENOTSUP;
+- break;
+- }
+- write(svc_sock[1], &msg, sizeof msg);
+-}
+-
+-static void rs_svc_process_rs(struct rsocket *rs)
+-{
+-
+-}
+-
+-static int rs_svc_run(void *arg)
+-{
+- struct rs_svc_msg msg;
+- int i, ret;
+-
+- ret = rs_svc_grow_sets();
+- if (ret) {
+- msg.status = ret;
+- write(svc_sock[1] &msg, sizeof msg);
+- return ret;
+- }
+-
+- svc_fds[0].fd = svc_sock[1];
+- svc_fds[0].events = POLLIN;
+- do {
+- for (i = 0; i <= svc_cnt; i++)
+- svc_fds[i].revents = 0;
+-
+- poll(svc_fds, svc_cnt + 1, -1);
+- if (svc_fds[0].revents)
+- rs_svc_process_sock();
+-
+- for (i = 1; i <= svc_cnt; i++) {
+- if (svc_fds[i].revents)
+- rs_svc_process_rs(svc_rss[i]);
+- }
+- } while (svc_cnt > 1);
+-
+- return 0;
+-}
+-
+-static int rs_svc_insert(struct rsocket *rs)
++static int rs_add_to_svc(struct rsocket *rs)
+ {
+ struct rs_svc_msg msg;
+ int ret;
+@@ -533,7 +418,7 @@ err1:
+ return ret;
+ }
+
+-static int rs_svc_remove(struct rsocket *rs)
++static int rs_remove_from_svc(struct rsocket *rs)
+ {
+ struct rs_svc_msg msg;
+ int ret;
+@@ -974,7 +859,7 @@ static void ds_free(struct rsocket *rs)
+ struct ds_qp *qp;
+
+ if (rs->state & (rs_readable | rs_writable))
+- rs_svc_remove(rs);
++ rs_remove_from_svc(rs);
+
+ if (rs->udp_sock >= 0)
+ close(rs->udp_sock);
+@@ -1168,7 +1053,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ } else {
+ ret = bind(rs->udp_sock, addr, addrlen);
+ if (!ret) {
+- ret = rs_svc_insert(rs);
++ ret = rs_add_to_svc(rs);
+ if (!ret)
+ rs->state = rs_readable | rs_writable;
+ }
+@@ -1380,7 +1265,7 @@ static int ds_init_ep(struct rsocket *rs)
+ }
+ msg->next = NULL;
+
+- ret = rs_svc_insert(rs);
++ ret = rs_add_to_svc(rs);
+ if (ret)
+ return ret;
+
+@@ -2499,12 +2384,13 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+ hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
+ } else {
+ hdr.length = DS_UDP_IPV6_HDR_LEN;
+- memcpy(hdr.addr.ipv6.addr, rs->conn_dest->qp->hdr.addr.ipv6.addr, 16);
++ memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16);
+ }
+
+ miov[0].iov_base = &hdr;
+ miov[0].iov_len = sizeof hdr;
+- memcpy(&miov[1], iov, sizeof *iov * iovcnt);
++ if (iov && iovcnt)
++ memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+ memset(&msg, 0, sizeof msg);
+ msg.msg_iov = miov;
+@@ -3656,3 +3542,256 @@ out:
+
+ return (ret && left == count) ? ret : count - left;
+ }
++
++/* Grow both poll sets by two slots; returns 0 on success or ENOMEM. */
++static int rs_svc_grow_sets(void)
++{
++	struct rsocket **rss;
++	struct pollfd *fds;
++	void *set;
++
++	set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
++	if (!set)
++		return ENOMEM;
++
++	svc_size += 2;
++	rss = set;
++	fds = set + sizeof(*rss) * svc_size;	/* carved from the same allocation */
++	if (svc_rss) {
++		memcpy(rss, svc_rss, sizeof(*rss) * (svc_cnt + 1)); /* slots 0..svc_cnt */
++		memcpy(fds, svc_fds, sizeof(*fds) * (svc_cnt + 1));
++	}
++
++	free(svc_rss);	/* svc_fds lives inside this block; it must not be freed */
++	svc_rss = rss;
++	svc_fds = fds;
++	return 0;
++}
++
++/*
++ * Slot 0 is reserved for the service's communication socket.
++ */
++static int rs_svc_add_rs(struct rsocket *rs)
++{
++	struct pollfd *pfd;
++	int ret = 0;
++
++	if (svc_cnt >= svc_size - 1)
++		ret = rs_svc_grow_sets();
++	if (ret)
++		return ret;
++	svc_rss[++svc_cnt] = rs;
++	pfd = &svc_fds[svc_cnt];
++	pfd->fd = rs->udp_sock;
++	pfd->events = POLLIN;
++	pfd->revents = 0;
++	return 0;
++}
++
++static int rs_svc_rm_rs(struct rsocket *rs)
++{
++	int i;
++
++	for (i = 1; i <= svc_cnt; i++) {	/* slot 0 is the service socket */
++		if (svc_rss[i] == rs) {
++			svc_rss[i] = svc_rss[svc_cnt];	/* last entry fills the hole */
++			svc_fds[i] = svc_fds[svc_cnt];
++			svc_cnt--;	/* shrink only after the last entry was moved */
++			return 0;
++		}
++	}
++	return EBADF;	/* rs was not being tracked */
++}
++
++/*
++ * Handle one request from the service socket: read the message, run the
++ * requested insert or remove, and write the same message back with its
++ * status field filled in.  Unknown operations are answered with ENOTSUP.
++ */
++static void rs_svc_process_sock(void)
++{
++	struct rs_svc_msg req;
++
++	read(svc_sock[1], &req, sizeof req);
++	if (req.op == RS_SVC_INSERT)
++		req.status = rs_svc_add_rs(req.rs);
++	else if (req.op == RS_SVC_REMOVE)
++		req.status = rs_svc_rm_rs(req.rs);
++	else
++		req.status = ENOTSUP;
++	write(svc_sock[1], &req, sizeof req);
++}
++
++static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
++{
++	union ibv_gid gid;
++	int i, ret;
++
++	for (i = 0; i < 16; i++) {	/* scans the first 16 GID table entries */
++		ret = ibv_query_gid(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num,
++				    i, &gid);
++		if (!ret && !memcmp(sgid, &gid, sizeof gid)) /* gid valid only if ret == 0 */
++			return i;
++	}
++	return 0;	/* no match found: fall back to index 0 */
++}
++
++static uint8_t rs_svc_path_bits(struct ds_dest *dest)
++{
++	struct ibv_port_attr attr;
++
++	if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
++		return (uint8_t) ((1 << attr.lmc) - 1);	/* mask of the low lmc bits */
++	return 0x7f;	/* port query failed: use a fixed 7-bit mask */
++}
++
++static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
++{
++	union socket_addr saddr;
++	struct rdma_cm_id *id;
++	struct ibv_ah_attr attr;
++	int ret;
++
++	if (dest->ah) {	/* drop any stale handle before resolving again */
++		fastlock_acquire(&rs->slock);
++		ibv_destroy_ah(dest->ah);
++		dest->ah = NULL;
++		fastlock_release(&rs->slock);
++	}
++
++	ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps); /* temporary id */
++	if (ret)
++		return;
++
++	memcpy(&saddr, rdma_get_local_addr(dest->qp->cm_id),
++	       ucma_addrlen(rdma_get_local_addr(dest->qp->cm_id)));
++	if (saddr.sa.sa_family == AF_INET)	/* clear the port in the source copy */
++		saddr.sin.sin_port = 0;
++	else
++		saddr.sin6.sin6_port = 0;
++	ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
++	if (ret)
++		goto out;
++
++	ret = rdma_resolve_route(id, 2000);
++	if (ret)
++		goto out;
++
++	memset(&attr, 0, sizeof attr);
++	if (id->route.path_rec->hop_limit) {
++		attr.is_global = 1;
++		attr.grh.dgid = id->route.path_rec->dgid;
++		attr.grh.flow_label = ntohl(id->route.path_rec->flow_label);
++		attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid);
++		attr.grh.hop_limit = id->route.path_rec->hop_limit;
++		attr.grh.traffic_class = id->route.path_rec->traffic_class;
++	}
++	attr.dlid = ntohs(id->route.path_rec->dlid);	/* path record is big-endian */
++	attr.sl = id->route.path_rec->sl;
++	attr.src_path_bits = ntohs(id->route.path_rec->slid) & rs_svc_path_bits(dest);
++	attr.static_rate = id->route.path_rec->rate;
++	attr.port_num = id->port_num;
++
++	fastlock_acquire(&rs->slock);	/* publish qpn and ah together */
++	dest->qpn = qpn;
++	dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
++	fastlock_release(&rs->slock);
++out:
++	rdma_destroy_id(id);
++}
++
++static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
++				union socket_addr *addr)
++{
++	return (udp_hdr->tag == htonl(DS_UDP_TAG)) &&	/* caller has not run ntohl yet */
++		((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
++		  udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
++		 (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
++		  udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
++}
++
++static int rs_svc_comp_local_net(struct ds_dest *dest,
++				 struct ds_udp_header *udp_hdr)
++{
++	return ((udp_hdr->version == 4) &&	/* nonzero when header addr == local src */
++		(dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
++		(udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr.s_addr)) ||
++	       ((udp_hdr->version == 6) &&
++		(dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
++		(!memcmp(udp_hdr->addr.ipv6,
++			 &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
++}
++
++static void rs_svc_process_rs(struct rsocket *rs)
++{
++	struct ds_dest *dest;
++	struct ds_udp_header *udp_hdr;
++	union socket_addr addr;
++	socklen_t addrlen = sizeof addr;
++	int len, ret;
++
++	ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
++	if (ret < DS_UDP_IPV4_HDR_LEN)
++		return;
++
++	udp_hdr = (struct ds_udp_header *) svc_buf;
++	if (!rs_svc_valid_udp_hdr(udp_hdr, &addr) || ret < udp_hdr->length)
++		return;	/* bad header, or datagram shorter than its own header */
++
++	len = ret - udp_hdr->length;	/* payload bytes following the header */
++	udp_hdr->tag = ntohl(udp_hdr->tag);
++	udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
++	ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
++	if (ret)
++		return;
++
++	if (!dest->ah || (dest->qpn != udp_hdr->qpn))
++		rs_svc_create_ah(rs, dest, udp_hdr->qpn);
++
++	if (!rs_svc_comp_local_net(dest, udp_hdr)) {
++		/* NOTE(review): repeats the lookup above with identical arguments */
++		ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
++		if (!ret) {
++			if (!dest->ah)
++				rs_svc_create_ah(rs, dest, udp_hdr->qpn);
++			rs_send_udp_reply(rs, dest);
++		}
++	}
++	/* NOTE(review): confirm dsend() targets dest; its destination is not visible here */
++	if (udp_hdr->op == RS_OP_DATA) {
++		fastlock_acquire(&rs->slock);
++		ret = dsend(rs, svc_buf + udp_hdr->length, len, 0);
++		fastlock_release(&rs->slock);
++	}
++}
++
++static int rs_svc_run(void *arg)
++{
++	struct rs_svc_msg msg;
++	int i, ret;
++
++	ret = rs_svc_grow_sets();
++	if (ret) {
++		msg.status = ret;	/* report the setup failure on the service socket */
++		write(svc_sock[1], &msg, sizeof msg);
++		return ret;
++	}
++
++	svc_fds[0].fd = svc_sock[1];	/* slot 0 polls the service socket */
++	svc_fds[0].events = POLLIN;
++	do {
++		for (i = 0; i <= svc_cnt; i++)
++			svc_fds[i].revents = 0;
++
++		poll(svc_fds, svc_cnt + 1, -1);
++		if (svc_fds[0].revents)
++			rs_svc_process_sock();
++
++		for (i = 1; i <= svc_cnt; i++) {
++			if (svc_fds[i].revents)
++				rs_svc_process_rs(svc_rss[i]);
++		}
++	} while (svc_cnt >= 1);	/* keep running while any rsocket is registered */
++
++	return 0;
++}