]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of dsocket
authorSean Hefty <sean.hefty@intel.com>
Tue, 4 Dec 2012 07:40:39 +0000 (23:40 -0800)
committerSean Hefty <sean.hefty@intel.com>
Tue, 4 Dec 2012 07:40:39 +0000 (23:40 -0800)
src/cma.c
src/cma.h
src/rsocket.c

index 49b88a0eb7a84e7820c709468fc61e9f086819e8..0f589668bce186759cbc2b40875356cdb6eb91e9 100755 (executable)
--- a/src/cma.c
+++ b/src/cma.c
@@ -513,7 +513,7 @@ int rdma_destroy_id(struct rdma_cm_id *id)
        return 0;
 }
 
-static int ucma_addrlen(struct sockaddr *addr)
+int ucma_addrlen(struct sockaddr *addr)
 {
        if (!addr)
                return 0;
index 0a0370eeed0ea2d1ef911f4c545f80303a861869..7135a612467aae848e6d627be9dc89ec326eab44 100644 (file)
--- a/src/cma.h
+++ b/src/cma.h
@@ -145,10 +145,12 @@ typedef struct { volatile int val; } atomic_t;
 #define atomic_set(v, s) ((v)->val = s)
 
 uint16_t ucma_get_port(struct sockaddr *addr);
+int ucma_addrlen(struct sockaddr *addr);
 void ucma_set_sid(enum rdma_port_space ps, struct sockaddr *addr,
                  struct sockaddr_ib *sib);
 int ucma_max_qpsize(struct rdma_cm_id *id);
 int ucma_complete(struct rdma_cm_id *id);
+
 static inline int ERR(int err)
 {
        errno = err;
index 63bb225c613ee4389cca0e1da4ecb3e406407e62..31b946a63f121c5469147a1f595dc18920cbac34 100644 (file)
@@ -84,6 +84,8 @@ static int svc_cnt;
 static int svc_size;
 static struct rsocket **svc_rss;
 static struct pollfd *svc_fds;
+static uint8_t svc_buf[RS_SNDLOWAT];
+static int rs_svc_run(void *arg);
 
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
@@ -348,9 +350,7 @@ struct ds_udp_header {
        uint32_t          qpn;  /* lower 8-bits reserved */
        union {
                uint32_t ipv4;
-               struct {
-                       uint8_t  addr[16];
-               } ipv6;
+               uint8_t  ipv6[16];
        } addr;
 };
 
@@ -378,122 +378,7 @@ static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
        }
 }
 
-static int rs_svc_grow_sets(void)
-{
-       struct rsocket **rss;
-       struct pollfd *fds;
-       void *set;
-
-       set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
-       if (!set)
-               return ENOMEM;
-
-       svc_size += 2;
-       rss = set;
-       fds = set + sizeof(*rss) * svc_size;
-       if (svc_cnt) {
-               memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
-               memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
-       }
-
-       free(svc_rss);
-       free(svc_fds);
-       svc_rss = rss;
-       svc_fds = fds;
-       return 0;
-}
-
-/*
- * Index 0 is reserved for the service's communication socket.
- */
-static int rs_svc_add_rs(struct rsocket *rs)
-{
-       int ret;
-
-       if (svc_cnt >= svc_size - 1) {
-               ret = rs_svc_grow_sets();
-               if (ret)
-                       return ret;
-       }
-
-       svc_rss[++svc_cnt] = rs;
-       svc_fds[svc_cnt].fd = rs->udp_sock;
-       svc_fds[svc_cnt].events = POLLIN;
-       svc_fds[svc_cnt].revents = 0;
-       return 0;
-}
-
-static int rs_svc_rm_rs(struct rsocket *rs)
-{
-       int i;
-
-       for (i = 1; i <= svc_cnt; i++) {
-               if (svc_rss[i] == rs) {
-                       svc_cnt--;
-                       svc_rss[i] = svc_rss[svc_cnt];
-                       svc_fds[i] = svc_fds[svc_cnt];
-                       return 0;
-               }
-       }
-       return EBADF;
-}
-
-static void rs_svc_process_sock(void)
-{
-       struct rs_svc_msg msg;
-
-       read(svc_sock[1], &msg, sizeof msg);
-       switch (msg.op) {
-       case RS_SVC_INSERT:
-               msg.status = rs_svc_add_rs(msg.rs);
-               break;
-       case RS_SVC_REMOVE:
-               msg.status = rs_svc_rm_rs(msg.rs);
-               break;
-       default:
-               msg.status = ENOTSUP;
-               break;
-       }
-       write(svc_sock[1], &msg, sizeof msg);
-}
-
-static void rs_svc_process_rs(struct rsocket *rs)
-{
-
-}
-
-static int rs_svc_run(void *arg)
-{
-       struct rs_svc_msg msg;
-       int i, ret;
-
-       ret = rs_svc_grow_sets();
-       if (ret) {
-               msg.status = ret;
-               write(svc_sock[1] &msg, sizeof msg);
-               return ret;
-       }
-
-       svc_fds[0].fd = svc_sock[1];
-       svc_fds[0].events = POLLIN;
-       do {
-               for (i = 0; i <= svc_cnt; i++)
-                       svc_fds[i].revents = 0;
-
-               poll(svc_fds, svc_cnt + 1, -1);
-               if (svc_fds[0].revents)
-                       rs_svc_process_sock();
-
-               for (i = 1; i <= svc_cnt; i++) {
-                       if (svc_fds[i].revents)
-                               rs_svc_process_rs(svc_rss[i]);
-               }
-       } while (svc_cnt > 1);
-
-       return 0;
-}
-
-static int rs_svc_insert(struct rsocket *rs)
+static int rs_add_to_svc(struct rsocket *rs)
 {
        struct rs_svc_msg msg;
        int ret;
@@ -533,7 +418,7 @@ err1:
        return ret;
 }
 
-static int rs_svc_remove(struct rsocket *rs)
+static int rs_remove_from_svc(struct rsocket *rs)
 {
        struct rs_svc_msg msg;
        int ret;
@@ -974,7 +859,7 @@ static void ds_free(struct rsocket *rs)
        struct ds_qp *qp;
 
        if (rs->state & (rs_readable | rs_writable))
-               rs_svc_remove(rs);
+               rs_remove_from_svc(rs);
 
        if (rs->udp_sock >= 0)
                close(rs->udp_sock);
@@ -1168,7 +1053,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        } else {
                ret = bind(rs->udp_sock, addr, addrlen);
                if (!ret) {
-                       ret = rs_svc_insert(rs);
+                       ret = rs_add_to_svc(rs);
                        if (!ret)
                                rs->state = rs_readable | rs_writable;
                }
@@ -1380,7 +1265,7 @@ static int ds_init_ep(struct rsocket *rs)
        }
        msg->next = NULL;
 
-       ret = rs_svc_insert(rs);
+       ret = rs_add_to_svc(rs);
        if (ret)
                return ret;
 
@@ -2499,12 +2384,13 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
                hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
        } else {
                hdr.length = DS_UDP_IPV6_HDR_LEN;
-               memcpy(hdr.addr.ipv6.addr, rs->conn_dest->qp->hdr.addr.ipv6.addr, 16);
+               memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16);
        }
 
        miov[0].iov_base = &hdr;
        miov[0].iov_len = sizeof hdr;
-       memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+       if (iov && iovcnt)
+               memcpy(&miov[1], iov, sizeof *iov * iovcnt);
 
        memset(&msg, 0, sizeof msg);
        msg.msg_iov = miov;
@@ -3656,3 +3542,256 @@ out:
 
        return (ret && left == count) ? ret : count - left;
 }
+
+static int rs_svc_grow_sets(void)
+{
+       struct rsocket **rss;
+       struct pollfd *fds;
+       void *set;
+
+       set = calloc(svc_size + 2, sizeof(*rss) + sizeof(*fds));
+       if (!set)
+               return ENOMEM;
+
+       svc_size += 2;
+       rss = set;
+       fds = set + sizeof(*rss) * svc_size;
+       if (svc_cnt) {
+               memcpy(rss, svc_rss, sizeof(*rss) * svc_cnt);
+               memcpy(fds, svc_fds, sizeof(*fds) * svc_cnt);
+       }
+
+       free(svc_rss);
+       free(svc_fds);
+       svc_rss = rss;
+       svc_fds = fds;
+       return 0;
+}
+
+/*
+ * Index 0 is reserved for the service's communication socket.
+ */
+static int rs_svc_add_rs(struct rsocket *rs)
+{
+       int ret;
+
+       if (svc_cnt >= svc_size - 1) {
+               ret = rs_svc_grow_sets();
+               if (ret)
+                       return ret;
+       }
+
+       svc_rss[++svc_cnt] = rs;
+       svc_fds[svc_cnt].fd = rs->udp_sock;
+       svc_fds[svc_cnt].events = POLLIN;
+       svc_fds[svc_cnt].revents = 0;
+       return 0;
+}
+
+static int rs_svc_rm_rs(struct rsocket *rs)
+{
+       int i;
+
+       for (i = 1; i <= svc_cnt; i++) {
+               if (svc_rss[i] == rs) {
+                       svc_cnt--;
+                       svc_rss[i] = svc_rss[svc_cnt];
+                       svc_fds[i] = svc_fds[svc_cnt];
+                       return 0;
+               }
+       }
+       return EBADF;
+}
+
+static void rs_svc_process_sock(void)
+{
+       struct rs_svc_msg msg;
+
+       read(svc_sock[1], &msg, sizeof msg);
+       switch (msg.op) {
+       case RS_SVC_INSERT:
+               msg.status = rs_svc_add_rs(msg.rs);
+               break;
+       case RS_SVC_REMOVE:
+               msg.status = rs_svc_rm_rs(msg.rs);
+               break;
+       default:
+               msg.status = ENOTSUP;
+               break;
+       }
+       write(svc_sock[1], &msg, sizeof msg);
+}
+
+static uint8_t rs_svc_sgid_index(struct ds_dest *dest, union ibv_gid *sgid)
+{
+       union ibv_gid gid;
+       int i, ret;
+
+       for (i = 0; i < 16; i++) {
+               ret = ibv_query_gid(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num,
+                                   i, &gid);
+               if (!memcmp(sgid, &gid, sizeof gid))
+                       return i;
+       }
+       return 0;
+}
+
+static uint8_t rs_svc_path_bits(struct ds_dest *dest)
+{
+       struct ibv_port_attr attr;
+
+       if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
+               return (uint8_t) ((1 << attr.lmc) - 1));
+       return 0x7f;
+}
+
+static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
+{
+       struct socket_addr saddr;
+       struct rdma_cm_id *id;
+       struct ibv_ah_attr attr;
+       int ret;
+
+       if (dest->ah) {
+               fastlock_acquire(&rs->slock);
+               ibv_destroy_ah(dest->ah);
+               dest->ah = NULL;
+               fastlock_release(&rs->slock);
+       }
+
+       ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps);
+       if  (ret)
+               return;
+
+       memcpy(&saddr, rdma_get_local_addr(dest->qp->cm_id),
+              ucma_addrlen(rdma_get_local_addr(dest->qp->cm_id)));
+       if (saddr.sa.sa_family == AF_INET)
+               saddr.sin.sin_port = 0;
+       else
+               saddr.sin6.sin6_port = 0;
+       ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000);
+       if (ret)
+               goto out;
+
+       ret = rdma_resolve_route(id, 2000);
+       if (ret)
+               goto out;
+
+       memset(&attr, 0, sizeof attr);
+       if (id->route.path_rec->hop_limit) {
+               attr.is_global = 1;
+               attr.grh.dgid = id->route.path_rec->dgid;
+               attr.grh.flow_label = id->route.path_rec->flow_label;
+               attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid);
+               attr.grh.hop_limit = id->route.path_rec->hop_limit;
+               attr.grh.traffic_class = id->route.path_rec->traffic_class;
+       }
+       attr.dlid = id->route.path_rec->dlid;
+       attr.sl = id->route.path_rec->sl;
+       attr.src_path_bits = id->route.path_rec->slid & rs_svc_path_bits(dest);
+       attr.static_rate = id->route.path_rec->rate;
+       attr.port_num  = id->port_num;
+
+       fastlock_acquire(&rs->slock);
+       dest->qpn = qpn;
+       dest->ah = ibv_create_ah(dest->qp->cm_id->pd, &attr);
+       fastlock_release(&rs->slock);
+out:
+       rdma_destroy_id(id);
+}
+
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header,
+                               union socket_addr *addr)
+{
+       return (udp_hdr->tag == DS_UDP_TAG) &&
+               ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET &&
+                 udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
+                (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 &&
+                 udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
+}
+
+static int rs_svc_comp_local_net(struct ds_dest *dest,
+                                struct ds_udp_header *udp_hdr)
+{
+       return ((udp_hdr->version == 4) &&
+               (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET) &&
+               (udp_hdr->addr.ipv4 == dest->qp->cm_id->route.addr.src_sin.sin_addr)) ||
+              ((udp_hdr->version == 6) &&
+               (dest->qp->cm_id->route.addr.src_addr.sa_family == AF_INET6) &&
+               (!memcmp(udp_hdr->addr.ipv6,
+                        &dest->qp->cm_id->route.addr.src_sin6.sin6_addr, 16)));
+}
+
+static void rs_svc_process_rs(struct rsocket *rs)
+{
+       struct ds_dest *dest;
+       struct ds_udp_header *udp_hdr;
+       union socket_addr addr;
+       socklen_t addrlen = sizeof addr;
+       int len, ret;
+
+       ret = recvfrom(rs->udp_sock, svc_buf, sizeof svc_buf, 0, &addr.sa, &addrlen);
+       if (ret < DS_UDP_IPV4_HDR_LEN)
+               return;
+
+       udp_hdr = (struct ds_udp_header *) svc_buf;
+       if (!rs_svc_valid_udp_hdr(udp_hdr, &addr))
+               return;
+
+       len = ret - udp_hdr->length;
+       udp_hdr->tag = ntohl(udp_hdr->tag);
+       udp_hdr->qpn = ntohl(udp_hdr->qpn) & 0xFFFFFF;
+       ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+       if (ret)
+               return;
+
+       if (!dest->ah || (dest->qpn != udp_hdr->qpn))
+               rs_svc_create_ah(rs, dest, udp_hdr->qpn);
+
+       if (!rs_svc_comp_local_net(dest, udp_hdr)) {
+
+               ret = ds_get_dest(rs, &addr.sa, addrlen, &dest);
+               if (!ret) {
+                       if (!dest->ah)
+                               rs_svc_create_ah(rs, dest, udp_hdr);
+                       rs_send_udp_reply(rs, dest);
+               }
+       }
+
+       if (udp_hdr->op == RS_OP_DATA) {
+               fastlock_acquire(&rs->slock);
+               ret = dsend(rs, buf, len, flags);
+               fastlock_release(&rs->slock);
+       }
+}
+
+static int rs_svc_run(void *arg)
+{
+       struct rs_svc_msg msg;
+       int i, ret;
+
+       ret = rs_svc_grow_sets();
+       if (ret) {
+               msg.status = ret;
+               write(svc_sock[1] &msg, sizeof msg);
+               return ret;
+       }
+
+       svc_fds[0].fd = svc_sock[1];
+       svc_fds[0].events = POLLIN;
+       do {
+               for (i = 0; i <= svc_cnt; i++)
+                       svc_fds[i].revents = 0;
+
+               poll(svc_fds, svc_cnt + 1, -1);
+               if (svc_fds[0].revents)
+                       rs_svc_process_sock();
+
+               for (i = 1; i <= svc_cnt; i++) {
+                       if (svc_fds[i].revents)
+                               rs_svc_process_rs(svc_rss[i]);
+               }
+       } while (svc_cnt > 1);
+
+       return 0;
+}