From: Sean Hefty Date: Fri, 9 Nov 2012 18:26:38 +0000 (-0800) Subject: rsocket: Add datagram support X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=158ea5a8c1a0fcf3ca61c642095023af16759c90;p=~shefty%2Flibrdmacm.git rsocket: Add datagram support Signed-off-by: Sean Hefty --- diff --git a/docs/rsocket b/docs/rsocket index 1484f65b..03d49df7 100644 --- a/docs/rsocket +++ b/docs/rsocket @@ -1,7 +1,7 @@ -rsocket Protocol and Design Guide 9/10/2012 +rsocket Protocol and Design Guide 11/11/2012 -Overview --------- +Data Streaming (TCP) Overview +----------------------------- Rsockets is a protocol over RDMA that supports a socket-level API for applications. For details on the current state of the implementation, readers should refer to the rsocket man page. This @@ -189,3 +189,47 @@ registered remote data buffer. From host A's perspective, the transfer appears as a normal send/write operation, with the data stream redirected directly into the receiving application's buffer. + + + +Datagram Overview +----------------- +The rsocket API supports datagram sockets. Datagram support is handled through an +entirely different protocol and internal implementation. Unlike connected rsockets, +datagram rsockets are not necessarily bound to a network (IP) address. A datagram +socket may use any number of network (IP) addresses, including those which map to +different RDMA devices. As a result, a single datagram rsocket must support +using multiple RDMA devices and ports, and a datagram rsocket references a single +UDP socket, plus zero or more UD QPs. + +Rsockets uses headers inserted before user data sent over UDP sockets to resolve +remote UD QP numbers. When a user first attempts to send a datagram to a remote +address (IP and UDP port), rsockets will take the following steps. First, it +will allocate and store the destination address into a lookup table for future +use. 
Then it will resolve which local network address should be used when sending +to the specified destination. It will allocate a UD QP on the RDMA device +associated with the local address. Finally, rsockets will send the user's +datagram to the remote UDP socket, but will insert a header before the datagram. +The header specifies the UD QP number associated with the local network address +(IP and UDP port) of the send. + +Under many circumstances, the rsocket UDP header also provides enough path +information for the receiver to send the reply using a UD QP. However, certain +fabric topologies may require contacting a subnet administrator. Whenever +rsockets lacks sufficient information to send data to a remote peer using +a UD QP, it will automatically fall back to using a standard UDP socket. This +helps minimize the latency for performing a given send, while lengthy address +and route resolution protocols complete. + +Currently, rsockets allocates a single UD QP, although infrastructure is provided +to support more. Future enhancements may add support for multiple UD QPs, each +associated with a single network (IP) address. Multiplexing different network +addresses over a single UD QP and using shared receive queues are other possible +enhancements. + +Because rsockets may use multiple UD QPs, buffer management can become an issue. +The total size of receive buffers is specified by the mem_default configuration +file, but may be overridden with rsetsockopt. The buffer is divided into +MTU sized messages and distributed among the allocated QPs. As new QPs are +created, the receive buffers may be redistributed by posting loopback sends +on a QP in order to free the receive buffers for reposting to a different QP. 
\ No newline at end of file diff --git a/src/rsocket.c b/src/rsocket.c index 58fcb8e5..99e638c8 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -110,6 +110,12 @@ struct rs_msg { uint32_t data; }; +struct ds_qp; + +struct ds_msg { + struct ds_qp *qp; +}; + struct rs_sge { uint64_t addr; uint32_t key; @@ -169,13 +175,63 @@ enum rs_state { #define RS_OPT_SWAP_SGL 1 -struct rsocket { +union socket_addr { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; +}; + +struct ds_qp { struct rdma_cm_id *cm_id; + int rbuf_cnt; + uint16_t lid; + uint8_t sl; +}; + +struct ds_dest { + union socket_addr addr; + struct ds_qp *qp; + struct ibv_ah *ah; + uint32_t qpn; + atomic_t refcnt; +}; + +struct rsocket { + int type; + int index; fastlock_t slock; fastlock_t rlock; fastlock_t cq_lock; fastlock_t cq_wait_lock; - fastlock_t iomap_lock; + fastlock_t map_lock; /* acquire slock first if needed */ + + union { + /* data stream */ + struct { + struct rdma_cm_id *cm_id; + + int remote_sge; + struct rs_sge remote_sgl; + struct rs_sge remote_iomap; + + struct ibv_mr *target_mr; + int target_sge; + int target_iomap_size; + void *target_buffer_list; + volatile struct rs_sge *target_sgl; + struct rs_iomap *target_iomap; + }; + /* datagram */ + struct { + struct ds_qp *qp_array; + void *dest_map; + struct ds_dest *conn_dest; + struct pollfd *fds; + nfds_t nfds; + int fd; + int sbytes_needed; + }; + }; int opts; long fd_flags; @@ -186,7 +242,7 @@ struct rsocket { int cq_armed; int retries; int err; - int index; + int ctrl_avail; int sqe_avail; int sbuf_bytes_avail; @@ -203,34 +259,37 @@ struct rsocket { int rbuf_offset; int rmsg_head; int rmsg_tail; - struct rs_msg *rmsg; - - int remote_sge; - struct rs_sge remote_sgl; - struct rs_sge remote_iomap; + union { + struct rs_msg *rmsg; + struct ds_msg *dmsg; + }; struct rs_iomap_mr *remote_iomappings; dlist_entry iomap_list; dlist_entry iomap_queue; int iomap_pending; - struct ibv_mr *target_mr; - int target_sge; - int 
target_iomap_size; - void *target_buffer_list; - volatile struct rs_sge *target_sgl; - struct rs_iomap *target_iomap; - uint32_t rbuf_size; - struct ibv_mr *rmr; + struct ibv_mr *rmr; uint8_t *rbuf; uint32_t sbuf_size; - struct ibv_mr *smr; + struct ibv_mr *smr; struct ibv_sge ssgl[2]; uint8_t *sbuf; }; +#define DS_UDP_TAG 0x5555555555555555ULL + +struct ds_udp_header { + uint64_t tag; + uint8_t version; + uint8_t sl; + uint16_t slid; + uint32_t qpn; /* upper 8-bits reserved */ +}; + + static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? @@ -306,10 +365,10 @@ out: pthread_mutex_unlock(&mut); } -static int rs_insert(struct rsocket *rs) +static int rs_insert(struct rsocket *rs, index) { pthread_mutex_lock(&mut); - rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs); + rs->index = idm_set(&idm, index, rs); pthread_mutex_unlock(&mut); return rs->index; } @@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs) pthread_mutex_unlock(&mut); } -static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) { struct rsocket *rs; @@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) if (!rs) return NULL; + rs->type = type; rs->index = -1; + rs->fd = -1; if (inherited_rs) { rs->sbuf_size = inherited_rs->sbuf_size; rs->rbuf_size = inherited_rs->rbuf_size; @@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) fastlock_init(&rs->rlock); fastlock_init(&rs->cq_lock); fastlock_init(&rs->cq_wait_lock); - fastlock_init(&rs->iomap_lock); + fastlock_init(&rs->map_lock); dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); return rs; @@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - fastlock_destroy(&rs->iomap_lock); + if (rs->fd >= 0) + close(rs->fd); + if (rs->fds) + free(rs->fds); + + fastlock_destroy(&rs->map_lock); fastlock_destroy(&rs->cq_wait_lock); 
fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); @@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) rs->sseq_comp = ntohs(conn->credits); } +static int ds_init(struct rsocket *rs, int domain) +{ + rs->fd = socket(domain, SOCK_DGRAM, 0); + if (rs->fd < 0) + return rs->fd; + + rs->fds = calloc(1, sizeof *fds); + if (!rs->fds) + return ERR(ENOMEM); + rs->nfds = 1; + + return 0; +} + int rsocket(int domain, int type, int protocol) { struct rsocket *rs; - int ret; + int index, ret; if ((domain != PF_INET && domain != PF_INET6) || - (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP)) + ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) || + (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) || + (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP)) return ERR(ENOTSUP); rs_configure(); - rs = rs_alloc(NULL); + rs = rs_alloc(NULL, type); if (!rs) return ERR(ENOMEM); - ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); - if (ret) - goto err; + if (type == SOCK_STREAM) { + ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); + if (ret) + goto err; + + rs->cm_id->route.addr.src_addr.sa_family = domain; + index = rs->cm_id->channel->fd; + } else { + ret = ds_init(rs, domain); + if (ret) + goto err; - ret = rs_insert(rs); + index = rs->fd; + } + + ret = rs_insert(rs, index); if (ret < 0) goto err; - rs->cm_id->route.addr.src_addr.sa_family = domain; return rs->index; err: @@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) int ret; rs = idm_at(&idm, socket); - ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); - if (!ret) - rs->state = rs_bound; + if (rs->type == SOCK_STREAM) { + ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); + if (!ret) + rs->state = rs_bound; + } else { + ret = bind(rs->fd, addr, addrlen); + } return ret; } @@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) int ret; rs = 
idm_at(&idm, socket); - new_rs = rs_alloc(rs); + new_rs = rs_alloc(rs, rs->type); if (!new_rs) return ERR(ENOMEM); @@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) if (ret) goto err; - ret = rs_insert(new_rs); + ret = rs_insert(new_rs, rs->cm_id->channel->fd); if (ret < 0) goto err; @@ -854,6 +950,66 @@ connected: return ret; } +static int ds_compare_dest(const void *dst1, const void *dst2) +{ + const struct sockaddr *sa1, *sa2; + size_t len; + + sa1 = (const struct sockaddr *) dst1; + sa2 = (const struct sockaddr *) dst2; + + len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); + return memcmp(dst1, dst2, len); +} + +/* Caller must hold map_lock around accessing source address */ +static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr, + socklen_t dst_len, socklen_t *src_len) +{ + static union socket_addr src_addr; + int sock, ret; + + sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0); + if (sock < 0) + return NULL; + + ret = connect(sock, &dst_addr->sa, dst_len); + if (ret) + goto out; + + *src_len = sizeof src_addr; + ret = getsockname(sock, &src_addr.sa, src_len); + +out: + close(sock); + return ret ? 
NULL : &src_addr; +} + +static struct ds_qp *ds_get_qp(struct rsocket *rs, + const struct sockaddr *src_addr, socklen_t addrlen) +{ + union socket_addr *addr; + socklen_t len; + +} + +static int ds_connect(struct rsocket *rs, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + struct ds_dest **dest; + + fastlock_acquire(&rs->map_lock); + dest = tfind(dest_addr, rs->dest_map, ds_compare_dest); + if (dest) { + rs->conn_dest = *dest; + goto out; + } +out: + fastlock_release(&rs->map_lock); + return 0; +} + int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) { struct rsocket *rs; @@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int ds_post_send(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint64_t wr_id, int flags) +{ + struct ibv_send_wr wr, *bad; + + wr.wr_id = wr_id; + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_SEND; + wr.send_flags = flags; + wr.wr.ud.ah = rs->conn_dest->ah; + wr.wr.ud.remote_qpn = rs->conn_dest->qpn; + wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; + + return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); +} + /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. 
@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs, flags, addr, rkey); } +static int ds_send_data(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint32_t length, int flags) +{ + rs->sqe_avail--; + rs->sbuf_bytes_avail -= length; + return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags); +} + static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, struct ibv_sge *sgl, int nsge, uint32_t length, int flags) { @@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } +static int ds_can_send(struct rsocket *rs) +{ + return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed); +} + static int rs_conn_can_send(struct rsocket *rs) { return rs_can_send(rs) || !(rs->state & rs_connect_wr); @@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) struct rs_iomap iom; int ret; - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); while (!dlist_empty(&rs->iomap_queue)) { if (!rs_can_send(rs)) { ret = rs_get_comp(rs, rs_nonblocking(rs, flags), @@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) } rs->iomap_pending = !dlist_empty(&rs->iomap_queue); - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } +static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, + int iovcnt, int flags) +{ + struct ds_udp_header hdr; + struct msghdr msg; + struct iovec miov[4]; + struct ds_qp *qp; + + if (iovcnt > 4) + return ERR(ENOTSUP); + + qp = (rs->conn_dest) ? 
rs->conn_dest->qp : NULL; + hdr.tag = htonll(DS_UDP_TAG); + hdr.version = 1; + if (qp) { + hdr.sl = qp->sl; + hdr.slid = htons(qp->lid); + hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF); + } else { + hdr.sl = 0; + hdr.slid = 0; + hdr.qpn = 0; + } + + miov[0].iov_base = &hdr; + miov[0].iov_len = sizeof hdr; + memcpy(&miov[1], iov, sizeof *iov * iovcnt); + + memset(&msg, 0, sizeof msg); + /* TODO: specify name if needed */ + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; + return sendmsg(rs->fd, msg, flags); +} + +static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct iovec iov; + iov.iov_base = buf; + iov_iov_len = len; + return ds_sendv_udp(s, &iov, 1, flags); +} + +static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct ibv_sge sge; + int ret = 0; + + if (!rs->conn_dest || !rs->conn_dest->ah) + return ds_send_udp(rs, buf, len, flags); + + rs->sbytes_needed = len; + if (!ds_can_send(rs)) { + ret = rs_get_comp(rs, 1, ds_can_send); + if (ret) + return ds_send_udp(rs, buf, len, flags); + } + + if (len <= rs->sq_inline) { + sge.addr = (uintptr_t) buf; + sge.length = len; + sge.lkey = 0; + ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE); + } else if (len <= rs_sbuf_left(rs)) { + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len); + rs->ssgl[0].length = len; + ret = ds_send_data(rs, rs->ssgl, 1, len, 0); + if (len < rs_sbuf_left(rs)) + rs->ssgl[0].addr += len; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, + rs->ssgl[0].length); + rs->ssgl[1].length = len - rs->ssgl[0].length; + memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); + ret = ds_send_data(rs, rs->ssgl, 2, len, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + + return ret ? 
ret : len; +} + /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. @@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM) { + fastlock_acquire(&rs->slock); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; + } + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { @@ -1537,10 +1817,21 @@ out: ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { - if (dest_addr || addrlen) - return ERR(EISCONN); + struct rsocket *rs; + + rs = idm_at(&idm, socket); + if (rs->type == SOCK_STREAM) { + if (dest_addr || addrlen) + return ERR(EISCONN); + + return rsend(socket, buf, len, flags); + } - return rsend(socket, buf, len, flags); + fastlock_acquire(&rs->slock); + ds_connect(rs, dest_addr, addrlen); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; } static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) @@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) if (msg->msg_control && msg->msg_controllen) return ERR(ENOTSUP); - return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); + return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags); } ssize_t rwrite(int socket, const void *buf, size_t count) @@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); + return 0; + } else { + return getpeername(rs->fs, addr, addrlen); + } } int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) @@ -2026,8 
+2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); + return 0; + } else { + return getsockname(rs->fd, addr, addrlen); + } } int rsetsockopt(int socket, int level, int optname, @@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { + ret = setsockopt(rs->fd, optname, optval, optlen); + if (ret) + return ret; + } + switch (level) { case SOL_SOCKET: opts = &rs->so_opts; @@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname, int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) + return getsockopt(rs->fd, level, optname, optval, optlen); + switch (level) { case SOL_SOCKET: switch (optname) { @@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) return ERR(EINVAL); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); if (prot & PROT_WRITE) { iomr = rs_get_iomap_mr(rs); access |= IBV_ACCESS_REMOTE_WRITE; @@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse dlist_insert_tail(&iomr->entry, &rs->iomap_list); } out: - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return offset; } @@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len) int ret = 0; rs = idm_at(&idm, socket); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); for (entry = rs->iomap_list.next; entry != &rs->iomap_list; entry = entry->next) { @@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len) } ret = ERR(EINVAL); out: - 
fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } @@ -2475,3 +2783,24 @@ out: return (ret && left == count) ? ret : count - left; } + +ssize_t urecvfrom(int socket, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + int ret; + + ret = rrecv(socket, buf, len, flags); + if (ret > 0 && src_addr) + rgetpeername(socket, src_addr, addrlen); + + return ret; +} + +ssize_t usendto(int socket, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + if (dest_addr || addrlen) + return ERR(EISCONN); + + return usend(socket, buf, len, flags); +}