]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of dsocket
authorSean Hefty <sean.hefty@intel.com>
Wed, 5 Dec 2012 20:53:38 +0000 (12:53 -0800)
committerSean Hefty <sean.hefty@intel.com>
Wed, 5 Dec 2012 20:53:38 +0000 (12:53 -0800)
src/rsocket.c

index 018af90a327e2bde724314982233f0120eeabb27..9996d3331f478a6eb26f490aa6e13a391726a2e8 100644 (file)
@@ -48,6 +48,7 @@
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
+#include <search.h>
 
 #include <rdma/rdma_cma.h>
 #include <rdma/rdma_verbs.h>
@@ -85,7 +86,7 @@ static int svc_size;
 static struct rsocket **svc_rss;
 static struct pollfd *svc_fds;
 static uint8_t svc_buf[RS_SNDLOWAT];
-static int rs_svc_run(void *arg);
+static void *rs_svc_run(void *arg);
 
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
@@ -241,7 +242,7 @@ struct ds_dest {
 };
 
 struct ds_qp {
-       dlist_t           list;
+       dlist_entry       list;
        struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
        struct ds_header  hdr;
@@ -362,15 +363,15 @@ struct ds_udp_header {
 static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
 {
        if (!rs->qp_list)
-               list_init(&qp->list);
+               dlist_init(&qp->list);
        else
-               list_insert_head(&qp->list, &rs->qp_list->list);
-       rs->qp_list = *qp;
+               dlist_insert_head(&qp->list, &rs->qp_list->list);
+       rs->qp_list = qp;
 }
 
 static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
 {
-       if (qp->list.next != qp->list) {
+       if (qp->list.next != &qp->list) {
                rs->qp_list = ds_next_qp(qp);
                dlist_remove(&qp->list);
        } else {
@@ -385,7 +386,7 @@ static int rs_add_to_svc(struct rsocket *rs)
 
        pthread_mutex_lock(&mut);
        if (!svc_cnt) {
-               ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock);
+               ret = socketpair(AF_INET, SOCK_STREAM, 0, svc_sock);
                if (ret)
                        goto err1;
 
@@ -402,7 +403,7 @@ static int rs_add_to_svc(struct rsocket *rs)
        write(svc_sock[0], &msg, sizeof msg);
        read(svc_sock[0], &msg, sizeof msg);
        ret = ERR(msg.status);
-       if (ret && !svn_cnt)
+       if (ret && !svc_cnt)
                goto err3;
 
        pthread_mutex_unlock(&mut);
@@ -430,7 +431,7 @@ static int rs_remove_from_svc(struct rsocket *rs)
        write(svc_sock[0], &msg, sizeof msg);
        read(svc_sock[0], &msg, sizeof msg);
        ret = ERR(msg.status);
-       if (!svn_cnt) {
+       if (!svc_cnt) {
                pthread_join(svc_id, NULL);
                close(svc_sock[0]);
                close(svc_sock[1]);
@@ -440,6 +441,19 @@ static int rs_remove_from_svc(struct rsocket *rs)
        return ret;
 }
 
+static int ds_compare_addr(const void *dst1, const void *dst2)
+{
+       const struct sockaddr *sa1, *sa2;
+       size_t len;
+
+       sa1 = (const struct sockaddr *) dst1;
+       sa2 = (const struct sockaddr *) dst2;
+
+       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
+       return memcmp(dst1, dst2, len);
+}
+
 static int rs_value_to_scale(int value, int bits)
 {
        return value <= (1 << (bits - 1)) ?
@@ -515,7 +529,7 @@ out:
        pthread_mutex_unlock(&mut);
 }
 
-static int rs_insert(struct rsocket *rs, index)
+static int rs_insert(struct rsocket *rs, int index)
 {
        pthread_mutex_lock(&mut);
        rs->index = idm_set(&idm, index, rs);
@@ -644,11 +658,11 @@ static int rs_init_bufs(struct rsocket *rs)
 
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
        if (!rs->rmsg)
-               return ERR(ENOEMEM);
+               return ERR(ENOMEM);
 
        rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
        if (!rs->sbuf)
-               return ERR(ENOEMEM);
+               return ERR(ENOMEM);
 
        rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
        if (!rs->smr)
@@ -658,7 +672,7 @@ static int rs_init_bufs(struct rsocket *rs)
              sizeof(*rs->target_iomap) * rs->target_iomap_size;
        rs->target_buffer_list = malloc(len);
        if (!rs->target_buffer_list)
-               return ERR(ENOEMEM);
+               return ERR(ENOMEM);
 
        rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
@@ -671,7 +685,7 @@ static int rs_init_bufs(struct rsocket *rs)
 
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
-               return ERR(ENOEMEM);
+               return ERR(ENOMEM);
 
        rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
        if (!rs->rmr)
@@ -753,9 +767,9 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
 
        sge.addr = (uintptr_t) buf;
        sge.length = RS_SNDLOWAT;
-       sge.lkey = qp->rmr;
+       sge.lkey = qp->rmr->lkey;
 
-       wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
+       wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf));
        wr.next = NULL;
        wr.sg_list = &sge;
        wr.num_sge = 1;
@@ -843,7 +857,7 @@ static void ds_free_qp(struct ds_qp *qp)
 
        if (qp->cm_id) {
                if (qp->cm_id->qp) {
-                       tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest);
+                       tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr);
                        epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
                                  qp->cm_id->recv_cq_channel->fd, NULL);
                        rdma_destroy_qp(qp->cm_id);
@@ -856,8 +870,6 @@ static void ds_free_qp(struct ds_qp *qp)
 
 static void ds_free(struct rsocket *rs)
 {
-       struct ds_qp *qp;
-
        if (rs->state & (rs_readable | rs_writable))
                rs_remove_from_svc(rs);
 
@@ -870,8 +882,8 @@ static void ds_free(struct rsocket *rs)
        if (rs->dmsg)
                free(rs->dmsg);
 
-       if (rs->smsg)
-               free(rs->smsg);
+       if (rs->smsg_free)
+               free(rs->smsg_free);
 
        while (rs->qp_list) {
                ds_remove_qp(rs, rs->qp_list);
@@ -1251,7 +1263,7 @@ static int ds_init_ep(struct rsocket *rs)
 
        rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
        if (!rs->dmsg)
-               return ERR(ENOEMEM);
+               return ERR(ENOMEM);
 
        rs->sbuf_bytes_avail = rs->sbuf_size;
        rs->sqe_avail = rs->sq_size;
@@ -1273,27 +1285,14 @@ static int ds_init_ep(struct rsocket *rs)
        return 0;
 }
 
-static int ds_compare_addr(const void *dst1, const void *dst2)
-{
-       const struct sockaddr *sa1, *sa2;
-       size_t len;
-
-       sa1 = (const struct sockaddr *) dst1;
-       sa2 = (const struct sockaddr *) dst2;
-
-       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
-             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
-       return memcmp(dst1, dst2, len);
-}
-
 static int rs_any_addr(const union socket_addr *addr)
 {
        if (addr->sa.sa_family == AF_INET) {
-               return (addr->sin.sin_addr == INADDR_ANY ||
-                       addr->sin.sin_addr == INADDR_LOOPBACK);
+               return (addr->sin.sin_addr.s_addr == INADDR_ANY ||
+                       addr->sin.sin_addr.s_addr == INADDR_LOOPBACK);
        } else {
-               return (addr->sin6.sin6_addr == in6addr_any ||
-                       addr->sin6.sin6_addr == in6addr_loopback);
+               return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) ||
+                       !memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16));
        }
 }
 
@@ -1332,7 +1331,7 @@ static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
                hdr->version = 4;
                hdr->length = DS_IPV4_HDR_LEN;
                hdr->port = addr->sin.sin_port;
-               hdr->addr.ipv4 = addr->sin.sin_addr;
+               hdr->addr.ipv4 = addr->sin.sin_addr.s_addr;
        } else {
                hdr->version = 6;
                hdr->length = DS_IPV6_HDR_LEN;
@@ -1373,7 +1372,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
 {
        struct ibv_qp_init_attr qp_attr;
        struct epoll_event event;
-       int ret;
+       int i, ret;
 
        *qp = calloc(1, sizeof(struct ds_qp));
        if (!*qp)
@@ -1442,7 +1441,8 @@ static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
        if (rs->qp_list) {
                *qp = rs->qp_list;
                do {
-                       if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+                       if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id),
+                                            src_addr))
                                return 0;
 
                        *qp = ds_next_qp(*qp);
@@ -1475,11 +1475,11 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
        if (ret)
                goto out;
 
-       ret = ds_get_qp(rs, src_addr, src_len, &qp);
+       ret = ds_get_qp(rs, &src_addr, src_len, &qp);
        if (ret)
                goto out;
 
-       if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) {
+       if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) {
                *dest = calloc(1, sizeof(struct ds_dest));
                if (!*dest) {
                        ret = ERR(ENOMEM);
@@ -1488,7 +1488,7 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
 
                memcpy(&(*dest)->addr, addr, addrlen);
                (*dest)->qp = qp;
-               tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+               tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr);
        }
 out:
        fastlock_release(&rs->map_lock);
@@ -1563,7 +1563,7 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge,
        wr.sg_list = sge;
        wr.num_sge = 1;
        wr.opcode = IBV_WR_SEND;
-       wr.send_flags = (sge.length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
+       wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0;
        wr.wr.ud.ah = rs->conn_dest->ah;
        wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
        wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
@@ -2189,7 +2189,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        rs = idm_at(&idm, socket);
        if (rs->type == SOCK_DGRAM) {
                fastlock_acquire(&rs->slock);
-               ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen);
+               ret = ds_recvfrom(rs, buf, len, flags, NULL, 0);
                fastlock_release(&rs->slock);
                return ret;
        }
@@ -2252,6 +2252,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
 ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
                  struct sockaddr *src_addr, socklen_t *addrlen)
 {
+       struct rsocket *rs;
        int ret;
 
        rs = idm_at(&idm, socket);
@@ -2369,7 +2370,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
        struct ds_udp_header hdr;
        struct msghdr msg;
        struct iovec miov[8];
-       struct ds_qp *qp;
 
        if (iovcnt > 8)
                return ERR(ENOTSUP);
@@ -2384,7 +2384,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
                hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4;
        } else {
                hdr.length = DS_UDP_IPV6_HDR_LEN;
-               memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16);
+               memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16);
        }
 
        miov[0].iov_base = &hdr;
@@ -2393,11 +2393,11 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
                memcpy(&miov[1], iov, sizeof *iov * iovcnt);
 
        memset(&msg, 0, sizeof msg);
-       msg.msg_name = rs->conn_dest->addr;
+       msg.msg_name = &rs->conn_dest->addr;
        msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa);
        msg.msg_iov = miov;
        msg.msg_iovlen = iovcnt + 1;
-       return sendmsg(rs->udp_sock, msg, flags);
+       return sendmsg(rs->udp_sock, &msg, flags);
 }
 
 static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
@@ -2405,8 +2405,8 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
 {
        struct iovec iov;
        if (buf && len) {
-               iov.iov_base = buf;
-               iov_iov_len = len;
+               iov.iov_base = (void *) buf;
+               iov.iov_len = len;
                return ds_sendv_udp(rs, &iov, 1, flags, op);
        } else {
                return ds_sendv_udp(rs, NULL, 0, flags, op);
@@ -2418,7 +2418,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
        struct ds_smsg *msg;
        struct ibv_sge sge;
        uint64_t offset;
-       int flags, ret = 0;
+       int ret = 0;
 
        if (!rs->conn_dest->ah)
                return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
@@ -2433,7 +2433,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
        rs->smsg_free = msg->next;
        rs->sqe_avail--;
 
-       memcpy((void *) msg, rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
+       memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length);
        memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len);
        sge.addr = (uintptr_t) msg;
        sge.length = rs->conn_dest->qp->hdr.length + len;
@@ -2539,6 +2539,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
 {
        struct rsocket *rs;
+       int ret;
 
        rs = idm_at(&idm, socket);
        if (rs->type == SOCK_STREAM) {
@@ -2550,7 +2551,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
 
        fastlock_acquire(&rs->slock);
        if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-               ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+               ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest);
                if (ret)
                        goto out;
        }
@@ -3021,8 +3022,6 @@ int rshutdown(int socket, int how)
 
 static void ds_shutdown(struct rsocket *rs)
 {
-       int ret = 0;
-
        if (rs->fd_flags & O_NONBLOCK)
                rs_set_nonblocking(rs, 0);
 
@@ -3099,7 +3098,7 @@ int rsetsockopt(int socket, int level, int optname,
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
        if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-               ret = setsockopt(rs->udp_sock, optname, optval, optlen);
+               ret = setsockopt(rs->udp_sock, level, optname, optval, optlen);
                if (ret)
                        return ret;
        }
@@ -3643,13 +3642,13 @@ static uint8_t rs_svc_path_bits(struct ds_dest *dest)
        struct ibv_port_attr attr;
 
        if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr))
-               return (uint8_t) ((1 << attr.lmc) - 1));
+               return (uint8_t) ((1 << attr.lmc) - 1);
        return 0x7f;
 }
 
 static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn)
 {
-       struct socket_addr saddr;
+       union socket_addr saddr;
        struct rdma_cm_id *id;
        struct ibv_ah_attr attr;
        int ret;
@@ -3661,7 +3660,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
                fastlock_release(&rs->slock);
        }
 
-       ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps);
+       ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps);
        if  (ret)
                return;
 
@@ -3671,7 +3670,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
                saddr.sin.sin_port = 0;
        else
                saddr.sin6.sin6_port = 0;
-       ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000);
+       ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000);
        if (ret)
                goto out;
 
@@ -3684,7 +3683,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t
                attr.is_global = 1;
                attr.grh.dgid = id->route.path_rec->dgid;
                attr.grh.flow_label = id->route.path_rec->flow_label;
-               attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid);
+               attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid);
                attr.grh.hop_limit = id->route.path_rec->hop_limit;
                attr.grh.traffic_class = id->route.path_rec->traffic_class;
        }
@@ -3702,13 +3701,13 @@ out:
        rdma_destroy_id(id);
 }
 
-static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header,
+static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr,
                                union socket_addr *addr)
 {
        return (udp_hdr->tag == DS_UDP_TAG) &&
-               ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET &&
+               ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET &&
                  udp_hdr->length == DS_UDP_IPV4_HDR_LEN) ||
-                (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 &&
+                (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 &&
                  udp_hdr->length == DS_UDP_IPV6_HDR_LEN));
 }
 
@@ -3771,7 +3770,7 @@ static void rs_svc_process_rs(struct rsocket *rs)
        cur_dest = rs->conn_dest;
        if (udp_hdr->op == RS_OP_DATA) {
                rs->conn_dest = &dest->qp->dest;
-               rs_svc_forward(rs, buf, len, addr);
+               rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr);
        }
 
        rs->conn_dest = dest;
@@ -3780,7 +3779,7 @@ static void rs_svc_process_rs(struct rsocket *rs)
        fastlock_release(&rs->slock);
 }
 
-static int rs_svc_run(void *arg)
+static void *rs_svc_run(void *arg)
 {
        struct rs_svc_msg msg;
        int i, ret;
@@ -3788,8 +3787,8 @@ static int rs_svc_run(void *arg)
        ret = rs_svc_grow_sets();
        if (ret) {
                msg.status = ret;
-               write(svc_sock[1] &msg, sizeof msg);
-               return ret;
+               write(svc_sock[1], &msg, sizeof msg);
+               return (void *) (uintptr_t) ret;
        }
 
        svc_fds[0].fd = svc_sock[1];
@@ -3808,5 +3807,5 @@ static int rs_svc_run(void *arg)
                }
        } while (svc_cnt > 1);
 
-       return 0;
+       return NULL;
 }