From b44050733707b62dcb809f582c7cc27049154b4f Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 5 Dec 2012 12:53:38 -0800 Subject: [PATCH] Refresh of dsocket --- src/rsocket.c | 143 +++++++++++++++++++++++++------------------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/src/rsocket.c b/src/rsocket.c index 018af90a..9996d333 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include @@ -85,7 +86,7 @@ static int svc_size; static struct rsocket **svc_rss; static struct pollfd *svc_fds; static uint8_t svc_buf[RS_SNDLOWAT]; -static int rs_svc_run(void *arg); +static void *rs_svc_run(void *arg); static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; @@ -241,7 +242,7 @@ struct ds_dest { }; struct ds_qp { - dlist_t list; + dlist_entry list; struct rsocket *rs; struct rdma_cm_id *cm_id; struct ds_header hdr; @@ -362,15 +363,15 @@ struct ds_udp_header { static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) { if (!rs->qp_list) - list_init(&qp->list); + dlist_init(&qp->list); else - list_insert_head(&qp->list, &rs->qp_list->list); - rs->qp_list = *qp; + dlist_insert_head(&qp->list, &rs->qp_list->list); + rs->qp_list = qp; } static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp) { - if (qp->list.next != qp->list) { + if (qp->list.next != &qp->list) { rs->qp_list = ds_next_qp(qp); dlist_remove(&qp->list); } else { @@ -385,7 +386,7 @@ static int rs_add_to_svc(struct rsocket *rs) pthread_mutex_lock(&mut); if (!svc_cnt) { - ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_sock); + ret = socketpair(AF_INET, SOCK_STREAM, 0, svc_sock); if (ret) goto err1; @@ -402,7 +403,7 @@ static int rs_add_to_svc(struct rsocket *rs) write(svc_sock[0], &msg, sizeof msg); read(svc_sock[0], &msg, sizeof msg); ret = ERR(msg.status); - if (ret && !svn_cnt) + if (ret && !svc_cnt) goto err3; pthread_mutex_unlock(&mut); @@ -430,7 +431,7 @@ static int rs_remove_from_svc(struct rsocket *rs) write(svc_sock[0], &msg, sizeof msg); read(svc_sock[0], &msg, sizeof msg); ret = ERR(msg.status); - if (!svn_cnt) { + if (!svc_cnt) { pthread_join(svc_id, NULL); close(svc_sock[0]); close(svc_sock[1]); @@ -440,6 +441,19 @@ static int rs_remove_from_svc(struct rsocket *rs) return ret; } +static int ds_compare_addr(const void *dst1, const void *dst2) +{ + const struct sockaddr *sa1, *sa2; + size_t len; + + sa1 = (const struct sockaddr *) dst1; + sa2 = (const struct sockaddr *) dst2; + + len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); + return memcmp(dst1, dst2, len); +} + static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? @@ -515,7 +529,7 @@ out: pthread_mutex_unlock(&mut); } -static int rs_insert(struct rsocket *rs, index) +static int rs_insert(struct rsocket *rs, int index) { pthread_mutex_lock(&mut); rs->index = idm_set(&idm, index, rs); @@ -644,11 +658,11 @@ static int rs_init_bufs(struct rsocket *rs) rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg)); if (!rs->rmsg) - return ERR(ENOEMEM); + return ERR(ENOMEM); rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf)); if (!rs->sbuf) - return ERR(ENOEMEM); + return ERR(ENOMEM); rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size); if (!rs->smr) @@ -658,7 +672,7 @@ static int rs_init_bufs(struct rsocket *rs) sizeof(*rs->target_iomap) * rs->target_iomap_size; rs->target_buffer_list = malloc(len); if (!rs->target_buffer_list) - return ERR(ENOEMEM); + return ERR(ENOMEM); rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len); if (!rs->target_mr) @@ -671,7 +685,7 @@ static int rs_init_bufs(struct rsocket *rs) rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf)); if (!rs->rbuf) - return ERR(ENOEMEM); + return ERR(ENOMEM); rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size); if (!rs->rmr) @@ -753,9 +767,9 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) sge.addr = (uintptr_t) buf; sge.length = RS_SNDLOWAT; - sge.lkey = qp->rmr; + sge.lkey = qp->rmr->lkey; - wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf)); + wr.wr_id = ds_recv_wr_id((uint32_t) ((uint8_t *) buf - rs->rbuf)); wr.next = NULL; wr.sg_list = &sge; wr.num_sge = 1; @@ -843,7 +857,7 @@ static void ds_free_qp(struct ds_qp *qp) if (qp->cm_id) { if (qp->cm_id->qp) { - tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_dest); + tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, qp->cm_id->recv_cq_channel->fd, NULL); rdma_destroy_qp(qp->cm_id); @@ -856,8 +870,6 @@ static void ds_free_qp(struct ds_qp *qp) static void ds_free(struct rsocket *rs) { - struct ds_qp *qp; - if (rs->state & (rs_readable | rs_writable)) rs_remove_from_svc(rs); @@ -870,8 +882,8 @@ static void ds_free(struct rsocket *rs) if (rs->dmsg) free(rs->dmsg); - if (rs->smsg) - free(rs->smsg); + if (rs->smsg_free) + free(rs->smsg_free); while (rs->qp_list) { ds_remove_qp(rs, rs->qp_list); @@ -1251,7 +1263,7 @@ static int ds_init_ep(struct rsocket *rs) rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); if (!rs->dmsg) - return ERR(ENOEMEM); + return ERR(ENOMEM); rs->sbuf_bytes_avail = rs->sbuf_size; rs->sqe_avail = rs->sq_size; @@ -1273,27 +1285,14 @@ static int ds_init_ep(struct rsocket *rs) return 0; } -static int ds_compare_addr(const void *dst1, const void *dst2) -{ - const struct sockaddr *sa1, *sa2; - size_t len; - - sa1 = (const struct sockaddr *) dst1; - sa2 = (const struct sockaddr *) dst2; - - len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? - sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); - return memcmp(dst1, dst2, len); -} - static int rs_any_addr(const union socket_addr *addr) { if (addr->sa.sa_family == AF_INET) { - return (addr->sin.sin_addr == INADDR_ANY || - addr->sin.sin_addr == INADDR_LOOPBACK); + return (addr->sin.sin_addr.s_addr == INADDR_ANY || + addr->sin.sin_addr.s_addr == INADDR_LOOPBACK); } else { - return (addr->sin6.sin6_addr == in6addr_any || - addr->sin6.sin6_addr == in6addr_loopback); + return (!memcmp(&addr->sin6.sin6_addr, &in6addr_any, 16) || + !memcmp(&addr->sin6.sin6_addr, &in6addr_loopback, 16)); } } @@ -1332,7 +1331,7 @@ static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr) hdr->version = 4; hdr->length = DS_IPV4_HDR_LEN; hdr->port = addr->sin.sin_port; - hdr->addr.ipv4 = addr->sin.sin_addr; + hdr->addr.ipv4 = addr->sin.sin_addr.s_addr; } else { hdr->version = 6; hdr->length = DS_IPV6_HDR_LEN; @@ -1373,7 +1372,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, { struct ibv_qp_init_attr qp_attr; struct epoll_event event; - int ret; + int i, ret; *qp = calloc(1, sizeof(struct ds_qp)); if (!*qp) @@ -1442,7 +1441,8 @@ static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr, if (rs->qp_list) { *qp = rs->qp_list; do { - if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr) + if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id), + src_addr)) return 0; *qp = ds_next_qp(*qp); @@ -1475,11 +1475,11 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, if (ret) goto out; - ret = ds_get_qp(rs, src_addr, src_len, &qp); + ret = ds_get_qp(rs, &src_addr, src_len, &qp); if (ret) goto out; - if ((addrlen != src_len) || memcmp(addr, src_addr, addrlen)) { + if ((addrlen != src_len) || memcmp(addr, &src_addr, addrlen)) { *dest = calloc(1, sizeof(struct ds_dest)); if (!*dest) { ret = ERR(ENOMEM); @@ -1488,7 +1488,7 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, memcpy(&(*dest)->addr, addr, addrlen); (*dest)->qp = qp; - tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr); + tsearch(&(*dest)->addr, &rs->dest_map, ds_compare_addr); } out: fastlock_release(&rs->map_lock); @@ -1563,7 +1563,7 @@ static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge, wr.sg_list = sge; wr.num_sge = 1; wr.opcode = IBV_WR_SEND; - wr.send_flags = (sge.length <= rs->sq_inline) ? IBV_SEND_INLINE : 0; + wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0; wr.wr.ud.ah = rs->conn_dest->ah; wr.wr.ud.remote_qpn = rs->conn_dest->qpn; wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; @@ -2189,7 +2189,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs = idm_at(&idm, socket); if (rs->type == SOCK_DGRAM) { fastlock_acquire(&rs->slock); - ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen); + ret = ds_recvfrom(rs, buf, len, flags, NULL, 0); fastlock_release(&rs->slock); return ret; } @@ -2252,6 +2252,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen) { + struct rsocket *rs; int ret; rs = idm_at(&idm, socket); @@ -2369,7 +2370,6 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, struct ds_udp_header hdr; struct msghdr msg; struct iovec miov[8]; - struct ds_qp *qp; if (iovcnt > 8) return ERR(ENOTSUP); @@ -2384,7 +2384,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4; } else { hdr.length = DS_UDP_IPV6_HDR_LEN; - memcpy(hdr.addr.ipv6, rs->conn_dest->qp->hdr.addr.ipv6, 16); + memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16); } miov[0].iov_base = &hdr; @@ -2393,11 +2393,11 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, memcpy(&miov[1], iov, sizeof *iov * iovcnt); memset(&msg, 0, sizeof msg); - msg.msg_name = rs->conn_dest->addr; + msg.msg_name = &rs->conn_dest->addr; msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); msg.msg_iov = miov; msg.msg_iovlen = iovcnt + 1; - return sendmsg(rs->udp_sock, msg, flags); + return sendmsg(rs->udp_sock, &msg, flags); } static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, @@ -2405,8 +2405,8 @@ static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, { struct iovec iov; if (buf && len) { - iov.iov_base = buf; - iov_iov_len = len; + iov.iov_base = (void *) buf; + iov.iov_len = len; return ds_sendv_udp(rs, &iov, 1, flags, op); } else { return ds_sendv_udp(rs, NULL, 0, flags, op); @@ -2418,7 +2418,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) struct ds_smsg *msg; struct ibv_sge sge; uint64_t offset; - int flags, ret = 0; + int ret = 0; if (!rs->conn_dest->ah) return ds_send_udp(rs, buf, len, flags, RS_OP_DATA); @@ -2433,7 +2433,7 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) rs->smsg_free = msg->next; rs->sqe_avail--; - memcpy((void *) msg, rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length); + memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length); memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len); sge.addr = (uintptr_t) msg; sge.length = rs->conn_dest->qp->hdr.length + len; @@ -2539,6 +2539,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { struct rsocket *rs; + int ret; rs = idm_at(&idm, socket); if (rs->type == SOCK_STREAM) { @@ -2550,7 +2551,7 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags, fastlock_acquire(&rs->slock); if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { - ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); if (ret) goto out; } @@ -3021,8 +3022,6 @@ int rshutdown(int socket, int how) static void ds_shutdown(struct rsocket *rs) { - int ret = 0; - if (rs->fd_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); @@ -3099,7 +3098,7 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { - ret = setsockopt(rs->udp_sock, optname, optval, optlen); + ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); if (ret) return ret; } @@ -3643,13 +3642,13 @@ static uint8_t rs_svc_path_bits(struct ds_dest *dest) struct ibv_port_attr attr; if (!ibv_query_port(dest->qp->cm_id->verbs, dest->qp->cm_id->port_num, &attr)) - return (uint8_t) ((1 << attr.lmc) - 1)); + return (uint8_t) ((1 << attr.lmc) - 1); return 0x7f; } static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn) { - struct socket_addr saddr; + union socket_addr saddr; struct rdma_cm_id *id; struct ibv_ah_attr attr; int ret; @@ -3661,7 +3660,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t fastlock_release(&rs->slock); } - ret = rdma_create_cm_id(NULL, &id, NULL, dest->qp->cm_id->ps); + ret = rdma_create_id(NULL, &id, NULL, dest->qp->cm_id->ps); if (ret) return; @@ -3671,7 +3670,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t saddr.sin.sin_port = 0; else saddr.sin6.sin6_port = 0; - ret = rdma_resolve_addr(id, &saddr, &dest->addr.sa, 2000); + ret = rdma_resolve_addr(id, &saddr.sa, &dest->addr.sa, 2000); if (ret) goto out; @@ -3684,7 +3683,7 @@ static void rs_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t attr.is_global = 1; attr.grh.dgid = id->route.path_rec->dgid; attr.grh.flow_label = id->route.path_rec->flow_label; - attr.grh.sgid_index = rs_svc_sgid_index(dest, id->route.path_rec->sgid); + attr.grh.sgid_index = rs_svc_sgid_index(dest, &id->route.path_rec->sgid); attr.grh.hop_limit = id->route.path_rec->hop_limit; attr.grh.traffic_class = id->route.path_rec->traffic_class; } @@ -3702,13 +3701,13 @@ out: rdma_destroy_id(id); } -static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_header, +static int rs_svc_valid_udp_hdr(struct ds_udp_header *udp_hdr, union socket_addr *addr) { return (udp_hdr->tag == DS_UDP_TAG) && - ((udp_hdr->version == 4 && addr.sa.sa_family == AF_INET && + ((udp_hdr->version == 4 && addr->sa.sa_family == AF_INET && udp_hdr->length == DS_UDP_IPV4_HDR_LEN) || - (udp_hdr->version == 6 && addr.sa.sa_family == AF_INET6 && + (udp_hdr->version == 6 && addr->sa.sa_family == AF_INET6 && udp_hdr->length == DS_UDP_IPV6_HDR_LEN)); } @@ -3771,7 +3770,7 @@ static void rs_svc_process_rs(struct rsocket *rs) cur_dest = rs->conn_dest; if (udp_hdr->op == RS_OP_DATA) { rs->conn_dest = &dest->qp->dest; - rs_svc_forward(rs, buf, len, addr); + rs_svc_forward(rs, svc_buf + udp_hdr->length, len, &addr); } rs->conn_dest = dest; @@ -3780,7 +3779,7 @@ static void rs_svc_process_rs(struct rsocket *rs) fastlock_release(&rs->slock); } -static int rs_svc_run(void *arg) +static void *rs_svc_run(void *arg) { struct rs_svc_msg msg; int i, ret; @@ -3788,8 +3787,8 @@ static int rs_svc_run(void *arg) ret = rs_svc_grow_sets(); if (ret) { msg.status = ret; - write(svc_sock[1] &msg, sizeof msg); - return ret; + write(svc_sock[1], &msg, sizeof msg); + return (void *) (uintptr_t) ret; } svc_fds[0].fd = svc_sock[1]; @@ -3808,5 +3807,5 @@ static int rs_svc_run(void *arg) } } while (svc_cnt > 1); - return 0; + return NULL; } -- 2.41.0