From: Sean Hefty Date: Fri, 9 Nov 2012 18:26:38 +0000 (-0800) Subject: rsocket: Add datagram support X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=7029a9a237f690cfe1fd840d35a93647ca447ab5;p=~shefty%2Flibrdmacm.git rsocket: Add datagram support Signed-off-by: Sean Hefty --- diff --git a/docs/rsocket b/docs/rsocket index 1484f65b..03d49df7 100644 --- a/docs/rsocket +++ b/docs/rsocket @@ -1,7 +1,7 @@ -rsocket Protocol and Design Guide 9/10/2012 +rsocket Protocol and Design Guide 11/11/2012 -Overview --------- +Data Streaming (TCP) Overview +----------------------------- Rsockets is a protocol over RDMA that supports a socket-level API for applications. For details on the current state of the implementation, readers should refer to the rsocket man page. This @@ -189,3 +189,47 @@ registered remote data buffer. From host A's perspective, the transfer appears as a normal send/write operation, with the data stream redirected directly into the receiving application's buffer. + + + +Datagram Overview +----------------- +The rsocket API supports datagram sockets. Datagram support is handled through an +entirely different protocol and internal implementation. Unlike connected rsockets, +datagram rsockets are not necessarily bound to a network (IP) address. A datagram +socket may use any number of network (IP) addresses, including those which map to +different RDMA devices. As a result, a single datagram rsocket must support +using multiple RDMA devices and ports, and a datagram rsocket references a single +UDP socket, plus zero or more UD QPs. + +Rsockets uses headers inserted before user data sent over UDP sockets to resolve +remote UD QP numbers. When a user first attempts to send a datagram to a remote +address (IP and UDP port), rsockets will take the following steps. First, it +will allocate and store the destination address into a lookup table for future +use. 
Then it will resolve which local network address should be used when sending +to the specified destination. It will allocate a UD QP on the RDMA device +associated with the local address. Finally, rsockets will send the user's +datagram to the remote UDP socket, inserting a header before the datagram. +The header specifies the UD QP number associated with the local network address +(IP and UDP port) of the send. + +Under many circumstances, the rsocket UDP header also provides enough path +information for the receiver to send the reply using a UD QP. However, certain +fabric topologies may require contacting a subnet administrator. Whenever +rsockets lacks sufficient information to send data to a remote peer using +a UD QP, it will automatically fall back to using a standard UDP socket. This +helps minimize the latency for performing a given send, while lengthy address +and route resolution protocols complete. + +Currently, rsockets allocates a single UD QP, but infrastructure is provided +to support more. Future enhancements may add support for multiple UD QPs, each +associated with a single network (IP) address. Multiplexing different network +addresses over a single UD QP and using shared receive queues are other possible +enhancements. + +Because rsockets may use multiple UD QPs, buffer management can become an issue. +The total size of receive buffers is specified by the mem_default configuration +files, but may be overridden with rsetsockopt. The buffer is divided into +MTU sized messages and distributed among the allocated QPs. As new QPs are +created, the receive buffers may be redistributed by posting loopback sends +on a QP in order to free the receive buffers for reposting to a different QP. 
\ No newline at end of file diff --git a/src/cma.c b/src/cma.c index 91bf1084..2c6b0320 100755 --- a/src/cma.c +++ b/src/cma.c @@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id) int ucma_max_qpsize(struct rdma_cm_id *id) { struct cma_id_private *id_priv; + int i, max_size = 0; id_priv = container_of(id, struct cma_id_private, id); - return id_priv->cma_dev->max_qpsize; + if (id && id_priv->cma_dev) { + max_size = id_priv->cma_dev->max_qpsize; + } else { + for (i = 0; i < cma_dev_cnt; i++) { + if (!max_size || max_size > cma_dev_array[i].max_qpsize) + max_size = cma_dev_array[i].max_qpsize; + } + } + return max_size; } uint16_t ucma_get_port(struct sockaddr *addr) diff --git a/src/rsocket.c b/src/rsocket.c index 58fcb8e5..0695d12d 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -55,7 +55,7 @@ #define RS_OLAP_START_SIZE 2048 #define RS_MAX_TRANSFER 65536 -#define RS_SNDLOWAT 64 +#define RS_SNDLOWAT 2048 #define RS_QP_MAX_SIZE 0xFFFE #define RS_QP_CTRL_SIZE 4 #define RS_CONN_RETRIES 6 @@ -63,6 +63,23 @@ static struct index_map idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +enum { + RS_SVC_INSERT, + RS_SVC_REMOVE +}; + +struct rsocket; + +struct rs_svc_msg { + uint32_t op; + uint32_t status; + struct rsocket *rs; +}; + +static pthread_t svc_id; +static int svc_cnt; +static int svc_fds[2]; + static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; static uint16_t def_sqsize = 384; @@ -110,6 +127,12 @@ struct rs_msg { uint32_t data; }; +struct ds_qp; + +struct ds_msg { + struct ds_qp *qp; +}; + struct rs_sge { uint64_t addr; uint32_t key; @@ -159,9 +182,9 @@ enum rs_state { rs_connecting = rs_opening | 0x0040, rs_accepting = rs_opening | 0x0080, rs_connected = 0x0100, - rs_connect_wr = 0x0200, - rs_connect_rd = 0x0400, - rs_connect_rdwr = rs_connected | rs_connect_rd | rs_connect_wr, + rs_writable = 0x0200, + rs_readable = 0x0400, + rs_connect_rdwr = rs_connected | rs_readable | rs_writable, rs_connect_error = 0x0800, 
rs_disconnected = 0x1000, rs_error = 0x2000, @@ -169,13 +192,68 @@ enum rs_state { #define RS_OPT_SWAP_SGL 1 -struct rsocket { +union socket_addr { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; +}; + +struct ds_qp { + dlist_t list; + struct rsocket *rs; struct rdma_cm_id *cm_id; + + struct ibv_mr *rmr; + uint8_t *rbuf; + + struct ibv_mr *smr; + uint16_t lid; + uint8_t sl; +}; + +struct ds_dest { + union socket_addr addr; /* must be first */ + struct ds_qp *qp; + struct ibv_ah *ah; + uint32_t qpn; +}; + +struct rsocket { + int type; + int index; fastlock_t slock; fastlock_t rlock; fastlock_t cq_lock; fastlock_t cq_wait_lock; - fastlock_t iomap_lock; + fastlock_t map_lock; /* acquire slock first if needed */ + + union { + /* data stream */ + struct { + struct rdma_cm_id *cm_id; + + int remote_sge; + struct rs_sge remote_sgl; + struct rs_sge remote_iomap; + + struct ibv_mr *target_mr; + int target_sge; + int target_iomap_size; + void *target_buffer_list; + volatile struct rs_sge *target_sgl; + struct rs_iomap *target_iomap; + }; + /* datagram */ + struct { + dlist_t qp_list; + void *dest_map; + struct ds_dest *conn_dest; + struct pollfd *fds; + nfds_t nfds; + int dsock; + int sbytes_needed; + }; + }; int opts; long fd_flags; @@ -186,7 +264,7 @@ struct rsocket { int cq_armed; int retries; int err; - int index; + int ctrl_avail; int sqe_avail; int sbuf_bytes_avail; @@ -203,34 +281,93 @@ struct rsocket { int rbuf_offset; int rmsg_head; int rmsg_tail; - struct rs_msg *rmsg; - - int remote_sge; - struct rs_sge remote_sgl; - struct rs_sge remote_iomap; + union { + struct rs_msg *rmsg; + struct ds_msg *dmsg; + }; struct rs_iomap_mr *remote_iomappings; dlist_entry iomap_list; dlist_entry iomap_queue; int iomap_pending; - struct ibv_mr *target_mr; - int target_sge; - int target_iomap_size; - void *target_buffer_list; - volatile struct rs_sge *target_sgl; - struct rs_iomap *target_iomap; - uint32_t rbuf_size; - struct ibv_mr *rmr; + struct ibv_mr 
*rmr; uint8_t *rbuf; uint32_t sbuf_size; - struct ibv_mr *smr; + struct ibv_mr *smr; struct ibv_sge ssgl[2]; uint8_t *sbuf; }; +#define DS_UDP_TAG 0x5555555555555555ULL + +struct ds_udp_header { + uint64_t tag; + uint8_t version; + uint8_t sl; + uint16_t slid; + uint32_t qpn; /* upper 8-bits reserved */ +}; + + +static int rs_svc_run(void *arg) +{ + return 0; +} + +static int rs_svc_insert(struct rsocket *rs) +{ + struct rs_svc_msg msg; + int ret; + + pthread_mutex_lock(&mut); + if (!svc_cnt) { + ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds); + if (ret) + goto out; + + ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL); + if (ret) { + close(svc_fds[0]); + close(svc_fds[1]); + ret = ERR(ret); + goto out; + } + } + + msg.op = RS_SVC_INSERT; + msg.status = EINVAL; + msg.rs = rs; + svc_cnt++; + write(svc_fds[0], &msg, sizeof msg); + read(svc_fds[0], &msg, sizeof msg); + ret = ERR(msg.status); +out: + pthread_mutex_unlock(&mut); + return ret; +} + +static int rs_svc_remove(struct rsocket *rs) +{ + struct rs_svc_msg msg; + int ret; + + pthread_mutex_lock(&mut); + msg.op = RS_SVC_REMOVE; + msg.status = EINVAL; + msg.rs = rs; + write(svc_fds[0], &msg, sizeof msg); + read(svc_fds[0], &msg, sizeof msg); + ret = ERR(msg.status); + if (!ret && !--svn_cnt) + pthread_join(svc_id, NULL); + + pthread_mutex_unlock(&mut); + return ret; +} + static int rs_value_to_scale(int value, int bits) { return value <= (1 << (bits - 1)) ? 
@@ -306,10 +443,10 @@ out: pthread_mutex_unlock(&mut); } -static int rs_insert(struct rsocket *rs) +static int rs_insert(struct rsocket *rs, index) { pthread_mutex_lock(&mut); - rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs); + rs->index = idm_set(&idm, index, rs); pthread_mutex_unlock(&mut); return rs->index; } @@ -321,7 +458,7 @@ static void rs_remove(struct rsocket *rs) pthread_mutex_unlock(&mut); } -static struct rsocket *rs_alloc(struct rsocket *inherited_rs) +static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type) { struct rsocket *rs; @@ -329,7 +466,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) if (!rs) return NULL; + rs->type = type; rs->index = -1; + rs->dsock = -1; if (inherited_rs) { rs->sbuf_size = inherited_rs->sbuf_size; rs->rbuf_size = inherited_rs->rbuf_size; @@ -351,12 +490,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) fastlock_init(&rs->rlock); fastlock_init(&rs->cq_lock); fastlock_init(&rs->cq_wait_lock); - fastlock_init(&rs->iomap_lock); + fastlock_init(&rs->map_lock); dlist_init(&rs->iomap_list); dlist_init(&rs->iomap_queue); return rs; } +/* + * TODO: Support datagram rsockets + */ static int rs_set_nonblocking(struct rsocket *rs, long arg) { int ret = 0; @@ -439,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs) return 0; } -static int rs_create_cq(struct rsocket *rs) +static int ds_init_bufs(struct ds_qp *qp) { - rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs); - if (!rs->cm_id->recv_cq_channel) + qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf)); + if (!qp->rbuf) + return ERR(ENOMEM); + + qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size); + if (!qp->smr) + return -1; + + qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size); + if (!qp->rmr) return -1; - rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size, - rs->cm_id, rs->cm_id->recv_cq_channel, 0); - if (!rs->cm_id->recv_cq) + return 
0; +} + +static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) +{ + cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs); + if (!cm_id->recv_cq_channel) + return -1; + + cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, + cm_id, cm_id->recv_cq_channel, 0); + if (!cm_id->recv_cq) goto err1; if (rs->fd_flags & O_NONBLOCK) { @@ -455,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs) goto err2; } - rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel; - rs->cm_id->send_cq = rs->cm_id->recv_cq; + cm_id->send_cq_channel = cm_id->recv_cq_channel; + cm_id->send_cq = cm_id->recv_cq; return 0; err2: - ibv_destroy_cq(rs->cm_id->recv_cq); - rs->cm_id->recv_cq = NULL; + ibv_destroy_cq(cm_id->recv_cq); + cm_id->recv_cq = NULL; err1: - ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel); - rs->cm_id->recv_cq_channel = NULL; + ibv_destroy_comp_channel(cm_id->recv_cq_channel); + cm_id->recv_cq_channel = NULL; return -1; } -static inline int -rs_post_recv(struct rsocket *rs) +static inline int rs_post_recv(struct rsocket *rs) { struct ibv_recv_wr wr, *bad; @@ -481,6 +639,23 @@ rs_post_recv(struct rsocket *rs) return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); } +static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf) +{ + struct ibv_recv_wr wr, *bad; + struct ibv_sge sge; + + sge.addr = (uintptr_t) buf; + sge.length = RS_SNDLOWAT; + sge.lkey = qp->rmr; + + wr.wr_id = RS_RECV_WR_ID; + wr.next = NULL; + wr.sg_list = &sge; + wr.num_sge = 1; + + return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad)); +} + static int rs_create_ep(struct rsocket *rs) { struct ibv_qp_init_attr qp_attr; @@ -491,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs) if (ret) return ret; - ret = rs_create_cq(rs); + ret = rs_create_cq(rs, rs->cm_id); if (ret) return ret; @@ -548,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs) } } +static void ds_free_qp(struct ds_qp *qp) +{ + if (qp->smr) + 
rdma_dereg_mr(qp->smr); + + if (qp->rbuf) { + if (qp->rmr) + rdma_dereg_mr(qp->rmr); + free(qp->rbuf); + } + + if (qp->cm_id) { + if (qp->cm_id->qp) { + dlist_remove(&qp->list); + rdma_destroy_qp(qp->cm_id); + } + rdma_destroy_id(qp->cm_id); + } + free(qp); +} + +static void ds_free_qps(struct rsocket *rs) +{ + struct ds_qp *qp; + dlist_t *entry; + + while (!dlist_empty(&rs->qp_list)) { + qp = container_of(rs->qp_list.next, struct ds_qp, list); + ds_free_qp(qp); + } +} + +static void ds_free(struct rsocket *rs) +{ + if (rs->state & (rs_readable | rs_writable)) + rs_svc_remove(rs); + + if (rs->dsock >= 0) + close(rs->dsock); + + if (rs->index >= 0) + rs_remove(rs); + + ds_free_qps(rs); + if (rs->fds) + free(rs->fds); + + if (rs->sbuf) + free(rs->sbuf); + + fastlock_destroy(&rs->map_lock); + fastlock_destroy(&rs->cq_wait_lock); + fastlock_destroy(&rs->cq_lock); + fastlock_destroy(&rs->rlock); + fastlock_destroy(&rs->slock); + free(rs); +} + static void rs_free(struct rsocket *rs) { + if (rs->type == SOCK_DGRAM) { + ds_free(rs); + return; + } + if (rs->index >= 0) rs_remove(rs); @@ -581,7 +819,7 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - fastlock_destroy(&rs->iomap_lock); + fastlock_destroy(&rs->map_lock); fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); @@ -635,29 +873,56 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) rs->sseq_comp = ntohs(conn->credits); } +static int ds_init(struct rsocket *rs, int domain) +{ + rs->dsock = socket(domain, SOCK_DGRAM, 0); + if (rs->dsock < 0) + return rs->dsock; + + rs->fds = calloc(1, sizeof *fds); + if (!rs->fds) + return ERR(ENOMEM); + rs->nfds = 1; + dlist_init(&rs->qp_list); + + return 0; +} + int rsocket(int domain, int type, int protocol) { struct rsocket *rs; - int ret; + int index, ret; if ((domain != PF_INET && domain != PF_INET6) || - (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP)) + ((type != 
SOCK_STREAM) && (type != SOCK_DGRAM)) || + (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) || + (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP)) return ERR(ENOTSUP); rs_configure(); - rs = rs_alloc(NULL); + rs = rs_alloc(NULL, type); if (!rs) return ERR(ENOMEM); - ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); - if (ret) - goto err; + if (type == SOCK_STREAM) { + ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); + if (ret) + goto err; + + rs->cm_id->route.addr.src_addr.sa_family = domain; + index = rs->cm_id->channel->fd; + } else { + ret = ds_init(rs, domain); + if (ret) + goto err; + + index = rs->dsock; + } - ret = rs_insert(rs); + ret = rs_insert(rs, index); if (ret < 0) goto err; - rs->cm_id->route.addr.src_addr.sa_family = domain; return rs->index; err: @@ -671,9 +936,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen) int ret; rs = idm_at(&idm, socket); - ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); - if (!ret) - rs->state = rs_bound; + if (rs->type == SOCK_STREAM) { + ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); + if (!ret) + rs->state = rs_bound; + } else { + ret = bind(rs->dsock, addr, addrlen); + if (!ret) { + ret = rs_svc_insert(rs); + if (!ret) + rs->state = rs_readable | rs_writable; + } + } return ret; } @@ -709,7 +983,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) int ret; rs = idm_at(&idm, socket); - new_rs = rs_alloc(rs); + new_rs = rs_alloc(rs, rs->type); if (!new_rs) return ERR(ENOMEM); @@ -717,7 +991,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen) if (ret) goto err; - ret = rs_insert(new_rs); + ret = rs_insert(new_rs, rs->cm_id->channel->fd); if (ret < 0) goto err; @@ -854,13 +1128,212 @@ connected: return ret; } +static int ds_init_ep(struct rsocket *rs) +{ + int ret; + + rs_set_qp_size(rs); + if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) + rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; + else + 
rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; + + rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf)); + if (!rs->sbuf) + return ERR(ENOMEM); + + rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf; + rs->sbuf_bytes_avail = rs->sbuf_size; + rs->sqe_avail = rs->sq_size; + + ret = rs_svc_insert(rs); + if (ret) + return ret; + + rs->state = rs_readable | rs_writable; + return 0; +} + +static int ds_compare_addr(const void *dst1, const void *dst2) +{ + const struct sockaddr *sa1, *sa2; + size_t len; + + sa1 = (const struct sockaddr *) dst1; + sa2 = (const struct sockaddr *) dst2; + + len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr); + return memcmp(dst1, dst2, len); +} + +static int rs_any_addr(const union socket_addr *addr) +{ + if (addr->sa.sa_family == AF_INET) { + return (addr->sin.sin_addr == INADDR_ANY || + addr->sin.sin_addr == INADDR_LOOPBACK); + } else { + return (addr->sin6.sin6_addr == in6addr_any || + addr->sin6.sin6_addr == in6addr_loopback); + } +} + +static int ds_get_src_addr(struct rsocket *rs, + const struct sockaddr *dest_addr, socklen_t dest_len, + union socket_addr *src_addr, socklen_t *src_len) +{ + int sock, ret; + uint16_t port; + + *src_len = sizeof src_addr; + ret = getsockname(rs->dsock, &src_addr->sa, src_len); + if (ret || !rs_any_addr(src_addr)) + return ret; + + port = src_addr->sin.sin_port; + sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0); + if (sock < 0) + return sock; + + ret = connect(sock, dest_addr, dest_len); + if (ret) + goto out; + + *src_len = sizeof src_addr; + ret = getsockname(sock, &src_addr->sa, src_len); + src_addr->sin.sin_port = port; +out: + close(sock); + return ret; +} + +static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, + socklen_t addrlen, struct ds_qp **qp) +{ + struct ibv_qp_init_attr qp_attr; + int ret; + + *qp = calloc(1, sizeof(struct ds_qp)); + if (!*qp) + return ERR(ENOMEM); + + (*qp)->rs = rs; + ret = 
rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP); + if (ret) + goto err; + + ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa); + if (ret) + goto err; + + ret = ds_init_bufs(*qp); + if (ret) + goto err; + + ret = rs_create_cq(rs, (*qp)->cm_id); + if (ret) + goto err; + + memset(&qp_attr, 0, sizeof qp_attr); + qp_attr.qp_context = qp; + qp_attr.send_cq = rs->cm_id->send_cq; + qp_attr.recv_cq = rs->cm_id->recv_cq; + qp_attr.qp_type = IBV_QPT_UD; + qp_attr.sq_sig_all = 1; + qp_attr.cap.max_send_wr = rs->sq_size; + qp_attr.cap.max_recv_wr = rs->rq_size; + qp_attr.cap.max_send_sge = 2; + qp_attr.cap.max_recv_sge = 1; + qp_attr.cap.max_inline_data = rs->sq_inline; + + ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr); + if (ret) + return ret; + + for (i = 0; i < rs->rq_size; i++) { + ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT); + if (ret) + goto err; + } + list_insert_head(&(*qp)->list, &rs->qp_list); + return 0; +err: + ds_free_qp(*qp); + return ret; +} + +static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr, + socklen_t addrlen, struct ds_qp **qp) +{ + dlist_t *entry; + + for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) { + *qp = container_of(entry, struct ds_qp, list); + if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr) + return 0; + } + + return ds_create_qp(rs, src_addr, addrlen, qp); +} + +static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, + socklen_t addrlen, struct ds_dest **dest) +{ + union socket_addr src_addr; + socklen_t src_len; + struct ds_qp *qp; + int ret = 0; + + fastlock_acquire(&rs->map_lock); + dest = tfind(addr, &rs->dest_map, ds_compare_addr); + if (dest) + goto out; + + if (rs->state == rs_init) { + ret = ds_init_ep(rs); + if (ret) + goto out; + } + + ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); + if (ret) + goto out; + + ret = ds_get_qp(rs, src_addr, src_len, &qp); + if (ret) + goto out; + + *dest = calloc(1, sizeof(struct 
ds_dest)); + if (!*dest) { + ret = ERR(ENOMEM); + goto out; + } + + memcpy(&(*dest)->addr, addr, addrlen); + (*dest)->qp = qp; + tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr); +out: + fastlock_release(&rs->map_lock); + return ret; +} + int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) { struct rsocket *rs; + int ret; rs = idm_at(&idm, socket); - memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); - return rs_do_connect(rs); + if (rs->type == SOCK_STREAM) { + memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); + ret = rs_do_connect(rs); + } else { + fastlock_acquire(&rs->slock); + ret = connect(rs->dsock, addr, addrlen); + if (!ret) + ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); + fastlock_release(&rs->slock); + } + return ret; } static int rs_post_write_msg(struct rsocket *rs, @@ -902,6 +1375,25 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int ds_post_send(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint64_t wr_id, int flags) +{ + struct ibv_send_wr wr, *bad; + + wr.wr_id = wr_id; + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_SEND; + wr.send_flags = flags; + wr.wr.ud.ah = rs->conn_dest->ah; + wr.wr.ud.remote_qpn = rs->conn_dest->qpn; + wr.wr.ud.remote_qkey = RDMA_UDP_QKEY; + + return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); +} + /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. 
@@ -932,6 +1424,15 @@ static int rs_write_data(struct rsocket *rs, flags, addr, rkey); } +static int ds_send_data(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint32_t length, int flags) +{ + rs->sqe_avail--; + rs->sbuf_bytes_avail -= length; + return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags); +} + static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, struct ibv_sge *sgl, int nsge, uint32_t length, int flags) { @@ -1045,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs) rs->state = rs_disconnected; return 0; } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) { - rs->state &= ~rs_connect_rd; + rs->state &= ~rs_readable; } break; case RS_OP_WRITE: @@ -1218,9 +1719,14 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } +static int ds_can_send(struct rsocket *rs) +{ + return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed); +} + static int rs_conn_can_send(struct rsocket *rs) { - return rs_can_send(rs) || !(rs->state & rs_connect_wr); + return rs_can_send(rs) || !(rs->state & rs_writable); } static int rs_conn_can_send_ctrl(struct rsocket *rs) @@ -1235,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs) static int rs_conn_have_rdata(struct rsocket *rs) { - return rs_have_rdata(rs) || !(rs->state & rs_connect_rd); + return rs_have_rdata(rs) || !(rs->state & rs_readable); } static int rs_conn_all_sends_done(struct rsocket *rs) @@ -1338,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) rs->rbuf_bytes_avail += rsize; } - } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd)); + } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable)); fastlock_release(&rs->rlock); return ret ? 
ret : len - left; @@ -1390,14 +1896,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) struct rs_iomap iom; int ret; - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); while (!dlist_empty(&rs->iomap_queue)) { if (!rs_can_send(rs)) { ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -1446,10 +1952,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags) } rs->iomap_pending = !dlist_empty(&rs->iomap_queue); - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } +static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, + int iovcnt, int flags) +{ + struct ds_udp_header hdr; + struct msghdr msg; + struct iovec miov[4]; + struct ds_qp *qp; + + if (iovcnt > 4) + return ERR(ENOTSUP); + + qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL; + hdr.tag = htonll(DS_UDP_TAG); + hdr.version = 1; + if (qp) { + hdr.sl = qp->sl; + hdr.slid = htons(qp->lid); + hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF); + } else { + hdr.sl = 0; + hdr.slid = 0; + hdr.qpn = 0; + } + + miov[0].iov_base = &hdr; + miov[0].iov_len = sizeof hdr; + memcpy(&miov[1], iov, sizeof *iov * iovcnt); + + memset(&msg, 0, sizeof msg); + /* TODO: specify name if needed */ + msg.msg_iov = miov; + msg.msg_iovlen = iovcnt + 1; + return sendmsg(rs->fd, msg, flags); +} + +static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct iovec iov; + iov.iov_base = buf; + iov_iov_len = len; + return ds_sendv_udp(s, &iov, 1, flags); +} + +static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) +{ + struct ibv_sge sge; + int ret = 0; + + if (!rs->conn_dest || !rs->conn_dest->ah) + return ds_send_udp(rs, buf, len, flags); + + rs->sbytes_needed = len; + if (!ds_can_send(rs)) { + ret = rs_get_comp(rs, 1, ds_can_send); + if (ret) + return 
ds_send_udp(rs, buf, len, flags); + } + + if (len <= rs->sq_inline) { + sge.addr = (uintptr_t) buf; + sge.length = len; + sge.lkey = 0; + ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE); + } else if (len <= rs_sbuf_left(rs)) { + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len); + rs->ssgl[0].length = len; + ret = ds_send_data(rs, rs->ssgl, 1, len, 0); + if (len < rs_sbuf_left(rs)) + rs->ssgl[0].addr += len; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, + rs->ssgl[0].length); + rs->ssgl[1].length = len - rs->ssgl[0].length; + memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); + ret = ds_send_data(rs, rs->ssgl, 2, len, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + + return ret ? ret : len; +} + /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. 
@@ -1463,6 +2053,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM) { + fastlock_acquire(&rs->slock); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; + } + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { @@ -1484,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -1537,10 +2134,21 @@ out: ssize_t rsendto(int socket, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) { - if (dest_addr || addrlen) - return ERR(EISCONN); + struct rsocket *rs; + + rs = idm_at(&idm, socket); + if (rs->type == SOCK_STREAM) { + if (dest_addr || addrlen) + return ERR(EISCONN); - return rsend(socket, buf, len, flags); + return rsend(socket, buf, len, flags); + } + + fastlock_acquire(&rs->slock); + ds_connect(rs, dest_addr, addrlen); + ret = dsend(rs, buf, len, flags); + fastlock_release(&rs->slock); + return ret; } static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len) @@ -1599,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -1652,7 +2260,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags) if (msg->msg_control && msg->msg_controllen) return ERR(ENOTSUP); - return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags); + return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags); } ssize_t rwrite(int socket, const void *buf, size_t count) @@ -1948,7 +2556,7 @@ int rshutdown(int socket, int how) rs = idm_at(&idm, socket); if (how == SHUT_RD) { - rs->state &= ~rs_connect_rd; + 
rs->state &= ~rs_readable; return 0; } @@ -1958,10 +2566,10 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) { if (how == SHUT_RDWR) { ctrl = RS_CTRL_DISCONNECT; - rs->state &= ~(rs_connect_rd | rs_connect_wr); + rs->state &= ~(rs_readable | rs_writable); } else { - rs->state &= ~rs_connect_wr; - ctrl = (rs->state & rs_connect_rd) ? + rs->state &= ~rs_writable; + ctrl = (rs->state & rs_readable) ? RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT; } if (!rs->ctrl_avail) { @@ -2017,8 +2625,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); + return 0; + } else { + return getpeername(rs->fs, addr, addrlen); + } } int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) @@ -2026,8 +2638,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen) struct rsocket *rs; rs = idm_at(&idm, socket); - rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); - return 0; + if (rs->type == SOCK_STREAM) { + rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); + return 0; + } else { + return getsockname(rs->fd, addr, addrlen); + } } int rsetsockopt(int socket, int level, int optname, @@ -2039,6 +2655,12 @@ int rsetsockopt(int socket, int level, int optname, ret = ERR(ENOTSUP); rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { + ret = setsockopt(rs->fd, optname, optval, optlen); + if (ret) + return ret; + } + switch (level) { case SOL_SOCKET: opts = &rs->so_opts; @@ -2156,6 +2778,9 @@ int rgetsockopt(int socket, int level, int optname, int ret = 0; rs = idm_at(&idm, socket); + if (rs->type == SOCK_DGRAM && level != SOL_RDMA) + return getsockopt(rs->fd, level, optname, optval, optlen); + switch (level) { case SOL_SOCKET: switch (optname) { @@ -2314,7 +2939,7 @@ 
off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) return ERR(EINVAL); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); if (prot & PROT_WRITE) { iomr = rs_get_iomap_mr(rs); access |= IBV_ACCESS_REMOTE_WRITE; @@ -2348,7 +2973,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse dlist_insert_tail(&iomr->entry, &rs->iomap_list); } out: - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return offset; } @@ -2360,7 +2985,7 @@ int riounmap(int socket, void *buf, size_t len) int ret = 0; rs = idm_at(&idm, socket); - fastlock_acquire(&rs->iomap_lock); + fastlock_acquire(&rs->map_lock); for (entry = rs->iomap_list.next; entry != &rs->iomap_list; entry = entry->next) { @@ -2381,7 +3006,7 @@ int riounmap(int socket, void *buf, size_t len) } ret = ERR(EINVAL); out: - fastlock_release(&rs->iomap_lock); + fastlock_release(&rs->map_lock); return ret; } @@ -2425,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla rs_conn_can_send); if (ret) break; - if (!(rs->state & rs_connect_wr)) { + if (!(rs->state & rs_writable)) { ret = ERR(ECONNRESET); break; } @@ -2475,3 +3100,24 @@ out: return (ret && left == count) ? ret : count - left; } + +ssize_t urecvfrom(int socket, void *buf, size_t len, int flags, + struct sockaddr *src_addr, socklen_t *addrlen) +{ + int ret; + + ret = rrecv(socket, buf, len, flags); + if (ret > 0 && src_addr) + rgetpeername(socket, src_addr, addrlen); + + return ret; +} + +ssize_t usendto(int socket, const void *buf, size_t len, int flags, + const struct sockaddr *dest_addr, socklen_t addrlen) +{ + if (dest_addr || addrlen) + return ERR(EISCONN); + + return usend(socket, buf, len, flags); +}