From 817eacc979df6c00af54c0c00eebb8b25f496dde Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Tue, 20 Nov 2012 23:43:13 -0800 Subject: [PATCH] Refresh of dsocket --- src/cma.c | 11 +- src/rsocket.c | 459 ++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 398 insertions(+), 72 deletions(-) diff --git a/src/cma.c b/src/cma.c index 91bf1084..2c6b0320 100755 --- a/src/cma.c +++ b/src/cma.c @@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id) int ucma_max_qpsize(struct rdma_cm_id *id) { struct cma_id_private *id_priv; + int i, max_size = 0; id_priv = container_of(id, struct cma_id_private, id); - return id_priv->cma_dev->max_qpsize; + if (id && id_priv->cma_dev) { + max_size = id_priv->cma_dev->max_qpsize; + } else { + for (i = 0; i < cma_dev_cnt; i++) { + if (!max_size || max_size > cma_dev_array[i].max_qpsize) + max_size = cma_dev_array[i].max_qpsize; + } + } + return max_size; } uint16_t ucma_get_port(struct sockaddr *addr) diff --git a/src/rsocket.c b/src/rsocket.c index 99e638c8..0695d12d 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -55,7 +55,7 @@ #define RS_OLAP_START_SIZE 2048 #define RS_MAX_TRANSFER 65536 -#define RS_SNDLOWAT 64 +#define RS_SNDLOWAT 2048 #define RS_QP_MAX_SIZE 0xFFFE #define RS_QP_CTRL_SIZE 4 #define RS_CONN_RETRIES 6 @@ -63,6 +63,23 @@ static struct index_map idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +enum { + RS_SVC_INSERT, + RS_SVC_REMOVE +}; + +struct rsocket; + +struct rs_svc_msg { + uint32_t op; + uint32_t status; + struct rsocket *rs; +}; + +static pthread_t svc_id; +static int svc_cnt; +static int svc_fds[2]; + static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; static uint16_t def_sqsize = 384; @@ -165,9 +182,9 @@ enum rs_state { rs_connecting = rs_opening | 0x0040, rs_accepting = rs_opening | 0x0080, rs_connected = 0x0100, - rs_connect_wr = 0x0200, - rs_connect_rd = 0x0400, - rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr, + rs_writable = 0x0200, + rs_readable = 0x0400, + rs_connect_rdwr = rs_connected | rs_readable | rs_writable, rs_connect_error = 0x0800, rs_disconnected = 0x1000, rs_error = 0x2000, @@ -182,18 +199,23 @@ union socket_addr { }; struct ds_qp { + dlist_t list; + struct rsocket *rs; struct rdma_cm_id *cm_id; - int rbuf_cnt; + + struct ibv_mr *rmr; + uint8_t *rbuf; + + struct ibv_mr *smr; uint16_t lid; uint8_t sl; }; struct ds_dest { - union socket_addr addr; + union socket_addr addr; /* must be first */ struct ds_qp *qp; struct ibv_ah *ah; uint32_t qpn; - atomic_t refcnt; }; struct rsocket { @@ -223,12 +245,12 @@ struct rsocket { }; /* datagram */ struct { - struct ds_qp *qp_array; + dlist_t qp_list; void *dest_map; struct ds_dest *conn_dest; struct pollfd *fds; nfds_t nfds; - int fd; + int dsock; int sbytes_needed; }; }; @@ -290,6 +312,62 @@ struct ds_udp_header { }; +static int rs_svc_run(void *arg) +{ + return 0; +} + +static int rs_svc_insert(struct rsocket *rs) +{ + struct rs_svc_msg msg; + int ret; + + pthread_mutex_lock(&mut); + if (!svc_cnt) { + ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds); + if (ret) + goto out; + + ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL); + if (ret) { + close(svc_fds[0]); + close(svc_fds[1]); + ret = ERR(ret); + goto out; + } + } + + msg.op = RS_SVC_INSERT; + msg.status = EINVAL; + msg.rs = rs; + svc_cnt++; + write(svc_fds[0], &msg, sizeof msg); + read(svc_fds[0], &msg, sizeof msg); + ret = ERR(msg.status); +out: + pthread_mutex_unlock(&mut); + return ret; +} + +static int rs_svc_remove(struct rsocket *rs) +{ + struct rs_svc_msg msg; + int ret; + + pthread_mutex_lock(&mut); + msg.op = RS_SVC_REMOVE; + msg.status = EINVAL; + msg.rs = rs; + write(svc_fds[0], &msg, sizeof msg); + read(svc_fds[0], &msg, sizeof msg); + ret = ERR(msg.status); + if (!ret && !--svn_cnt) + pthread_join(svc_id, NULL); + + pthread_mutex_unlock(&mut); + return ret; +} + static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? @@ -390,7 +468,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) rs->type = type; rs->index = -1; - rs->fd = -1; + rs->dsock = -1; if (inherited_rs) { rs->sbuf_size = inherited_rs->sbuf_size; rs->rbuf_size = inherited_rs->rbuf_size; @@ -418,6 +496,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) return rs; } +/* + * TODO: Support datagram rsockets + */ static int rs_set_nonblocking(struct rsocket *rs, long arg) { int ret = 0; @@ -500,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs) return 0; } -static int rs_create_cq(struct rsocket *rs) +static int ds_init_bufs(struct ds_qp *qp) { - rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs); - if (!rs->cm_id->recv_cq_channel) + qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); + if (!qp->rbuf) + return ERR(ENOMEM); + + qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size); + if (!qp->smr) + return -1; + + qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); + if (!qp->rmr) return -1; - rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size, - rs->cm_id, rs->cm_id->recv_cq_channel, 0); - if (!rs->cm_id->recv_cq) + return 0; +} + +static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) +{ + cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); + if (!cm_id->recv_cq_channel) + return -1; + + cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, + cm_id, cm_id->recv_cq_channel, 0); + if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { @@ -516,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs) goto err2; } - rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel; - rs->cm_id->send_cq = rs->cm_id->recv_cq; + cm_id->send_cq_channel = cm_id->recv_cq_channel; + cm_id->send_cq = cm_id->recv_cq; return 0; err2: - ibv_destroy_cq(rs->cm_id->recv_cq); - rs->cm_id->recv_cq = NULL; + ibv_destroy_cq(cm_id->recv_cq); + cm_id->recv_cq = NULL; err1: - ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel); - rs->cm_id->recv_cq_channel = NULL; + ibv_destroy_comp_channel(cm_id->recv_cq_channel); + cm_id->recv_cq_channel = NULL; return -1; } -static inline int -rs_post_recv(struct rsocket *rs) +static inline int rs_post_recv(struct rsocket *rs) { struct ibv_recv_wr wr, *bad; @@ -542,6 +639,23 @@ rs_post_recv(struct rsocket *rs) return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } +static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) +{ + struct ibv_recv_wr wr, *bad; + struct ibv_sge sge; + + sge.addr = (uintptr_t) buf; + sge.length = RS_SNDLOWAT; + sge.lkey = qp->rmr; + + wr.wr_id = RS_RECV_WR_ID; + wr.next = NULL; + wr.sg_list = &sge; + wr.num_sge = 1; + + return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad)); +} + static int rs_create_ep(struct rsocket *rs) { struct ibv_qp_init_attr qp_attr; @@ -552,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs) if (ret) return ret; - ret = rs_create_cq(rs); + ret = rs_create_cq(rs, rs->cm_id); if (ret) return ret; @@ -609,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs) } } +static void ds_free_qp(struct ds_qp *qp) +{ + if (qp->smr) + rdma_dereg_mr(qp->smr); + + if (qp->rbuf) { + if (qp->rmr) + rdma_dereg_mr(qp->rmr); + free(qp->rbuf); + } + + if (qp->cm_id) { + if (qp->cm_id->qp) { + dlist_remove(&qp->list); + rdma_destroy_qp(qp->cm_id); + } + rdma_destroy_id(qp->cm_id); + } + free(qp); +} + +static void ds_free_qps(struct rsocket *rs) +{ + struct ds_qp *qp; + dlist_t *entry; + + while (!dlist_empty(&rs->qp_list)) { + qp = container_of(rs->qp_list.next, struct ds_qp, list); + ds_free_qp(qp); + } +} + +static void ds_free(struct rsocket *rs) +{ + if (rs->state & (rs_readable | rs_writable)) + rs_svc_remove(rs); + + if (rs->dsock >= 0) + close(rs->dsock); + + if (rs->index >= 0) + rs_remove(rs); + + ds_free_qps(rs); + if (rs->fds) + free(rs->fds); + + if (rs->sbuf) + free(rs->sbuf); + + fastlock_destroy(&rs->map_lock); + fastlock_destroy(&rs->cq_wait_lock); + fastlock_destroy(&rs->cq_lock); + fastlock_destroy(&rs->rlock); + fastlock_destroy(&rs->slock); + free(rs); +} + static void rs_free(struct rsocket *rs) { + if (rs->type == SOCK_DGRAM) { + ds_free(rs); + return; + } + if (rs->index >= 0) rs_remove(rs); @@ -642,11 +819,6 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - if (rs->fd >= 0) - close(rs->fd); - if (rs->fds) - free(rs->fds); - fastlock_destroy(&rs->map_lock); fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); @@ -703,14 +875,15 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) static int ds_init(struct rsocket *rs, int domain) { - rs->fd = socket(domain, SOCK_DGRAM, 0); - if (rs->fd < 0) - return rs->fd; + rs->dsock = socket(domain, SOCK_DGRAM, 0); + if (rs->dsock < 0) + return rs->dsock; rs->fds = calloc(1, sizeof *fds); if (!rs->fds) return ERR(ENOMEM); rs->nfds = 1; + dlist_init(&rs->qp_list); return 0; } @@ -743,7 +916,7 @@ int rsocket(int domain, int type, int protocol) if (ret) goto err; - index = rs->fd; + index = rs->dsock; } ret = rs_insert(rs, index); @@ -768,7 +941,12 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) if (!ret) rs->state = rs_bound; } else { - ret = bind(rs->fd, addr, addrlen); + ret = bind(rs->dsock, addr, addrlen); + if (!ret) { + ret = rs_svc_insert(rs); + if (!ret) + rs->state = rs_readable | rs_writable; + } } return ret; } @@ -950,7 +1128,33 @@ connected: return ret; } -static int ds_compare_dest(const void *dst1, const void *dst2) +static int ds_init_ep(struct rsocket *rs) +{ + int ret; + + rs_set_qp_size(rs); + if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) + rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; + else + rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; + + rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf)); + if (!rs->sbuf) + return ERR(ENOMEM); + + rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf; + rs->sbuf_bytes_avail = rs->sbuf_size; + rs->sqe_avail = rs->sq_size; + + ret = rs_svc_insert(rs); + if (ret) + return ret; + + rs->state = rs_readable | rs_writable; + return 0; +} + +static int ds_compare_addr(const void *dst1, const void *dst2) { const struct sockaddr *sa1, *sa2; size_t len; @@ -963,60 +1167,173 @@ static int ds_compare_dest(const void *dst1, const void *dst2) return memcmp(dst1, dst2, len); } -/* Caller must hold map_lock around accessing source address */ -static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr, - socklen_t dst_len, socklen_t *src_len) +static int rs_any_addr(const union socket_addr *addr) +{ + if (addr->sa.sa_family == AF_INET) { + return (addr->sin.sin_addr == INADDR_ANY || + addr->sin.sin_addr == INADDR_LOOPBACK); + } else { + return (addr->sin6.sin6_addr == in6addr_any || + addr->sin6.sin6_addr == in6addr_loopback); + } +} + +static int ds_get_src_addr(struct rsocket *rs, + const struct sockaddr *dest_addr, socklen_t dest_len, + union socket_addr *src_addr, socklen_t *src_len) { - static union socket_addr src_addr; int sock, ret; + uint16_t port; - sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0); + *src_len = sizeof src_addr; + ret = getsockname(rs->dsock, &src_addr->sa, src_len); + if (ret || !rs_any_addr(src_addr)) + return ret; + + port = src_addr->sin.sin_port; + sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0); if (sock < 0) - return NULL; + return sock; - ret = connect(sock, &dst_addr->sa, dst_len); + ret = connect(sock, dest_addr, dest_len); if (ret) goto out; *src_len = sizeof src_addr; - ret = getsockname(sock, &src_addr.sa, src_len); - + ret = getsockname(sock, &src_addr->sa, src_len); + src_addr->sin.sin_port = port; out: close(sock); - return ret ? NULL : &src_addr; + return ret; +} + +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, + socklen_t addrlen, struct ds_qp **qp) +{ + struct ibv_qp_init_attr qp_attr; + int ret; + + *qp = calloc(1, sizeof(struct ds_qp)); + if (!*qp) + return ERR(ENOMEM); + + (*qp)->rs = rs; + ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP); + if (ret) + goto err; + + ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa); + if (ret) + goto err; + + ret = ds_init_bufs(*qp); + if (ret) + goto err; + + ret = rs_create_cq(rs, (*qp)->cm_id); + if (ret) + goto err; + + memset(&qp_attr, 0, sizeof qp_attr); + qp_attr.qp_context = qp; + qp_attr.send_cq = rs->cm_id->send_cq; + qp_attr.recv_cq = rs->cm_id->recv_cq; + qp_attr.qp_type = IBV_QPT_UD; + qp_attr.sq_sig_all = 1; + qp_attr.cap.max_send_wr = rs->sq_size; + qp_attr.cap.max_recv_wr = rs->rq_size; + qp_attr.cap.max_send_sge = 2; + qp_attr.cap.max_recv_sge = 1; + qp_attr.cap.max_inline_data = rs->sq_inline; + + ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); + if (ret) + return ret; + + for (i = 0; i < rs->rq_size; i++) { + ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); + if (ret) + goto err; + } + list_insert_head(&(*qp)->list, &rs->qp_list); + return 0; +err: + ds_free_qp(*qp); + return ret; } -static struct ds_qp *ds_get_qp(struct rsocket *rs, - const struct sockaddr *src_addr, socklen_t addrlen) +static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr, + socklen_t addrlen, struct ds_qp **qp) { - union socket_addr *addr; - socklen_t len; + dlist_t *entry; + for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) { + *qp = container_of(entry, struct ds_qp, list); + if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr) + return 0; + } + + return ds_create_qp(rs, src_addr, addrlen, qp); } -static int ds_connect(struct rsocket *rs, - const struct sockaddr *dest_addr, socklen_t addrlen) +static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, + socklen_t addrlen, struct ds_dest **dest) { - struct ds_dest **dest; + union socket_addr src_addr; + socklen_t src_len; + struct ds_qp *qp; + int ret = 0; fastlock_acquire(&rs->map_lock); - dest = tfind(dest_addr, rs->dest_map, ds_compare_dest); - if (dest) { - rs->conn_dest = *dest; + dest = tfind(addr, &rs->dest_map, ds_compare_addr); + if (dest) + goto out; + + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) + goto out; + } + + ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); + if (ret) + goto out; + + ret = ds_get_qp(rs, src_addr, src_len, &qp); + if (ret) + goto out; + + *dest = calloc(1, sizeof(struct ds_dest)); + if (!*dest) { + ret = ERR(ENOMEM); goto out; } + + memcpy(&(*dest)->addr, addr, addrlen); + (*dest)->qp = qp; + tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr); out: fastlock_release(&rs->map_lock); - return 0; + return ret; } int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) { struct rsocket *rs; + int ret; rs = idm_at(&idm, socket); - memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); - return rs_do_connect(rs); + if (rs->type == SOCK_STREAM) { + memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); + ret = rs_do_connect(rs); + } else { + fastlock_acquire(&rs->slock); + ret = connect(rs->dsock, addr, addrlen); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + fastlock_release(&rs->slock); + } + return ret; } static int rs_post_write_msg(struct rsocket *rs, @@ -1229,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs) rs->state = rs_disconnected; return 0; } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { - rs->state &= ~rs_connect_rd; + rs->state &= ~rs_readable; } break; case RS_OP_WRITE: @@ -1409,7 +1726,7 @@ static int ds_can_send(struct rsocket *rs) static int rs_conn_can_send(struct rsocket *rs) { - return rs_can_send(rs) || !(rs->state & rs_connect_wr); + return rs_can_send(rs) || !(rs->state & rs_writable); } static int rs_conn_can_send_ctrl(struct rsocket *rs) @@ -1424,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs) static int rs_conn_have_rdata(struct rsocket *rs) { - return rs_have_rdata(rs) || !(rs->state & rs_connect_rd); + return rs_have_rdata(rs) || !(rs->state & rs_readable); } static int rs_conn_all_sends_done(struct rsocket *rs) @@ -1527,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs->rbuf_bytes_avail += rsize; } - } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd)); + } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable)); fastlock_release(&rs->rlock); return ret ? ret : len - left; @@ -1586,7 +1903,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -1764,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -1890,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -2239,7 +2556,7 @@ int rshutdown(int socket, int how) rs = idm_at(&idm, socket); if (how == SHUT_RD) { - rs->state &= ~rs_connect_rd; + rs->state &= ~rs_readable; return 0; } @@ -2249,10 +2566,10 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) { if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; - rs->state &= ~(rs_connect_rd | rs_connect_wr); + rs->state &= ~(rs_readable | rs_writable); } else { - rs->state &= ~rs_connect_wr; - ctrl = (rs->state & rs_connect_rd) ? + rs->state &= ~rs_writable; + ctrl = (rs->state & rs_readable) ? RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; } if (!rs->ctrl_avail) { @@ -2733,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } -- 2.41.0