Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top: da0048097eea01b21df587e85b3f7ac44a2582c8
+Top: faabde8e748d27fc0f733e702ed7f3c3902d8f58
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
---
diff --git a/docs/rsocket b/docs/rsocket
-index 1484f65..03d49df 100644
+index 1484f65..a660208 100644
--- a/docs/rsocket
+++ b/docs/rsocket
@@ -1,7 +1,7 @@
Rsockets is a protocol over RDMA that supports a socket-level API
for applications. For details on the current state of the
implementation, readers should refer to the rsocket man page. This
-@@ -189,3 +189,47 @@ registered remote data buffer.
+@@ -189,3 +189,34 @@ registered remote data buffer.
From host A's perspective, the transfer appears as a normal send/write
operation, with the data stream redirected directly into the receiving
application's buffer.
+
+Datagram Overview
+-----------------
-+THe rsocket API supports datagram sockets. Datagram support is handled through an
++The rsocket API supports datagram sockets. Datagram support is handled through an
+entirely different protocol and internal implementation. Unlike connected rsockets,
+datagram rsockets are not necessarily bound to a network (IP) address. A datagram
+socket may use any number of network (IP) addresses, including those which map to
+
+Rsockets uses headers inserted before user data sent over UDP sockets to resolve
+remote UD QP numbers. When a user first attempts to send a datagram to a remote
-+address (IP and UDP port), rsockets will take the following steps. First, it
-+will allocate and store the destination address into a lookup table for future
-+use. Then it will resolve which local network address should be used when sending
-+to the specified destination. It will allocate a UD QP on the RDMA device
-+associated with the local address. Finally, rsockets will send the user's
-+datagram to the remote UDP socket, but inserting a header before the datagram.
-+The header specifies the UD QP number associated with the local network address
-+(IP and UDP port) of the send.
-+
-+Under many circumstances, the rsocket UDP header also provides enough path
-+information for the receiver to send the reply using a UD QP. However, certain
-+fabric topologies may require contacting a subnet administrator. Whenever
-+rsockets lacks sufficient information to send data to a remote peer using
-+a UD QP, it will automatically fall back to using a standard UDP socket. This
-+helps minimize the latency for performing a given send, while lenghty address
-+and route resolution protocols complete.
-+
-+Currently, rsockets allocates a single UD QP, with an infrastructure is provided
-+to support more. Future enhancements may add support for multiple UD QPs, each
-+associated with a single network (IP) address. Multiplexing different network
-+addresses over a single UD QP and using shared receive queues are other possible
-+enhancements.
-+
-+Because rsockets may use multiple UD QPs, buffer management can become an issue.
-+The total size of receive buffers is specified by the mem_default configuration
-+files, but may be overridden with rsetsockopt. The buffer is divided into
-+MTU sized messages and distributed among the allocated QPs. As new QPs are
-+created, the receive buffers may be redistributed by posting loopback sends
-+on a QP in order to free the receive buffers for reposting to a different QP.
++address (IP and UDP port), rsockets will take the following steps:
++
++1. Store the destination address into a lookup table.
++2. Resolve which local network address should be used when sending
++ to the specified destination.
++3. Allocate a UD QP on the RDMA device associated with the local address.
++4. Send the user's datagram to the remote UDP socket.
++
++A header is inserted before the user's datagram. The header specifies the
++UD QP number associated with the local network address (IP and UDP port) of
++the send.
++
++A service thread is used to process messages received on the UDP socket. This
++thread updates the rsocket lookup tables with the remote QPN and path record
++data. The service thread forwards data received on the UDP socket to an
++rsocket QP.
\ No newline at end of file
diff --git a/src/cma.c b/src/cma.c
index 91bf108..2c6b032 100755
uint16_t ucma_get_port(struct sockaddr *addr)
diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..0695d12 100644
+index 58fcb8e..a81b8f3 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -55,7 +55,7 @@
+@@ -46,6 +46,7 @@
+ #include <string.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
++#include <sys/epoll.h>
+
+ #include <rdma/rdma_cma.h>
+ #include <rdma/rdma_verbs.h>
+@@ -55,7 +56,7 @@
#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
-@@ -63,6 +63,23 @@
+@@ -63,6 +64,23 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
-@@ -110,6 +127,12 @@ struct rs_msg {
+@@ -99,6 +117,14 @@ enum {
+ #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
+ #define rs_msg_op(imm_data) (imm_data >> 29)
+ #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
++#define RS_RECV_WR_ID (~((uint64_t) 0))
++
++/*
++ * Datagram work request ids: the upper 32 bits hold the buffer offset, the
++ * lower 32 bits hold the send length, or DS_WR_RECV for a receive request.
++ * Every macro argument is parenthesized so compound expressions expand safely.
++ */
++#define DS_WR_RECV 0xFFFFFFFF
++#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) (length))
++#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
++#define ds_wr_offset(wr_id) ((uint32_t) ((wr_id) >> 32))
++#define ds_wr_length(wr_id) ((uint32_t) (wr_id))
++#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
+
+ enum {
+ RS_CTRL_DISCONNECT,
+@@ -110,6 +136,18 @@ struct rs_msg {
uint32_t data;
};
+struct ds_qp;
+
-+struct ds_msg {
-+ struct ds_qp *qp;
++struct ds_rmsg {
++ struct ds_qp *qp;
++ uint32_t offset;
++ uint32_t length;
++};
++
++struct ds_smsg {
++ struct ds_smsg *next;
+};
+
struct rs_sge {
uint64_t addr;
uint32_t key;
-@@ -159,9 +182,9 @@ enum rs_state {
+@@ -144,8 +182,6 @@ struct rs_conn_data {
+ struct rs_sge data_buf;
+ };
+
+-#define RS_RECV_WR_ID (~((uint64_t) 0))
+-
+ /*
+ * rsocket states are ordered as passive, connecting, connected, disconnected.
+ */
+@@ -159,9 +195,9 @@ enum rs_state {
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -169,13 +192,68 @@ enum rs_state {
+@@ -169,68 +205,203 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
+ struct rsocket *rs;
struct rdma_cm_id *cm_id;
+
++ struct ibv_mr *smr;
+ struct ibv_mr *rmr;
+ uint8_t *rbuf;
+
-+ struct ibv_mr *smr;
-+ uint16_t lid;
-+ uint8_t sl;
++ int cq_armed;
+};
+
+struct ds_dest {
+ /* data stream */
+ struct {
+ struct rdma_cm_id *cm_id;
++ uint64_t tcp_opts;
++
++ int ctrl_avail;
++ uint16_t sseq_no;
++ uint16_t sseq_comp;
++ uint16_t rseq_no;
++ uint16_t rseq_comp;
+
+ int remote_sge;
+ struct rs_sge remote_sgl;
+ void *target_buffer_list;
+ volatile struct rs_sge *target_sgl;
+ struct rs_iomap *target_iomap;
++
++ int rbuf_bytes_avail;
++ int rbuf_free_offset;
++ int rbuf_offset;
++ struct ibv_mr *rmr;
++ uint8_t *rbuf;
++
++ int sbuf_bytes_avail;
++ struct ibv_mr *smr;
++ struct ibv_sge ssgl[2];
+ };
+ /* datagram */
+ struct {
-+ dlist_t qp_list;
++ struct ds_qp *qp_list;
+ void *dest_map;
+ struct ds_dest *conn_dest;
-+ struct pollfd *fds;
-+ nfds_t nfds;
-+ int dsock;
-+ int sbytes_needed;
++
++ int udp_sock;
++ int epfd;
++ int rqe_avail;
++ struct ds_smsg *smsg_free;
+ };
+ };
int opts;
long fd_flags;
-@@ -186,7 +264,7 @@ struct rsocket {
+ uint64_t so_opts;
+- uint64_t tcp_opts;
+ uint64_t ipv6_opts;
+ int state;
int cq_armed;
int retries;
int err;
- int index;
+- int ctrl_avail;
+
- int ctrl_avail;
int sqe_avail;
- int sbuf_bytes_avail;
-@@ -203,34 +281,93 @@ struct rsocket {
- int rbuf_offset;
+- int sbuf_bytes_avail;
+- uint16_t sseq_no;
+- uint16_t sseq_comp;
++ uint32_t sbuf_size;
+ uint16_t sq_size;
+ uint16_t sq_inline;
+
++ uint32_t rbuf_size;
+ uint16_t rq_size;
+- uint16_t rseq_no;
+- uint16_t rseq_comp;
+- int rbuf_bytes_avail;
+- int rbuf_free_offset;
+- int rbuf_offset;
int rmsg_head;
int rmsg_tail;
- struct rs_msg *rmsg;
- struct rs_sge remote_iomap;
+ union {
+ struct rs_msg *rmsg;
-+ struct ds_msg *dmsg;
++ struct ds_rmsg *dmsg;
+ };
++ uint8_t *sbuf;
struct rs_iomap_mr *remote_iomappings;
dlist_entry iomap_list;
dlist_entry iomap_queue;
int iomap_pending;
++};
- struct ibv_mr *target_mr;
- int target_sge;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
-
- uint32_t rbuf_size;
+- uint32_t rbuf_size;
- struct ibv_mr *rmr;
-+ struct ibv_mr *rmr;
- uint8_t *rbuf;
+- uint8_t *rbuf;
++#define DS_UDP_TAG 0x5555555555555555ULL
- uint32_t sbuf_size;
+- uint32_t sbuf_size;
- struct ibv_mr *smr;
-+ struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
- uint8_t *sbuf;
- };
-
-+#define DS_UDP_TAG 0x5555555555555555ULL
-+
+- struct ibv_sge ssgl[2];
+- uint8_t *sbuf;
+struct ds_udp_header {
+ uint64_t tag;
+ uint8_t version;
-+ uint8_t sl;
-+ uint16_t slid;
++ uint8_t reserved[3];
+ uint32_t qpn; /* upper 8-bits reserved */
-+};
+ };
+
+
++#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
++
++/*
++ * Link a QP into the rsocket's circular QP list and make it the list head.
++ * The first QP forms a one-element ring that points at itself.
++ */
++static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
++{
++	if (!rs->qp_list)
++		dlist_init(&qp->list);
++	else
++		dlist_insert_head(&qp->list, &rs->qp_list->list);
++	rs->qp_list = qp;	/* store the pointer itself, not the struct it points to */
++}
++
++/*
++ * Unlink a QP from the rsocket's circular QP list.  If other QPs remain,
++ * the list head moves to the next QP; otherwise the list becomes empty.
++ */
++static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
++{
++	/* A ring of one element has its next pointer aimed at its own entry. */
++	if (qp->list.next != &qp->list) {
++		rs->qp_list = ds_next_qp(qp);
++		dlist_remove(&qp->list);
++	} else {
++		rs->qp_list = NULL;
++	}
++}
+
+static int rs_svc_run(void *arg)
+{
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -306,10 +443,10 @@ out:
+@@ -306,10 +477,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -321,7 +458,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -321,7 +492,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -329,7 +466,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -329,7 +500,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
+ rs->type = type;
rs->index = -1;
-+ rs->dsock = -1;
++ rs->udp_sock = -1;
++ rs->epfd = -1;
++
if (inherited_rs) {
rs->sbuf_size = inherited_rs->sbuf_size;
rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,12 +490,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -351,7 +526,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
- }
+@@ -359,13 +534,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-+/*
-+ * TODO: Support datagram rsockets
-+ */
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
++ struct ds_qp *qp;
int ret = 0;
-@@ -439,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
+
+- if (rs->cm_id->recv_cq_channel)
+- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
++ if (rs->type == SOCK_STREAM) {
++ if (rs->cm_id->recv_cq_channel)
++ ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
+
+- if (!ret && rs->state < rs_connected)
+- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++ if (!ret && rs->state < rs_connected)
++ ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
++ } else {
++ ret = fcntl(rs->epfd, F_SETFL, arg);
++
++ if (!ret && rs->qp_list) {
++ qp = rs->qp_list;
++ do {
++ ret = fcntl(qp->cm_id->recv_cq_channel->fd,
++ F_SETFL, arg);
++ qp = ds_next_qp(qp);
++ } while (qp != rs->qp_list && !ret);
++ }
++ }
+
+ return ret;
+ }
+@@ -389,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+ rs->rq_size = 2;
+ }
+
++static void ds_set_qp_size(struct rsocket *rs)
++{ /* Reconcile the datagram queue depths with the send/receive buffer sizes. */
++ uint16_t max_size;
++
++ max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE); /* presumably the device limit, capped at RS_QP_MAX_SIZE - confirm */
++
++ if (rs->sq_size > max_size)
++ rs->sq_size = max_size;
++ if (rs->rq_size > max_size)
++ rs->rq_size = max_size;
++
++ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) /* one RS_SNDLOWAT-sized slot per receive entry */
++ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; /* buffer too small: shrink the queue */
++ else
++ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; /* queue is the limit: trim the buffer */
++
++ if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT)) /* same reconciliation for the send side */
++ rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
++ else
++ rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
++}
++
+ static int rs_init_bufs(struct rsocket *rs)
+ {
+ size_t len;
+
+ rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
+ if (!rs->rmsg)
+- return -1;
++ return ERR(ENOEMEM);
+
+ rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+ if (!rs->sbuf)
+- return -1;
++ return ERR(ENOEMEM);
+
+ rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
+ if (!rs->smr)
+@@ -409,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
+ sizeof(*rs->target_iomap) * rs->target_iomap_size;
+ rs->target_buffer_list = malloc(len);
+ if (!rs->target_buffer_list)
+- return -1;
++ return ERR(ENOEMEM);
+
+ rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
+ if (!rs->target_mr)
+@@ -422,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
+
+ rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
+ if (!rs->rbuf)
+- return -1;
++ return ERR(ENOEMEM);
+
+ rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
+ if (!rs->rmr)
+@@ -439,15 +650,32 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -455,21 +683,20 @@ static int rs_create_cq(struct rsocket *rs)
goto err2;
}
{
struct ibv_recv_wr wr, *bad;
-@@ -481,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -481,6 +708,23 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
+ sge.length = RS_SNDLOWAT;
+ sge.lkey = qp->rmr;
+
-+ wr.wr_id = RS_RECV_WR_ID;
++ wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
+ wr.next = NULL;
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -491,7 +735,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -548,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -548,8 +792,73 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
+
+ if (qp->cm_id) {
+ if (qp->cm_id->qp) {
-+ dlist_remove(&qp->list);
++ epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
++ qp->cm_id->recv_cq_channel->fd, NULL);
+ rdma_destroy_qp(qp->cm_id);
+ }
+ rdma_destroy_id(qp->cm_id);
+ free(qp);
+}
+
-+static void ds_free_qps(struct rsocket *rs)
++static void ds_free(struct rsocket *rs)
+{
+ struct ds_qp *qp;
-+ dlist_t *entry;
+
-+ while (!dlist_empty(&rs->qp_list)) {
-+ qp = container_of(rs->qp_list.next, struct ds_qp, list);
-+ ds_free_qp(qp);
-+ }
-+}
-+
-+static void ds_free(struct rsocket *rs)
-+{
+ if (rs->state & (rs_readable | rs_writable))
+ rs_svc_remove(rs);
+
-+ if (rs->dsock >= 0)
-+ close(rs->dsock);
++ if (rs->udp_sock >= 0)
++ close(rs->udp_sock);
+
+ if (rs->index >= 0)
+ rs_remove(rs);
+
-+ ds_free_qps(rs);
-+ if (rs->fds)
-+ free(rs->fds);
++ if (rs->dmsg)
++ free(rs->dmsg);
++
++ if (rs->smsg)
++ free(rs->smsg);
++
++ while (rs->qp_list) {
++ ds_remove_qp(rs, rs->qp_list);
++ ds_free_qp(rs->qp_list);
++ }
++
++ if (rs->epfd >= 0)
++ close(rs->epfd);
+
+ if (rs->sbuf)
+ free(rs->sbuf);
if (rs->index >= 0)
rs_remove(rs);
-@@ -581,7 +819,7 @@ static void rs_free(struct rsocket *rs)
+@@ -581,7 +890,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -635,29 +873,56 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -635,29 +944,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
+static int ds_init(struct rsocket *rs, int domain)
+{
-+ rs->dsock = socket(domain, SOCK_DGRAM, 0);
-+ if (rs->dsock < 0)
-+ return rs->dsock;
++	/* Regular UDP socket that backs the datagram rsocket. */
++	rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
++	if (rs->udp_sock < 0)
++		return rs->udp_sock;
+
-+ rs->fds = calloc(1, sizeof *fds);
-+ if (!rs->fds)
-+ return ERR(ENOMEM);
-+ rs->nfds = 1;
-+ dlist_init(&rs->qp_list);
++	/*
++	 * The size argument is only a hint, but it must be greater than zero:
++	 * epoll_create(0) fails with EINVAL.
++	 */
++	rs->epfd = epoll_create(2);
++	if (rs->epfd < 0)
++		return rs->epfd;
+
+ return 0;
+}
+ if (ret)
+ goto err;
+
-+ index = rs->dsock;
++ index = rs->udp_sock;
+ }
- ret = rs_insert(rs);
return rs->index;
err:
-@@ -671,9 +936,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -671,9 +1005,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
+ if (!ret)
+ rs->state = rs_bound;
+ } else {
-+ ret = bind(rs->dsock, addr, addrlen);
++ ret = bind(rs->udp_sock, addr, addrlen);
+ if (!ret) {
+ ret = rs_svc_insert(rs);
+ if (!ret)
return ret;
}
-@@ -709,7 +983,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -709,7 +1052,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -717,7 +991,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -717,7 +1060,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -854,13 +1128,212 @@ connected:
+@@ -854,13 +1197,231 @@ connected:
return ret;
}
+static int ds_init_ep(struct rsocket *rs)
+{
-+ int ret;
++ struct ds_smsg *msg;
++ int i, ret;
+
-+ rs_set_qp_size(rs);
-+ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-+ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-+ else
-+ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
++ ds_set_qp_size(rs);
+
-+ rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
++ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
+ if (!rs->sbuf)
+ return ERR(ENOMEM);
+
-+ rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
++ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
++ if (!rs->dmsg)
++ return ERR(ENOEMEM);
++
+ rs->sbuf_bytes_avail = rs->sbuf_size;
+ rs->sqe_avail = rs->sq_size;
++ rs->rqe_avail = rs->rq_size;
++
++ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
++ msg = rs->smsg_free;
++ for (i = 0; i < rs->sq_size - 1; i++) {
++ msg->next = (void *) msg + i * RS_SNDLOWAT;
++ msg = msg->next;
++ }
++ msg->next = NULL;
+
+ ret = rs_svc_insert(rs);
+ if (ret)
+ uint16_t port;
+
+ *src_len = sizeof src_addr;
-+ ret = getsockname(rs->dsock, &src_addr->sa, src_len);
++ ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
+ if (ret || !rs_any_addr(src_addr))
+ return ret;
+
+ socklen_t addrlen, struct ds_qp **qp)
+{
+ struct ibv_qp_init_attr qp_attr;
++ struct epoll_event event;
+ int ret;
+
+ *qp = calloc(1, sizeof(struct ds_qp));
+ qp_attr.cap.max_send_sge = 2;
+ qp_attr.cap.max_recv_sge = 1;
+ qp_attr.cap.max_inline_data = rs->sq_inline;
-+
+ ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
+ if (ret)
-+ return ret;
++ goto err;
++
++ event.events = EPOLLIN | EPOLLOUT;
++ event.data.ptr = *qp;
++ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
++ (*qp)->cm_id->recv_cq_channel->fd, &event);
++ if (ret)
++ goto err;
+
+ for (i = 0; i < rs->rq_size; i++) {
+ ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
+ if (ret)
+ goto err;
+ }
-+ list_insert_head(&(*qp)->list, &rs->qp_list);
++
++ ds_insert_qp(rs, *qp);
+ return 0;
+err:
+ ds_free_qp(*qp);
+static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
+ socklen_t addrlen, struct ds_qp **qp)
+{
-+ dlist_t *entry;
-+
-+ for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
-+ *qp = container_of(entry, struct ds_qp, list);
-+ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-+ return 0;
++ if (rs->qp_list) {
++ *qp = rs->qp_list;
++ do {
++ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
++ return 0;
++
++ *qp = ds_next_qp(*qp);
++ } while (*qp != rs->qp_list);
+ }
+
+ return ds_create_qp(rs, src_addr, addrlen, qp);
+ ret = rs_do_connect(rs);
+ } else {
+ fastlock_acquire(&rs->slock);
-+ ret = connect(rs->dsock, addr, addrlen);
++ ret = connect(rs->udp_sock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+ fastlock_release(&rs->slock);
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1375,25 @@ static int rs_post_write(struct rsocket *rs,
+@@ -902,6 +1463,25 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
-@@ -932,6 +1424,15 @@ static int rs_write_data(struct rsocket *rs,
+@@ -932,6 +1512,18 @@ static int rs_write_data(struct rsocket *rs,
flags, addr, rkey);
}
+ struct ibv_sge *sgl, int nsge,
+ uint32_t length, int flags)
+{
++ uint64_t offset;
++
+ rs->sqe_avail--;
+ rs->sbuf_bytes_avail -= length;
-+ return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
++ offset = sgl->addr - (uintptr_t) rs->sbuf;
++ return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
+}
+
static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
{
-@@ -1045,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1045,7 +1637,7 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
}
break;
case RS_OP_WRITE:
-@@ -1218,9 +1719,14 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1187,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+ return ret;
+ }
+
++/*
++ * Poll all CQs associated with a datagram rsocket.  We need to drop any
++ * received messages that we do not have room to store.  To limit drops,
++ * we only poll if we have room to store the receive or we need a send
++ * buffer.  To ensure fairness, we poll the CQs round robin, remembering
++ * where we left off.
++ *
++ * A successful receive is queued in rs->dmsg with its QP, buffer offset and
++ * length; a receive that cannot be stored (or that failed) is re-posted.
++ * A send completion returns its send queue entry and, for sends larger than
++ * the inline size, its send buffer to the free list.
++ */
++static void ds_poll_cqs(struct rsocket *rs)
++{
++	struct ds_qp *qp;
++	struct ds_rmsg *rmsg;
++	struct ds_smsg *msg;
++	struct ibv_wc wc;
++	int ret, cnt;
++
++	/* No QP has been created yet, so there is nothing to poll. */
++	qp = rs->qp_list;
++	if (!qp)
++		return;
++
++	do {
++		cnt = 0;	/* completions reaped during this pass over the ring */
++		do {
++			ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
++			if (ret <= 0) {
++				qp = ds_next_qp(qp);
++				continue;
++			}
++
++			if (ds_wr_is_recv(wc.wr_id)) {
++				if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
++					rs->rqe_avail--;
++					rmsg = &rs->dmsg[rs->rmsg_tail];
++					rmsg->qp = qp;
++					rmsg->offset = ds_wr_offset(wc.wr_id);
++					/*
++					 * NOTE(review): for UD QPs byte_len presumably
++					 * includes the 40-byte GRH - confirm that the
++					 * consumer of dmsg accounts for it.
++					 */
++					rmsg->length = wc.byte_len;
++					if (++rs->rmsg_tail == rs->rq_size + 1)
++						rs->rmsg_tail = 0;
++				} else {
++					/* Drop the message and hand the buffer back to the QP. */
++					ds_post_recv(rs, qp, qp->rbuf +
++						     ds_wr_offset(wc.wr_id));
++				}
++			} else {
++				if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
++					msg = (struct ds_smsg *)
++					      (rs->sbuf + ds_wr_offset(wc.wr_id));
++					msg->next = rs->smsg_free;
++					rs->smsg_free = msg;
++				}
++				rs->sqe_avail++;
++			}
++
++			qp = ds_next_qp(qp);
++			/* Stop early: no room for receives and a send entry is free. */
++			if (!rs->rqe_avail && rs->sqe_avail) {
++				rs->qp_list = qp;	/* resume here next time */
++				return;
++			}
++			cnt++;
++		} while (qp != rs->qp_list);
++	} while (cnt);
++}
++
++/*
++ * Request a completion notification on every QP of a datagram rsocket
++ * whose CQ is not already armed.  Does nothing when the rsocket has no QPs.
++ */
++static void ds_req_notify_cqs(struct rsocket *rs)
++{
++	struct ds_qp *qp;
++
++	/* An empty list would otherwise be dereferenced by the loop below. */
++	qp = rs->qp_list;
++	if (!qp)
++		return;
++
++	do {
++		if (!qp->cq_armed) {
++			ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
++			qp->cq_armed = 1;
++		}
++		qp = ds_next_qp(qp);
++	} while (qp != rs->qp_list);
++}
++
++/*
++ * Wait for a completion event on any QP of a datagram rsocket.
++ *
++ * Returns 0 immediately when the rsocket's CQs are not armed.  Otherwise it
++ * blocks in epoll_wait() on rs->epfd, then consumes and acknowledges the CQ
++ * event of the QP that epoll reported and clears both armed flags so the
++ * caller polls and re-arms.  On failure the result of epoll_wait() or
++ * ibv_get_cq_event() is returned.
++ */
++static int ds_get_cq_event(struct rsocket *rs)
++{
++	struct epoll_event event;
++	struct ds_qp *qp;
++	struct ibv_cq *cq;
++	void *context;
++	int ret;
++
++	if (!rs->cq_armed)
++		return 0;
++
++	ret = epoll_wait(rs->epfd, &event, 1, -1);
++	if (ret <= 0)
++		return ret;
++
++	/*
++	 * The event was raised on the reporting QP's own completion channel,
++	 * so it must be read and acknowledged through that QP's cm_id rather
++	 * than rs->cm_id, which is a stream rsocket field.
++	 */
++	qp = event.data.ptr;
++	ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
++	if (!ret) {
++		ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
++		qp->cq_armed = 0;
++		rs->cq_armed = 0;
++	}
++
++	return ret;
++}
++
++static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
++{ /* Poll the datagram CQs until test(rs) holds, an error occurs, or (nonblock) nothing is ready. */
++ int ret = 0;
++
++ fastlock_acquire(&rs->cq_lock);
++ do {
++ ds_poll_cqs(rs);
++ if (test(rs)) {
++ ret = 0;
++ break;
++ } else if (nonblock) {
++ ret = ERR(EWOULDBLOCK); /* non-zero ret ends the loop; ds_get_comp() checks errno for EWOULDBLOCK */
++ } else if (!rs->cq_armed) {
++ ds_req_notify_cqs(rs); /* arm first, then loop to poll once more before sleeping */
++ rs->cq_armed = 1;
++ } else {
++ rs_update_credits(rs); /* NOTE(review): not defined in this chunk - confirm it applies to datagram rsockets */
++ fastlock_acquire(&rs->cq_wait_lock); /* take the wait lock before dropping cq_lock */
++ fastlock_release(&rs->cq_lock);
++
++ ret = ds_get_cq_event(rs); /* blocks without holding cq_lock */
++ fastlock_release(&rs->cq_wait_lock);
++ fastlock_acquire(&rs->cq_lock);
++ }
++ } while (!ret);
++
++ fastlock_release(&rs->cq_lock);
++ return ret;
++}
++
++static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
++{ /* Busy-poll the CQs for up to polling_time microseconds, then fall back to a blocking wait. */
++ struct timeval s, e;
++ uint32_t poll_time = 0;
++ int ret;
++
++ do {
++ ret = ds_process_cqs(rs, 1, test); /* always non-blocking while spinning */
++ if (!ret || nonblock || errno != EWOULDBLOCK) /* done, caller will not wait, or a real error */
++ return ret;
++
++ if (!poll_time) /* first pass only: record the start time */
++ gettimeofday(&s, NULL);
++
++ gettimeofday(&e, NULL);
++ poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
++ (e.tv_usec - s.tv_usec) + 1; /* elapsed microseconds; +1 keeps it non-zero so s is sampled once */
++ } while (poll_time <= polling_time);
++
++ ret = ds_process_cqs(rs, 0, test); /* spin budget exhausted: block for a CQ event */
++ return ret;
++}
++
+ static int rs_nonblocking(struct rsocket *rs, int flags)
+ {
+ return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
+@@ -1218,9 +1957,14 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
+static int ds_can_send(struct rsocket *rs)
+{
-+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
++ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT); /* needs a free send entry and RS_SNDLOWAT bytes of send buffer */
+}
+
static int rs_conn_can_send(struct rsocket *rs)
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1235,7 +1979,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1338,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1338,7 +2082,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1390,14 +1896,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1390,14 +2134,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1446,10 +1952,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1446,10 +2190,84 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+{
+ struct ds_udp_header hdr;
+ struct msghdr msg;
-+ struct iovec miov[4];
++ struct iovec miov[8];
+ struct ds_qp *qp;
+
-+ if (iovcnt > 4)
++ if (iovcnt > 8)
+ return ERR(ENOTSUP);
+
-+ qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
+ hdr.tag = htonll(DS_UDP_TAG);
+ hdr.version = 1;
-+ if (qp) {
-+ hdr.sl = qp->sl;
-+ hdr.slid = htons(qp->lid);
-+ hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
-+ } else {
-+ hdr.sl = 0;
-+ hdr.slid = 0;
-+ hdr.qpn = 0;
-+ }
++ memset(&hdr->reserved, 0, sizeof hdr->reserved);
++ hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
+
+ miov[0].iov_base = &hdr;
+ miov[0].iov_len = sizeof hdr;
+ memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+ memset(&msg, 0, sizeof msg);
-+ /* TODO: specify name if needed */
+ msg.msg_iov = miov;
+ msg.msg_iovlen = iovcnt + 1;
+ return sendmsg(rs->fd, msg, flags);
+ struct ibv_sge sge;
+ int ret = 0;
+
-+ if (!rs->conn_dest || !rs->conn_dest->ah)
++ if (!rs->conn_dest->ah)
+ return ds_send_udp(rs, buf, len, flags);
+
-+ rs->sbytes_needed = len;
+ if (!ds_can_send(rs)) {
-+ ret = rs_get_comp(rs, 1, ds_can_send);
++ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
+ if (ret)
-+ return ds_send_udp(rs, buf, len, flags);
++ return ret;
+ }
+
+ if (len <= rs->sq_inline) {
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1463,6 +2053,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1463,6 +2281,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1484,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1484,7 +2309,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1537,10 +2134,21 @@ out:
+@@ -1537,10 +2362,26 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
+ return ERR(EISCONN);
-
-- return rsend(socket, buf, len, flags);
++
+ return rsend(socket, buf, len, flags);
+ }
-+
+
+- return rsend(socket, buf, len, flags);
+ fastlock_acquire(&rs->slock);
-+ ds_connect(rs, dest_addr, addrlen);
++ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
++ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
++ if (ret)
++ goto out;
++ }
+ ret = dsend(rs, buf, len, flags);
++out:
+ fastlock_release(&rs->slock);
+ return ret;
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1599,7 +2440,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1652,7 +2260,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1652,7 +2493,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1948,7 +2556,7 @@ int rshutdown(int socket, int how)
+@@ -1948,7 +2789,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1958,10 +2566,10 @@ int rshutdown(int socket, int how)
+@@ -1958,10 +2799,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -2017,8 +2625,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2017,8 +2858,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
+ rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
-+ return getpeername(rs->fs, addr, addrlen);
++ return getpeername(rs->udp_sock, addr, addrlen);
+ }
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2638,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2871,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
+ rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+ return 0;
+ } else {
-+ return getsockname(rs->fd, addr, addrlen);
++ return getsockname(rs->udp_sock, addr, addrlen);
+ }
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2039,6 +2655,12 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,18 +2888,26 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-+ ret = setsockopt(rs->fd, optname, optval, optlen);
++ ret = setsockopt(rs->udp_sock, optname, optval, optlen);
+ if (ret)
+ return ret;
+ }
switch (level) {
case SOL_SOCKET:
opts = &rs->so_opts;
-@@ -2156,6 +2778,9 @@ int rgetsockopt(int socket, int level, int optname,
- int ret = 0;
-
- rs = idm_at(&idm, socket);
-+ if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
-+ return getsockopt(rs->fd, level, optname, optval, optlen);
-+
- switch (level) {
- case SOL_SOCKET:
switch (optname) {
-@@ -2314,7 +2939,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+ case SO_REUSEADDR:
+- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+- RDMA_OPTION_ID_REUSEADDR,
+- (void *) optval, optlen);
+- if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
+- rs->cm_id->context &&
+- (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
+- ret = 0;
++ if (rs->type == SOCK_STREAM) {
++ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
++ RDMA_OPTION_ID_REUSEADDR,
++ (void *) optval, optlen);
++ if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
++ rs->cm_id->context &&
++ (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
++ ret = 0;
++ }
+ opt_on = *(int *) optval;
+ break;
+ case SO_RCVBUF:
+@@ -2100,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
+ opts = &rs->ipv6_opts;
+ switch (optname) {
+ case IPV6_V6ONLY:
+- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
+- RDMA_OPTION_ID_AFONLY,
+- (void *) optval, optlen);
++ if (rs->type == SOCK_STREAM) {
++ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
++ RDMA_OPTION_ID_AFONLY,
++ (void *) optval, optlen);
++ }
+ opt_on = *(int *) optval;
+ break;
+ default:
+@@ -2314,7 +3173,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +2973,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2348,7 +3207,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2360,7 +2985,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2360,7 +3219,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2381,7 +3006,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2381,7 +3240,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2425,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2425,7 +3284,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2475,3 +3100,24 @@ out:
-
- return (ret && left == count) ? ret : count - left;
- }
-+
-+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
-+ struct sockaddr *src_addr, socklen_t *addrlen)
-+{
-+ int ret;
-+
-+ ret = rrecv(socket, buf, len, flags);
-+ if (ret > 0 && src_addr)
-+ rgetpeername(socket, src_addr, addrlen);
-+
-+ return ret;
-+}
-+
-+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
-+ const struct sockaddr *dest_addr, socklen_t addrlen)
-+{
-+ if (dest_addr || addrlen)
-+ return ERR(EISCONN);
-+
-+ return usend(socket, buf, len, flags);
-+}
+++ /dev/null
-Bottom: da0048097eea01b21df587e85b3f7ac44a2582c8
-Top: faabde8e748d27fc0f733e702ed7f3c3902d8f58
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-11-26 12:10:16 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/docs/rsocket b/docs/rsocket
-index 03d49df..a660208 100644
---- a/docs/rsocket
-+++ b/docs/rsocket
-@@ -194,7 +194,7 @@ application's buffer.
-
- Datagram Overview
- -----------------
--THe rsocket API supports datagram sockets. Datagram support is handled through an
-+The rsocket API supports datagram sockets. Datagram support is handled through an
- entirely different protocol and internal implementation. Unlike connected rsockets,
- datagram rsockets are not necessarily bound to a network (IP) address. A datagram
- socket may use any number of network (IP) addresses, including those which map to
-@@ -204,32 +204,19 @@ UDP socket, plus zero or more UD QPs.
-
- Rsockets uses headers inserted before user data sent over UDP sockets to resolve
- remote UD QP numbers. When a user first attempts to send a datagram to a remote
--address (IP and UDP port), rsockets will take the following steps. First, it
--will allocate and store the destination address into a lookup table for future
--use. Then it will resolve which local network address should be used when sending
--to the specified destination. It will allocate a UD QP on the RDMA device
--associated with the local address. Finally, rsockets will send the user's
--datagram to the remote UDP socket, but inserting a header before the datagram.
--The header specifies the UD QP number associated with the local network address
--(IP and UDP port) of the send.
--
--Under many circumstances, the rsocket UDP header also provides enough path
--information for the receiver to send the reply using a UD QP. However, certain
--fabric topologies may require contacting a subnet administrator. Whenever
--rsockets lacks sufficient information to send data to a remote peer using
--a UD QP, it will automatically fall back to using a standard UDP socket. This
--helps minimize the latency for performing a given send, while lenghty address
--and route resolution protocols complete.
--
--Currently, rsockets allocates a single UD QP, with an infrastructure is provided
--to support more. Future enhancements may add support for multiple UD QPs, each
--associated with a single network (IP) address. Multiplexing different network
--addresses over a single UD QP and using shared receive queues are other possible
--enhancements.
--
--Because rsockets may use multiple UD QPs, buffer management can become an issue.
--The total size of receive buffers is specified by the mem_default configuration
--files, but may be overridden with rsetsockopt. The buffer is divided into
--MTU sized messages and distributed among the allocated QPs. As new QPs are
--created, the receive buffers may be redistributed by posting loopback sends
--on a QP in order to free the receive buffers for reposting to a different QP.
-\ No newline at end of file
-+address (IP and UDP port), rsockets will take the following steps:
-+
-+1. Store the destination address into a lookup table.
-+2. Resolve which local network address should be used when sending
-+ to the specified destination.
-+3. Allocate a UD QP on the RDMA device associated with the local address.
-+4. Send the user's datagram to the remote UDP socket.
-+
-+A header is inserted before the user's datagram. The header specifies the
-+UD QP number associated with the local network address (IP and UDP port) of
-+the send.
-+
-+A service thread is used to process messages received on the UDP socket. This
-+thread updates the rsocket lookup tables with the remote QPN and path record
-+data. The service thread forwards data received on the UDP socket to an
-+rsocket QP.
-\ No newline at end of file
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 0695d12..a81b8f3 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -46,6 +46,7 @@
- #include <string.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
-+#include <sys/epoll.h>
-
- #include <rdma/rdma_cma.h>
- #include <rdma/rdma_verbs.h>
-@@ -116,6 +117,14 @@ enum {
- #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
- #define rs_msg_op(imm_data) (imm_data >> 29)
- #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-+#define RS_RECV_WR_ID (~((uint64_t) 0))
-+
-+#define DS_WR_RECV 0xFFFFFFFF
-+#define ds_send_wr_id(offset, length) (((uint64_t) (offset)) << 32 | (uint64_t) length)
-+#define ds_recv_wr_id(offset) (((uint64_t) (offset)) << 32 | (uint64_t) DS_WR_RECV)
-+#define ds_wr_offset(wr_id) ((uint32_t) (wr_id >> 32))
-+#define ds_wr_length(wr_id) ((uint32_t) wr_id)
-+#define ds_wr_is_recv(wr_id) (ds_wr_length(wr_id) == DS_WR_RECV)
-
- enum {
- RS_CTRL_DISCONNECT,
-@@ -129,8 +138,14 @@ struct rs_msg {
-
- struct ds_qp;
-
--struct ds_msg {
-- struct ds_qp *qp;
-+struct ds_rmsg {
-+ struct ds_qp *qp;
-+ uint32_t offset;
-+ uint32_t length;
-+};
-+
-+struct ds_smsg {
-+ struct ds_smsg *next;
- };
-
- struct rs_sge {
-@@ -167,8 +182,6 @@ struct rs_conn_data {
- struct rs_sge data_buf;
- };
-
--#define RS_RECV_WR_ID (~((uint64_t) 0))
--
- /*
- * rsocket states are ordered as passive, connecting, connected, disconnected.
- */
-@@ -203,12 +216,11 @@ struct ds_qp {
- struct rsocket *rs;
- struct rdma_cm_id *cm_id;
-
-+ struct ibv_mr *smr;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
-
-- struct ibv_mr *smr;
-- uint16_t lid;
-- uint8_t sl;
-+ int cq_armed;
- };
-
- struct ds_dest {
-@@ -231,6 +243,13 @@ struct rsocket {
- /* data stream */
- struct {
- struct rdma_cm_id *cm_id;
-+ uint64_t tcp_opts;
-+
-+ int ctrl_avail;
-+ uint16_t sseq_no;
-+ uint16_t sseq_comp;
-+ uint16_t rseq_no;
-+ uint16_t rseq_comp;
-
- int remote_sge;
- struct rs_sge remote_sgl;
-@@ -242,63 +261,58 @@ struct rsocket {
- void *target_buffer_list;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
-+
-+ int rbuf_bytes_avail;
-+ int rbuf_free_offset;
-+ int rbuf_offset;
-+ struct ibv_mr *rmr;
-+ uint8_t *rbuf;
-+
-+ int sbuf_bytes_avail;
-+ struct ibv_mr *smr;
-+ struct ibv_sge ssgl[2];
- };
- /* datagram */
- struct {
-- dlist_t qp_list;
-+ struct ds_qp *qp_list;
- void *dest_map;
- struct ds_dest *conn_dest;
-- struct pollfd *fds;
-- nfds_t nfds;
-- int dsock;
-- int sbytes_needed;
-+
-+ int udp_sock;
-+ int epfd;
-+ int rqe_avail;
-+ struct ds_smsg *smsg_free;
- };
- };
-
- int opts;
- long fd_flags;
- uint64_t so_opts;
-- uint64_t tcp_opts;
- uint64_t ipv6_opts;
- int state;
- int cq_armed;
- int retries;
- int err;
-
-- int ctrl_avail;
- int sqe_avail;
-- int sbuf_bytes_avail;
-- uint16_t sseq_no;
-- uint16_t sseq_comp;
-+ uint32_t sbuf_size;
- uint16_t sq_size;
- uint16_t sq_inline;
-
-+ uint32_t rbuf_size;
- uint16_t rq_size;
-- uint16_t rseq_no;
-- uint16_t rseq_comp;
-- int rbuf_bytes_avail;
-- int rbuf_free_offset;
-- int rbuf_offset;
- int rmsg_head;
- int rmsg_tail;
- union {
- struct rs_msg *rmsg;
-- struct ds_msg *dmsg;
-+ struct ds_rmsg *dmsg;
- };
-
-+ uint8_t *sbuf;
- struct rs_iomap_mr *remote_iomappings;
- dlist_entry iomap_list;
- dlist_entry iomap_queue;
- int iomap_pending;
--
-- uint32_t rbuf_size;
-- struct ibv_mr *rmr;
-- uint8_t *rbuf;
--
-- uint32_t sbuf_size;
-- struct ibv_mr *smr;
-- struct ibv_sge ssgl[2];
-- uint8_t *sbuf;
- };
-
- #define DS_UDP_TAG 0x5555555555555555ULL
-@@ -306,12 +320,32 @@ struct rsocket {
- struct ds_udp_header {
- uint64_t tag;
- uint8_t version;
-- uint8_t sl;
-- uint16_t slid;
-+ uint8_t reserved[3];
- uint32_t qpn; /* upper 8-bits reserved */
- };
-
-
-+#define ds_next_qp(qp) container_of((qp)->list.next, struct ds_qp, list)
-+
-+static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp)
-+{
-+ if (!rs->qp_list)
-+ list_init(&qp->list);
-+ else
-+ list_insert_head(&qp->list, &rs->qp_list->list);
-+ rs->qp_list = *qp;
-+}
-+
-+static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp)
-+{
-+ if (qp->list.next != qp->list) {
-+ rs->qp_list = ds_next_qp(qp);
-+ dlist_remove(&qp->list);
-+ } else {
-+ rs->qp_list = NULL;
-+ }
-+}
-+
- static int rs_svc_run(void *arg)
- {
- return 0;
-@@ -468,7 +502,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
-
- rs->type = type;
- rs->index = -1;
-- rs->dsock = -1;
-+ rs->udp_sock = -1;
-+ rs->epfd = -1;
-+
- if (inherited_rs) {
- rs->sbuf_size = inherited_rs->sbuf_size;
- rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -496,18 +532,29 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
- return rs;
- }
-
--/*
-- * TODO: Support datagram rsockets
-- */
- static int rs_set_nonblocking(struct rsocket *rs, long arg)
- {
-+ struct ds_qp *qp;
- int ret = 0;
-
-- if (rs->cm_id->recv_cq_channel)
-- ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
-+ if (rs->type == SOCK_STREAM) {
-+ if (rs->cm_id->recv_cq_channel)
-+ ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
-
-- if (!ret && rs->state < rs_connected)
-- ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
-+ if (!ret && rs->state < rs_connected)
-+ ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
-+ } else {
-+ ret = fcntl(rs->epfd, F_SETFL, arg);
-+
-+ if (!ret && rs->qp_list) {
-+ qp = rs->qp_list;
-+ do {
-+ ret = fcntl(qp->cm_id->recv_cq_channel->fd,
-+ F_SETFL, arg);
-+ qp = ds_next_qp(qp);
-+ } while (qp != rs->qp_list && !ret);
-+ }
-+ }
-
- return ret;
- }
-@@ -531,17 +578,39 @@ static void rs_set_qp_size(struct rsocket *rs)
- rs->rq_size = 2;
- }
-
-+static void ds_set_qp_size(struct rsocket *rs)
-+{
-+ uint16_t max_size;
-+
-+ max_size = min(ucma_max_qpsize(NULL), RS_QP_MAX_SIZE);
-+
-+ if (rs->sq_size > max_size)
-+ rs->sq_size = max_size;
-+ if (rs->rq_size > max_size)
-+ rs->rq_size = max_size;
-+
-+ if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-+ rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-+ else
-+ rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
-+
-+ if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT))
-+ rs->sq_size = rs->sbuf_size / RS_SNDLOWAT;
-+ else
-+ rs->sbuf_size = rs->sq_size * RS_SNDLOWAT;
-+}
-+
- static int rs_init_bufs(struct rsocket *rs)
- {
- size_t len;
-
- rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
- if (!rs->rmsg)
-- return -1;
-+ return ERR(ENOEMEM);
-
- rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
- if (!rs->sbuf)
-- return -1;
-+ return ERR(ENOEMEM);
-
- rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
- if (!rs->smr)
-@@ -551,7 +620,7 @@ static int rs_init_bufs(struct rsocket *rs)
- sizeof(*rs->target_iomap) * rs->target_iomap_size;
- rs->target_buffer_list = malloc(len);
- if (!rs->target_buffer_list)
-- return -1;
-+ return ERR(ENOEMEM);
-
- rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
- if (!rs->target_mr)
-@@ -564,7 +633,7 @@ static int rs_init_bufs(struct rsocket *rs)
-
- rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
- if (!rs->rbuf)
-- return -1;
-+ return ERR(ENOEMEM);
-
- rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
- if (!rs->rmr)
-@@ -648,7 +717,7 @@ static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
- sge.length = RS_SNDLOWAT;
- sge.lkey = qp->rmr;
-
-- wr.wr_id = RS_RECV_WR_ID;
-+ wr.wr_id = ds_recv_wr_id((uint32_t) (buf - rs->rbuf));
- wr.next = NULL;
- wr.sg_list = &sge;
- wr.num_sge = 1;
-@@ -736,7 +805,8 @@ static void ds_free_qp(struct ds_qp *qp)
-
- if (qp->cm_id) {
- if (qp->cm_id->qp) {
-- dlist_remove(&qp->list);
-+ epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL,
-+ qp->cm_id->recv_cq_channel->fd, NULL);
- rdma_destroy_qp(qp->cm_id);
- }
- rdma_destroy_id(qp->cm_id);
-@@ -744,31 +814,32 @@ static void ds_free_qp(struct ds_qp *qp)
- free(qp);
- }
-
--static void ds_free_qps(struct rsocket *rs)
-+static void ds_free(struct rsocket *rs)
- {
- struct ds_qp *qp;
-- dlist_t *entry;
-
-- while (!dlist_empty(&rs->qp_list)) {
-- qp = container_of(rs->qp_list.next, struct ds_qp, list);
-- ds_free_qp(qp);
-- }
--}
--
--static void ds_free(struct rsocket *rs)
--{
- if (rs->state & (rs_readable | rs_writable))
- rs_svc_remove(rs);
-
-- if (rs->dsock >= 0)
-- close(rs->dsock);
-+ if (rs->udp_sock >= 0)
-+ close(rs->udp_sock);
-
- if (rs->index >= 0)
- rs_remove(rs);
-
-- ds_free_qps(rs);
-- if (rs->fds)
-- free(rs->fds);
-+ if (rs->dmsg)
-+ free(rs->dmsg);
-+
-+ if (rs->smsg)
-+ free(rs->smsg);
-+
-+ while (rs->qp_list) {
-+ ds_remove_qp(rs, rs->qp_list);
-+ ds_free_qp(rs->qp_list);
-+ }
-+
-+ if (rs->epfd >= 0)
-+ close(rs->epfd);
-
- if (rs->sbuf)
- free(rs->sbuf);
-@@ -875,15 +946,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
-
- static int ds_init(struct rsocket *rs, int domain)
- {
-- rs->dsock = socket(domain, SOCK_DGRAM, 0);
-- if (rs->dsock < 0)
-- return rs->dsock;
-+ rs->udp_sock = socket(domain, SOCK_DGRAM, 0);
-+ if (rs->udp_sock < 0)
-+ return rs->udp_sock;
-
-- rs->fds = calloc(1, sizeof *fds);
-- if (!rs->fds)
-- return ERR(ENOMEM);
-- rs->nfds = 1;
-- dlist_init(&rs->qp_list);
-+ rs->epfd = epoll_create(0);
-+ if (rs->epfd < 0)
-+ return rs->epfd;
-
- return 0;
- }
-@@ -916,7 +985,7 @@ int rsocket(int domain, int type, int protocol)
- if (ret)
- goto err;
-
-- index = rs->dsock;
-+ index = rs->udp_sock;
- }
-
- ret = rs_insert(rs, index);
-@@ -941,7 +1010,7 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
- if (!ret)
- rs->state = rs_bound;
- } else {
-- ret = bind(rs->dsock, addr, addrlen);
-+ ret = bind(rs->udp_sock, addr, addrlen);
- if (!ret) {
- ret = rs_svc_insert(rs);
- if (!ret)
-@@ -1130,21 +1199,30 @@ connected:
-
- static int ds_init_ep(struct rsocket *rs)
- {
-- int ret;
-+ struct ds_smsg *msg;
-+ int i, ret;
-
-- rs_set_qp_size(rs);
-- if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
-- rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
-- else
-- rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
-+ ds_set_qp_size(rs);
-
-- rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
-+ rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT);
- if (!rs->sbuf)
- return ERR(ENOMEM);
-
-- rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
-+ rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg));
-+ if (!rs->dmsg)
-+ return ERR(ENOEMEM);
-+
- rs->sbuf_bytes_avail = rs->sbuf_size;
- rs->sqe_avail = rs->sq_size;
-+ rs->rqe_avail = rs->rq_size;
-+
-+ rs->smsg_free = (struct ds_smsg *) rs->sbuf;
-+ msg = rs->smsg_free;
-+ for (i = 0; i < rs->sq_size - 1; i++) {
-+ msg->next = (void *) msg + i * RS_SNDLOWAT;
-+ msg = msg->next;
-+ }
-+ msg->next = NULL;
-
- ret = rs_svc_insert(rs);
- if (ret)
-@@ -1186,7 +1264,7 @@ static int ds_get_src_addr(struct rsocket *rs,
- uint16_t port;
-
- *src_len = sizeof src_addr;
-- ret = getsockname(rs->dsock, &src_addr->sa, src_len);
-+ ret = getsockname(rs->udp_sock, &src_addr->sa, src_len);
- if (ret || !rs_any_addr(src_addr))
- return ret;
-
-@@ -1211,6 +1289,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- socklen_t addrlen, struct ds_qp **qp)
- {
- struct ibv_qp_init_attr qp_attr;
-+ struct epoll_event event;
- int ret;
-
- *qp = calloc(1, sizeof(struct ds_qp));
-@@ -1245,17 +1324,24 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- qp_attr.cap.max_send_sge = 2;
- qp_attr.cap.max_recv_sge = 1;
- qp_attr.cap.max_inline_data = rs->sq_inline;
--
- ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
- if (ret)
-- return ret;
-+ goto err;
-+
-+ event.events = EPOLLIN | EPOLLOUT;
-+ event.data.ptr = *qp;
-+ ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD,
-+ (*qp)->cm_id->recv_cq_channel->fd, &event);
-+ if (ret)
-+ goto err;
-
- for (i = 0; i < rs->rq_size; i++) {
- ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
- if (ret)
- goto err;
- }
-- list_insert_head(&(*qp)->list, &rs->qp_list);
-+
-+ ds_insert_qp(rs, *qp);
- return 0;
- err:
- ds_free_qp(*qp);
-@@ -1265,12 +1351,14 @@ err:
- static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
- socklen_t addrlen, struct ds_qp **qp)
- {
-- dlist_t *entry;
-+ if (rs->qp_list) {
-+ *qp = rs->qp_list;
-+ do {
-+ if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-+ return 0;
-
-- for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
-- *qp = container_of(entry, struct ds_qp, list);
-- if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
-- return 0;
-+ *qp = ds_next_qp(*qp);
-+ } while (*qp != rs->qp_list);
- }
-
- return ds_create_qp(rs, src_addr, addrlen, qp);
-@@ -1328,7 +1416,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- ret = rs_do_connect(rs);
- } else {
- fastlock_acquire(&rs->slock);
-- ret = connect(rs->dsock, addr, addrlen);
-+ ret = connect(rs->udp_sock, addr, addrlen);
- if (!ret)
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
- fastlock_release(&rs->slock);
-@@ -1428,9 +1516,12 @@ static int ds_send_data(struct rsocket *rs,
- struct ibv_sge *sgl, int nsge,
- uint32_t length, int flags)
- {
-+ uint64_t offset;
-+
- rs->sqe_avail--;
- rs->sbuf_bytes_avail -= length;
-- return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
-+ offset = sgl->addr - (uintptr_t) rs->sbuf;
-+ return ds_post_send(rs, sgl, nsge, ds_send_wr_id(offset, length), flags);
- }
-
- static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-@@ -1688,6 +1779,153 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
- return ret;
- }
-
-+/*
-+ * Poll all CQs associated with a datagram rsocket. We need to drop any
-+ * received messages that we do not have room to store. To limit drops,
-+ * we only poll if we have room to store the receive or we need a send
-+ * buffer. To ensure fairness, we poll the CQs round robin, remembering
-+ * where we left off.
-+ */
-+static void ds_poll_cqs(struct rsocket *rs)
-+{
-+ struct ds_qp *qp;
-+ struct ds_smsg *msg;
-+ struct ibv_wc wc;
-+ int ret, cnt;
-+
-+ qp = rs->qp_list;
-+ do {
-+ cnt = 0;
-+ do {
-+ ret = ibv_poll_cq(qp->cm_id->recv_cq, 1, &wc);
-+ if (ret <= 0) {
-+ qp = ds_next_qp(qp);
-+ continue;
-+ }
-+
-+ if (ds_wr_is_recv(wc.wr_id)) {
-+ if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS) {
-+ rs->rqe_avail--;
-+ rs->dmsg[rs->rmsg_tail].qp = qp;
-+ if (++rs->rmsg_tail == rs->rq_size + 1)
-+ rs->rmsg_tail = 0;
-+ } else {
-+ ds_post_recv(rs, qp, qp->rbuf +
-+ ds_wr_offset(wc.wr_id));
-+ }
-+ } else {
-+ if (ds_wr_length(wc.wr_id) > rs->sq_inline) {
-+ msg = (struct ds_smsg *)
-+ (rs->sbuf + ds_wr_offset(wc.wr_id));
-+ msg->next = rs->smsg_free;
-+ rs->smsg_free = msg;
-+ }
-+ rs->sqe_avail++;
-+ }
-+
-+ qp = ds_next_qp(qp);
-+ if (!rs->rqe_avail && rs->sqe_avail) {
-+ rs->qp_list = qp;
-+ return;
-+ }
-+ cnt++;
-+ } while (qp != rs->qp_list);
-+ } while (cnt);
-+}
-+
-+static void ds_req_notify_cqs(struct rsocket *rs)
-+{
-+ struct ds_qp *qp;
-+
-+ qp = rs->qp_list;
-+ do {
-+ if (!qp->cq_armed) {
-+ ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
-+ qp->cq_armed = 1;
-+ }
-+ qp = ds_next_qp(qp);
-+ } while (qp != rs->qp_list);
-+}
-+
-+static int ds_get_cq_event(struct rsocket *rs)
-+{
-+ struct epoll_event event;
-+ struct ds_qp *qp;
-+ struct ibv_cq *cq;
-+ void *context;
-+ int ret;
-+
-+ if (!rs->cq_armed)
-+ return 0;
-+
-+ ret = epoll_wait(rs->epfd, &event, 1, -1);
-+ if (ret <= 0)
-+ return ret;
-+
-+ qp = event.data.ptr;
-+ ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
-+ if (!ret) {
-+ ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
-+ qp->cq_armed = 0;
-+ rs->cq_armed = 0;
-+ }
-+
-+ return ret;
-+}
-+
-+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
-+{
-+ int ret = 0;
-+
-+ fastlock_acquire(&rs->cq_lock);
-+ do {
-+ ds_poll_cqs(rs);
-+ if (test(rs)) {
-+ ret = 0;
-+ break;
-+ } else if (nonblock) {
-+ ret = ERR(EWOULDBLOCK);
-+ } else if (!rs->cq_armed) {
-+ ds_req_notify_cqs(rs);
-+ rs->cq_armed = 1;
-+ } else {
-+ rs_update_credits(rs);
-+ fastlock_acquire(&rs->cq_wait_lock);
-+ fastlock_release(&rs->cq_lock);
-+
-+ ret = ds_get_cq_event(rs);
-+ fastlock_release(&rs->cq_wait_lock);
-+ fastlock_acquire(&rs->cq_lock);
-+ }
-+ } while (!ret);
-+
-+ fastlock_release(&rs->cq_lock);
-+ return ret;
-+}
-+
-+static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
-+{
-+ struct timeval s, e;
-+ uint32_t poll_time = 0;
-+ int ret;
-+
-+ do {
-+ ret = ds_process_cqs(rs, 1, test);
-+ if (!ret || nonblock || errno != EWOULDBLOCK)
-+ return ret;
-+
-+ if (!poll_time)
-+ gettimeofday(&s, NULL);
-+
-+ gettimeofday(&e, NULL);
-+ poll_time = (e.tv_sec - s.tv_sec) * 1000000 +
-+ (e.tv_usec - s.tv_usec) + 1;
-+ } while (poll_time <= polling_time);
-+
-+ ret = ds_process_cqs(rs, 0, test);
-+ return ret;
-+}
-+
- static int rs_nonblocking(struct rsocket *rs, int flags)
- {
- return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1721,7 +1959,7 @@ static int rs_can_send(struct rsocket *rs)
-
- static int ds_can_send(struct rsocket *rs)
- {
-- return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
-+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT);
- }
-
- static int rs_conn_can_send(struct rsocket *rs)
-@@ -1961,31 +2199,22 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- {
- struct ds_udp_header hdr;
- struct msghdr msg;
-- struct iovec miov[4];
-+ struct iovec miov[8];
- struct ds_qp *qp;
-
-- if (iovcnt > 4)
-+ if (iovcnt > 8)
- return ERR(ENOTSUP);
-
-- qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
- hdr.tag = htonll(DS_UDP_TAG);
- hdr.version = 1;
-- if (qp) {
-- hdr.sl = qp->sl;
-- hdr.slid = htons(qp->lid);
-- hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
-- } else {
-- hdr.sl = 0;
-- hdr.slid = 0;
-- hdr.qpn = 0;
-- }
-+ memset(&hdr->reserved, 0, sizeof hdr->reserved);
-+ hdr.qpn = htonl(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF);
-
- miov[0].iov_base = &hdr;
- miov[0].iov_len = sizeof hdr;
- memcpy(&miov[1], iov, sizeof *iov * iovcnt);
-
- memset(&msg, 0, sizeof msg);
-- /* TODO: specify name if needed */
- msg.msg_iov = miov;
- msg.msg_iovlen = iovcnt + 1;
- return sendmsg(rs->fd, msg, flags);
-@@ -2004,14 +2233,13 @@ static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
- struct ibv_sge sge;
- int ret = 0;
-
-- if (!rs->conn_dest || !rs->conn_dest->ah)
-+ if (!rs->conn_dest->ah)
- return ds_send_udp(rs, buf, len, flags);
-
-- rs->sbytes_needed = len;
- if (!ds_can_send(rs)) {
-- ret = rs_get_comp(rs, 1, ds_can_send);
-+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
- if (ret)
-- return ds_send_udp(rs, buf, len, flags);
-+ return ret;
- }
-
- if (len <= rs->sq_inline) {
-@@ -2145,8 +2373,13 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- }
-
- fastlock_acquire(&rs->slock);
-- ds_connect(rs, dest_addr, addrlen);
-+ if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) {
-+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-+ if (ret)
-+ goto out;
-+ }
- ret = dsend(rs, buf, len, flags);
-+out:
- fastlock_release(&rs->slock);
- return ret;
- }
-@@ -2629,7 +2862,7 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
- rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
- return 0;
- } else {
-- return getpeername(rs->fs, addr, addrlen);
-+ return getpeername(rs->udp_sock, addr, addrlen);
- }
- }
-
-@@ -2642,7 +2875,7 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
- rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
- return 0;
- } else {
-- return getsockname(rs->fd, addr, addrlen);
-+ return getsockname(rs->udp_sock, addr, addrlen);
- }
- }
-
-@@ -2656,7 +2889,7 @@ int rsetsockopt(int socket, int level, int optname,
- ret = ERR(ENOTSUP);
- rs = idm_at(&idm, socket);
- if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-- ret = setsockopt(rs->fd, optname, optval, optlen);
-+ ret = setsockopt(rs->udp_sock, optname, optval, optlen);
- if (ret)
- return ret;
- }
-@@ -2666,13 +2899,15 @@ int rsetsockopt(int socket, int level, int optname,
- opts = &rs->so_opts;
- switch (optname) {
- case SO_REUSEADDR:
-- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-- RDMA_OPTION_ID_REUSEADDR,
-- (void *) optval, optlen);
-- if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
-- rs->cm_id->context &&
-- (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
-- ret = 0;
-+ if (rs->type == SOCK_STREAM) {
-+ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-+ RDMA_OPTION_ID_REUSEADDR,
-+ (void *) optval, optlen);
-+ if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) &&
-+ rs->cm_id->context &&
-+ (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB))))
-+ ret = 0;
-+ }
- opt_on = *(int *) optval;
- break;
- case SO_RCVBUF:
-@@ -2722,9 +2957,11 @@ int rsetsockopt(int socket, int level, int optname,
- opts = &rs->ipv6_opts;
- switch (optname) {
- case IPV6_V6ONLY:
-- ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-- RDMA_OPTION_ID_AFONLY,
-- (void *) optval, optlen);
-+ if (rs->type == SOCK_STREAM) {
-+ ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID,
-+ RDMA_OPTION_ID_AFONLY,
-+ (void *) optval, optlen);
-+ }
- opt_on = *(int *) optval;
- break;
- default:
-@@ -2778,9 +3015,6 @@ int rgetsockopt(int socket, int level, int optname,
- int ret = 0;
-
- rs = idm_at(&idm, socket);
-- if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
-- return getsockopt(rs->fd, level, optname, optval, optlen);
--
- switch (level) {
- case SOL_SOCKET:
- switch (optname) {
-@@ -3100,24 +3334,3 @@ out:
-
- return (ret && left == count) ? ret : count - left;
- }
--
--ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
-- struct sockaddr *src_addr, socklen_t *addrlen)
--{
-- int ret;
--
-- ret = rrecv(socket, buf, len, flags);
-- if (ret > 0 && src_addr)
-- rgetpeername(socket, src_addr, addrlen);
--
-- return ret;
--}
--
--ssize_t usendto(int socket, const void *buf, size_t len, int flags,
-- const struct sockaddr *dest_addr, socklen_t addrlen)
--{
-- if (dest_addr || addrlen)
-- return ERR(EISCONN);
--
-- return usend(socket, buf, len, flags);
--}