]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add datagram support
authorSean Hefty <sean.hefty@intel.com>
Fri, 9 Nov 2012 18:26:38 +0000 (10:26 -0800)
committerSean Hefty <sean.hefty@intel.com>
Fri, 9 Nov 2012 18:26:38 +0000 (10:26 -0800)
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
docs/rsocket
src/cma.c
src/rsocket.c

index 1484f65b7b5a8ee67e8a7d3b551363d924d64f95..03d49df7824fca62f5a670f5df5870df25e298a4 100644 (file)
@@ -1,7 +1,7 @@
-rsocket Protocol and Design Guide               9/10/2012
+rsocket Protocol and Design Guide               11/11/2012
 
-Overview
---------
+Data Streaming (TCP) Overview
+-----------------------------
 Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  For details on the current state of the
 implementation, readers should refer to the rsocket man page.  This
@@ -189,3 +189,47 @@ registered remote data buffer.
 From host A's perspective, the transfer appears as a normal send/write
 operation, with the data stream redirected directly into the receiving
 application's buffer.
+
+
+
+Datagram Overview
+-----------------
+THe rsocket API supports datagram sockets.  Datagram support is handled through an
+entirely different protocol and internal implementation.  Unlike connected rsockets,
+datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
+socket may use any number of network (IP) addresses, including those which map to
+different RDMA devices.  As a result, a single datagram rsocket must support
+using multiple RDMA devices and ports, and a datagram rsocket references a single
+UDP socket, plus zero or more UD QPs.
+
+Rsockets uses headers inserted before user data sent over UDP sockets to resolve
+remote UD QP numbers.  When a user first attempts to send a datagram to a remote
+address (IP and UDP port), rsockets will take the following steps.  First, it
+will allocate and store the destination address into a lookup table for future
+use.  Then it will resolve which local network address should be used when sending
+to the specified destination.  It will allocate a UD QP on the RDMA device
+associated with the local address.  Finally, rsockets will send the user's
+datagram to the remote UDP socket, but inserting a header before the datagram.
+The header specifies the UD QP number associated with the local network address
+(IP and UDP port) of the send.
+
+Under many circumstances, the rsocket UDP header also provides enough path
+information for the receiver to send the reply using a UD QP.  However, certain
+fabric topologies may require contacting a subnet administrator.  Whenever
+rsockets lacks sufficient information to send data to a remote peer using
+a UD QP, it will automatically fall back to using a standard UDP socket.  This
+helps minimize the latency for performing a given send, while lenghty address
+and route resolution protocols complete.
+
+Currently, rsockets allocates a single UD QP, with an infrastructure is provided
+to support more.  Future enhancements may add support for multiple UD QPs, each
+associated with a single network (IP) address.  Multiplexing different network
+addresses over a single UD QP and using shared receive queues are other possible
+enhancements.
+
+Because rsockets may use multiple UD QPs, buffer management can become an issue.
+The total size of receive buffers is specified by the mem_default configuration
+files, but may be overridden with rsetsockopt.  The buffer is divided into
+MTU sized messages and distributed among the allocated QPs.  As new QPs are
+created, the receive buffers may be redistributed by posting loopback sends
+on a QP in order to free the receive buffers for reposting to a different QP.  
\ No newline at end of file
index 91bf108405c58f9221221858f81db3a13e9cc154..2c6b0320ccec0b7c46519ce0177cc2b0b14a384d 100755 (executable)
--- a/src/cma.c
+++ b/src/cma.c
@@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
 int ucma_max_qpsize(struct rdma_cm_id *id)
 {
        struct cma_id_private *id_priv;
+       int i, max_size = 0;
 
        id_priv = container_of(id, struct cma_id_private, id);
-       return id_priv->cma_dev->max_qpsize;
+       if (id && id_priv->cma_dev) {
+               max_size = id_priv->cma_dev->max_qpsize;
+       } else {
+               for (i = 0; i < cma_dev_cnt; i++) {
+                       if (!max_size || max_size > cma_dev_array[i].max_qpsize)
+                               max_size = cma_dev_array[i].max_qpsize;
+               }
+       }
+       return max_size;
 }
 
 uint16_t ucma_get_port(struct sockaddr *addr)
index 58fcb8e583234b87e3e992375bee14bb4595a557..0695d12d72304d465b4e26ea03f8895d42a9290b 100644 (file)
@@ -55,7 +55,7 @@
 
 #define RS_OLAP_START_SIZE 2048
 #define RS_MAX_TRANSFER 65536
-#define RS_SNDLOWAT 64
+#define RS_SNDLOWAT 2048
 #define RS_QP_MAX_SIZE 0xFFFE
 #define RS_QP_CTRL_SIZE 4
 #define RS_CONN_RETRIES 6
 static struct index_map idm;
 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
 
+enum {
+       RS_SVC_INSERT,
+       RS_SVC_REMOVE
+};
+
+struct rsocket;
+
+struct rs_svc_msg {
+       uint32_t op;
+       uint32_t status;
+       struct rsocket *rs;
+};
+
+static pthread_t svc_id;
+static int svc_cnt;
+static int svc_fds[2];
+
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
@@ -110,6 +127,12 @@ struct rs_msg {
        uint32_t data;
 };
 
+struct ds_qp;
+
+struct ds_msg {
+       struct ds_qp *qp;
+};
+
 struct rs_sge {
        uint64_t addr;
        uint32_t key;
@@ -159,9 +182,9 @@ enum rs_state {
        rs_connecting      = rs_opening |   0x0040,
        rs_accepting       = rs_opening |   0x0080,
        rs_connected       =                0x0100,
-       rs_connect_wr      =                0x0200,
-       rs_connect_rd      =                0x0400,
-       rs_connect_rdwr    = rs_connected | rs_connect_rd | rs_connect_wr,
+       rs_writable        =                0x0200,
+       rs_readable        =                0x0400,
+       rs_connect_rdwr    = rs_connected | rs_readable | rs_writable,
        rs_connect_error   =                0x0800,
        rs_disconnected    =                0x1000,
        rs_error           =                0x2000,
@@ -169,13 +192,68 @@ enum rs_state {
 
 #define RS_OPT_SWAP_SGL 1
 
-struct rsocket {
+union socket_addr {
+       struct sockaddr         sa;
+       struct sockaddr_in      sin;
+       struct sockaddr_in6     sin6;
+};
+
+struct ds_qp {
+       dlist_t           list;
+       struct rsocket    *rs;
        struct rdma_cm_id *cm_id;
+
+       struct ibv_mr     *rmr;
+       uint8_t           *rbuf;
+
+       struct ibv_mr     *smr;
+       uint16_t          lid;
+       uint8_t           sl;
+};
+
+struct ds_dest {
+       union socket_addr addr; /* must be first */
+       struct ds_qp      *qp;
+       struct ibv_ah     *ah;
+       uint32_t           qpn;
+};
+
+struct rsocket {
+       int               type;
+       int               index;
        fastlock_t        slock;
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
-       fastlock_t        iomap_lock;
+       fastlock_t        map_lock; /* acquire slock first if needed */
+
+       union {
+               /* data stream */
+               struct {
+                       struct rdma_cm_id *cm_id;
+
+                       int               remote_sge;
+                       struct rs_sge     remote_sgl;
+                       struct rs_sge     remote_iomap;
+
+                       struct ibv_mr     *target_mr;
+                       int               target_sge;
+                       int               target_iomap_size;
+                       void              *target_buffer_list;
+                       volatile struct rs_sge    *target_sgl;
+                       struct rs_iomap   *target_iomap;
+               };
+               /* datagram */
+               struct {
+                       dlist_t           qp_list;
+                       void              *dest_map;
+                       struct ds_dest    *conn_dest;
+                       struct pollfd     *fds;
+                       nfds_t            nfds;
+                       int               dsock;
+                       int               sbytes_needed;
+               };
+       };
 
        int               opts;
        long              fd_flags;
@@ -186,7 +264,7 @@ struct rsocket {
        int               cq_armed;
        int               retries;
        int               err;
-       int               index;
+
        int               ctrl_avail;
        int               sqe_avail;
        int               sbuf_bytes_avail;
@@ -203,34 +281,93 @@ struct rsocket {
        int               rbuf_offset;
        int               rmsg_head;
        int               rmsg_tail;
-       struct rs_msg     *rmsg;
-
-       int               remote_sge;
-       struct rs_sge     remote_sgl;
-       struct rs_sge     remote_iomap;
+       union {
+               struct rs_msg     *rmsg;
+               struct ds_msg     *dmsg;
+       };
 
        struct rs_iomap_mr *remote_iomappings;
        dlist_entry       iomap_list;
        dlist_entry       iomap_queue;
        int               iomap_pending;
 
-       struct ibv_mr    *target_mr;
-       int               target_sge;
-       int               target_iomap_size;
-       void             *target_buffer_list;
-       volatile struct rs_sge    *target_sgl;
-       struct rs_iomap  *target_iomap;
-
        uint32_t          rbuf_size;
-       struct ibv_mr    *rmr;
+       struct ibv_mr     *rmr;
        uint8_t           *rbuf;
 
        uint32_t          sbuf_size;
-       struct ibv_mr    *smr;
+       struct ibv_mr     *smr;
        struct ibv_sge    ssgl[2];
        uint8_t           *sbuf;
 };
 
+#define DS_UDP_TAG 0x5555555555555555ULL
+
+struct ds_udp_header {
+       uint64_t          tag;
+       uint8_t           version;
+       uint8_t           sl;
+       uint16_t          slid;
+       uint32_t          qpn;  /* upper 8-bits reserved */
+};
+
+
+static int rs_svc_run(void *arg)
+{
+       return 0;
+}
+
+static int rs_svc_insert(struct rsocket *rs)
+{
+       struct rs_svc_msg msg;
+       int ret;
+
+       pthread_mutex_lock(&mut);
+       if (!svc_cnt) {
+               ret = socketpair(AF_INET, SOCK_STREAM, 0, &svc_fds);
+               if (ret)
+                       goto out;
+
+               ret = pthread_create(&svc_id, NULL, rs_svc_run, NULL);
+               if (ret) {
+                       close(svc_fds[0]);
+                       close(svc_fds[1]);
+                       ret = ERR(ret);
+                       goto out;
+               }
+       }
+
+       msg.op = RS_SVC_INSERT;
+       msg.status = EINVAL;
+       msg.rs = rs;
+       svc_cnt++;
+       write(svc_fds[0], &msg, sizeof msg);
+       read(svc_fds[0], &msg, sizeof msg);
+       ret = ERR(msg.status);
+out:
+       pthread_mutex_unlock(&mut);
+       return ret;
+}
+
+static int rs_svc_remove(struct rsocket *rs)
+{
+       struct rs_svc_msg msg;
+       int ret;
+
+       pthread_mutex_lock(&mut);
+       msg.op = RS_SVC_REMOVE;
+       msg.status = EINVAL;
+       msg.rs = rs;
+       write(svc_fds[0], &msg, sizeof msg);
+       read(svc_fds[0], &msg, sizeof msg);
+       ret = ERR(msg.status);
+       if (!ret && !--svn_cnt)
+               pthread_join(svc_id, NULL);
+
+       pthread_mutex_unlock(&mut);
+       return ret;
+}
+
 static int rs_value_to_scale(int value, int bits)
 {
        return value <= (1 << (bits - 1)) ?
@@ -306,10 +443,10 @@ out:
        pthread_mutex_unlock(&mut);
 }
 
-static int rs_insert(struct rsocket *rs)
+static int rs_insert(struct rsocket *rs, index)
 {
        pthread_mutex_lock(&mut);
-       rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
+       rs->index = idm_set(&idm, index, rs);
        pthread_mutex_unlock(&mut);
        return rs->index;
 }
@@ -321,7 +458,7 @@ static void rs_remove(struct rsocket *rs)
        pthread_mutex_unlock(&mut);
 }
 
-static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
 {
        struct rsocket *rs;
 
@@ -329,7 +466,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        if (!rs)
                return NULL;
 
+       rs->type = type;
        rs->index = -1;
+       rs->dsock = -1;
        if (inherited_rs) {
                rs->sbuf_size = inherited_rs->sbuf_size;
                rs->rbuf_size = inherited_rs->rbuf_size;
@@ -351,12 +490,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
-       fastlock_init(&rs->iomap_lock);
+       fastlock_init(&rs->map_lock);
        dlist_init(&rs->iomap_list);
        dlist_init(&rs->iomap_queue);
        return rs;
 }
 
+/*
+ * TODO: Support datagram rsockets
+ */
 static int rs_set_nonblocking(struct rsocket *rs, long arg)
 {
        int ret = 0;
@@ -439,15 +581,32 @@ static int rs_init_bufs(struct rsocket *rs)
        return 0;
 }
 
-static int rs_create_cq(struct rsocket *rs)
+static int ds_init_bufs(struct ds_qp *qp)
 {
-       rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
-       if (!rs->cm_id->recv_cq_channel)
+       qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
+       if (!qp->rbuf)
+               return ERR(ENOMEM);
+
+       qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
+       if (!qp->smr)
+               return -1;
+
+       qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+       if (!qp->rmr)
                return -1;
 
-       rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
-                                          rs->cm_id, rs->cm_id->recv_cq_channel, 0);
-       if (!rs->cm_id->recv_cq)
+       return 0;
+}
+
+static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
+{
+       cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+       if (!cm_id->recv_cq_channel)
+               return -1;
+
+       cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+                                      cm_id, cm_id->recv_cq_channel, 0);
+       if (!cm_id->recv_cq)
                goto err1;
 
        if (rs->fd_flags & O_NONBLOCK) {
@@ -455,21 +614,20 @@ static int rs_create_cq(struct rsocket *rs)
                        goto err2;
        }
 
-       rs->cm_id->send_cq_channel = rs->cm_id->recv_cq_channel;
-       rs->cm_id->send_cq = rs->cm_id->recv_cq;
+       cm_id->send_cq_channel = cm_id->recv_cq_channel;
+       cm_id->send_cq = cm_id->recv_cq;
        return 0;
 
 err2:
-       ibv_destroy_cq(rs->cm_id->recv_cq);
-       rs->cm_id->recv_cq = NULL;
+       ibv_destroy_cq(cm_id->recv_cq);
+       cm_id->recv_cq = NULL;
 err1:
-       ibv_destroy_comp_channel(rs->cm_id->recv_cq_channel);
-       rs->cm_id->recv_cq_channel = NULL;
+       ibv_destroy_comp_channel(cm_id->recv_cq_channel);
+       cm_id->recv_cq_channel = NULL;
        return -1;
 }
 
-static inline int
-rs_post_recv(struct rsocket *rs)
+static inline int rs_post_recv(struct rsocket *rs)
 {
        struct ibv_recv_wr wr, *bad;
 
@@ -481,6 +639,23 @@ rs_post_recv(struct rsocket *rs)
        return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
 }
 
+static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, void *buf)
+{
+       struct ibv_recv_wr wr, *bad;
+       struct ibv_sge sge;
+
+       sge.addr = (uintptr_t) buf;
+       sge.length = RS_SNDLOWAT;
+       sge.lkey = qp->rmr;
+
+       wr.wr_id = RS_RECV_WR_ID;
+       wr.next = NULL;
+       wr.sg_list = &sge;
+       wr.num_sge = 1;
+
+       return rdma_seterrno(ibv_post_recv(qp->cm_id->qp, &wr, &bad));
+}
+
 static int rs_create_ep(struct rsocket *rs)
 {
        struct ibv_qp_init_attr qp_attr;
@@ -491,7 +666,7 @@ static int rs_create_ep(struct rsocket *rs)
        if (ret)
                return ret;
 
-       ret = rs_create_cq(rs);
+       ret = rs_create_cq(rs, rs->cm_id);
        if (ret)
                return ret;
 
@@ -548,8 +723,71 @@ static void rs_free_iomappings(struct rsocket *rs)
        }
 }
 
+static void ds_free_qp(struct ds_qp *qp)
+{
+       if (qp->smr)
+               rdma_dereg_mr(qp->smr);
+
+       if (qp->rbuf) {
+               if (qp->rmr)
+                       rdma_dereg_mr(qp->rmr);
+               free(qp->rbuf);
+       }
+
+       if (qp->cm_id) {
+               if (qp->cm_id->qp) {
+                       dlist_remove(&qp->list);
+                       rdma_destroy_qp(qp->cm_id);
+               }
+               rdma_destroy_id(qp->cm_id);
+       }
+       free(qp);
+}
+
+static void ds_free_qps(struct rsocket *rs)
+{
+       struct ds_qp *qp;
+       dlist_t *entry;
+
+       while (!dlist_empty(&rs->qp_list)) {
+               qp = container_of(rs->qp_list.next, struct ds_qp, list);
+               ds_free_qp(qp);
+       }
+}
+
+static void ds_free(struct rsocket *rs)
+{
+       if (rs->state & (rs_readable | rs_writable))
+               rs_svc_remove(rs);
+
+       if (rs->dsock >= 0)
+               close(rs->dsock);
+
+       if (rs->index >= 0)
+               rs_remove(rs);
+
+       ds_free_qps(rs);
+       if (rs->fds)
+               free(rs->fds);
+
+       if (rs->sbuf)
+               free(rs->sbuf);
+
+       fastlock_destroy(&rs->map_lock);
+       fastlock_destroy(&rs->cq_wait_lock);
+       fastlock_destroy(&rs->cq_lock);
+       fastlock_destroy(&rs->rlock);
+       fastlock_destroy(&rs->slock);
+       free(rs);
+}
+
 static void rs_free(struct rsocket *rs)
 {
+       if (rs->type == SOCK_DGRAM) {
+               ds_free(rs);
+               return;
+       }
+
        if (rs->index >= 0)
                rs_remove(rs);
 
@@ -581,7 +819,7 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
 
-       fastlock_destroy(&rs->iomap_lock);
+       fastlock_destroy(&rs->map_lock);
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
@@ -635,29 +873,56 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
        rs->sseq_comp = ntohs(conn->credits);
 }
 
+static int ds_init(struct rsocket *rs, int domain)
+{
+       rs->dsock = socket(domain, SOCK_DGRAM, 0);
+       if (rs->dsock < 0)
+               return rs->dsock;
+
+       rs->fds = calloc(1, sizeof *fds);
+       if (!rs->fds)
+               return ERR(ENOMEM);
+       rs->nfds = 1;
+       dlist_init(&rs->qp_list);
+
+       return 0;
+}
+
 int rsocket(int domain, int type, int protocol)
 {
        struct rsocket *rs;
-       int ret;
+       int index, ret;
 
        if ((domain != PF_INET && domain != PF_INET6) ||
-           (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
+           ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
+           (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
+           (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
                return ERR(ENOTSUP);
 
        rs_configure();
-       rs = rs_alloc(NULL);
+       rs = rs_alloc(NULL, type);
        if (!rs)
                return ERR(ENOMEM);
 
-       ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
-       if (ret)
-               goto err;
+       if (type == SOCK_STREAM) {
+               ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+               if (ret)
+                       goto err;
+
+               rs->cm_id->route.addr.src_addr.sa_family = domain;
+               index = rs->cm_id->channel->fd;
+       } else {
+               ret = ds_init(rs, domain);
+               if (ret)
+                       goto err;
+
+               index = rs->dsock;
+       }
 
-       ret = rs_insert(rs);
+       ret = rs_insert(rs, index);
        if (ret < 0)
                goto err;
 
-       rs->cm_id->route.addr.src_addr.sa_family = domain;
        return rs->index;
 
 err:
@@ -671,9 +936,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
        int ret;
 
        rs = idm_at(&idm, socket);
-       ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
-       if (!ret)
-               rs->state = rs_bound;
+       if (rs->type == SOCK_STREAM) {
+               ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
+               if (!ret)
+                       rs->state = rs_bound;
+       } else {
+               ret = bind(rs->dsock, addr, addrlen);
+               if (!ret) {
+                       ret = rs_svc_insert(rs);
+                       if (!ret)
+                               rs->state = rs_readable | rs_writable;
+               }
+       }
        return ret;
 }
 
@@ -709,7 +983,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        int ret;
 
        rs = idm_at(&idm, socket);
-       new_rs = rs_alloc(rs);
+       new_rs = rs_alloc(rs, rs->type);
        if (!new_rs)
                return ERR(ENOMEM);
 
@@ -717,7 +991,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
        if (ret)
                goto err;
 
-       ret = rs_insert(new_rs);
+       ret = rs_insert(new_rs, rs->cm_id->channel->fd);
        if (ret < 0)
                goto err;
 
@@ -854,13 +1128,212 @@ connected:
        return ret;
 }
 
+static int ds_init_ep(struct rsocket *rs)
+{
+       int ret;
+
+       rs_set_qp_size(rs);
+       if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT))
+               rs->rq_size = rs->rbuf_size / RS_SNDLOWAT;
+       else
+               rs->rbuf_size = rs->rq_size * RS_SNDLOWAT;
+
+       rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+       if (!rs->sbuf)
+               return ERR(ENOMEM);
+
+       rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf;
+       rs->sbuf_bytes_avail = rs->sbuf_size;
+       rs->sqe_avail = rs->sq_size;
+
+       ret = rs_svc_insert(rs);
+       if (ret)
+               return ret;
+
+       rs->state = rs_readable | rs_writable;
+       return 0;
+}
+
+static int ds_compare_addr(const void *dst1, const void *dst2)
+{
+       const struct sockaddr *sa1, *sa2;
+       size_t len;
+
+       sa1 = (const struct sockaddr *) dst1;
+       sa2 = (const struct sockaddr *) dst2;
+
+       len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
+             sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
+       return memcmp(dst1, dst2, len);
+}
+
+static int rs_any_addr(const union socket_addr *addr)
+{
+       if (addr->sa.sa_family == AF_INET) {
+               return (addr->sin.sin_addr == INADDR_ANY ||
+                       addr->sin.sin_addr == INADDR_LOOPBACK);
+       } else {
+               return (addr->sin6.sin6_addr == in6addr_any ||
+                       addr->sin6.sin6_addr == in6addr_loopback);
+       }
+}
+
+static int ds_get_src_addr(struct rsocket *rs,
+                          const struct sockaddr *dest_addr, socklen_t dest_len,
+                          union socket_addr *src_addr, socklen_t *src_len)
+{
+       int sock, ret;
+       uint16_t port;
+
+       *src_len = sizeof src_addr;
+       ret = getsockname(rs->dsock, &src_addr->sa, src_len);
+       if (ret || !rs_any_addr(src_addr))
+               return ret;
+
+       port = src_addr->sin.sin_port;
+       sock = socket(dest_addr->sa_family, SOCK_DGRAM, 0);
+       if (sock < 0)
+               return sock;
+
+       ret = connect(sock, dest_addr, dest_len);
+       if (ret)
+               goto out;
+
+       *src_len = sizeof src_addr;
+       ret = getsockname(sock, &src_addr->sa, src_len);
+       src_addr->sin.sin_port = port;
+out:
+       close(sock);
+       return ret;
+}
+
+static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
+                       socklen_t addrlen, struct ds_qp **qp)
+{
+       struct ibv_qp_init_attr qp_attr;
+       int ret;
+
+       *qp = calloc(1, sizeof(struct ds_qp));
+       if (!*qp)
+               return ERR(ENOMEM);
+
+       (*qp)->rs = rs;
+       ret = rdma_create_id(NULL, &(*qp)->cm_id, *qp, RDMA_PS_UDP);
+       if (ret)
+               goto err;
+
+       ret = rdma_bind_addr((*qp)->cm_id, &src_addr->sa);
+       if (ret)
+               goto err;
+
+       ret = ds_init_bufs(*qp);
+       if (ret)
+               goto err;
+
+       ret = rs_create_cq(rs, (*qp)->cm_id);
+       if (ret)
+               goto err;
+
+       memset(&qp_attr, 0, sizeof qp_attr);
+       qp_attr.qp_context = qp;
+       qp_attr.send_cq = rs->cm_id->send_cq;
+       qp_attr.recv_cq = rs->cm_id->recv_cq;
+       qp_attr.qp_type = IBV_QPT_UD;
+       qp_attr.sq_sig_all = 1;
+       qp_attr.cap.max_send_wr = rs->sq_size;
+       qp_attr.cap.max_recv_wr = rs->rq_size;
+       qp_attr.cap.max_send_sge = 2;
+       qp_attr.cap.max_recv_sge = 1;
+       qp_attr.cap.max_inline_data = rs->sq_inline;
+
+       ret = rdma_create_qp((*qp)->cm_id, NULL, &qp_attr);
+       if (ret)
+               return ret;
+
+       for (i = 0; i < rs->rq_size; i++) {
+               ret = ds_post_recv(rs, *qp, (*qp)->rbuf + i * RS_SNDLOWAT);
+               if (ret)
+                       goto err;
+       }
+       list_insert_head(&(*qp)->list, &rs->qp_list);
+       return 0;
+err:
+       ds_free_qp(*qp);
+       return ret;
+}
+
+static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr,
+                    socklen_t addrlen, struct ds_qp **qp)
+{
+       dlist_t *entry;
+
+       for (entry = rs->qp_list.next; entry != &rs->qp_list; entry = entry->next) {
+               *qp = container_of(entry, struct ds_qp, list);
+               if (!ds_compare_addr(rdma_get_local_addr((*qp)->cm_id)), src_addr)
+                       return 0;
+       }
+
+       return ds_create_qp(rs, src_addr, addrlen, qp);
+}
+
+static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
+                      socklen_t addrlen, struct ds_dest **dest)
+{
+       union socket_addr src_addr;
+       socklen_t src_len;
+       struct ds_qp *qp;
+       int ret = 0;
+
+       fastlock_acquire(&rs->map_lock);
+       dest = tfind(addr, &rs->dest_map, ds_compare_addr);
+       if (dest)
+               goto out;
+
+       if (rs->state == rs_init) {
+               ret = ds_init_ep(rs);
+               if (ret)
+                       goto out;
+       }
+
+       ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len);
+       if (ret)
+               goto out;
+
+       ret = ds_get_qp(rs, src_addr, src_len, &qp);
+       if (ret)
+               goto out;
+
+       *dest = calloc(1, sizeof(struct ds_dest));
+       if (!*dest) {
+               ret = ERR(ENOMEM);
+               goto out;
+       }
+
+       memcpy(&(*dest)->addr, addr, addrlen);
+       (*dest)->qp = qp;
+       tsearch((*dest)->addr, &rs->dest_map, ds_compare_addr);
+out:
+       fastlock_release(&rs->map_lock);
+       return ret;
+}
+
 int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 {
        struct rsocket *rs;
+       int ret;
 
        rs = idm_at(&idm, socket);
-       memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
-       return rs_do_connect(rs);
+       if (rs->type == SOCK_STREAM) {
+               memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen);
+               ret = rs_do_connect(rs);
+       } else {
+               fastlock_acquire(&rs->slock);
+               ret = connect(rs->dsock, addr, addrlen);
+               if (!ret)
+                       ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
+               fastlock_release(&rs->slock);
+       }
+       return ret;
 }
 
 static int rs_post_write_msg(struct rsocket *rs,
@@ -902,6 +1375,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int ds_post_send(struct rsocket *rs,
+                       struct ibv_sge *sgl, int nsge,
+                       uint64_t wr_id, int flags)
+{
+       struct ibv_send_wr wr, *bad;
+
+       wr.wr_id = wr_id;
+       wr.next = NULL;
+       wr.sg_list = sgl;
+       wr.num_sge = nsge;
+       wr.opcode = IBV_WR_SEND;
+       wr.send_flags = flags;
+       wr.wr.ud.ah = rs->conn_dest->ah;
+       wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
+       wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
+
+       return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -932,6 +1424,15 @@ static int rs_write_data(struct rsocket *rs,
                                 flags, addr, rkey);
 }
 
+static int ds_send_data(struct rsocket *rs,
+                       struct ibv_sge *sgl, int nsge,
+                       uint32_t length, int flags)
+{
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= length;
+       return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
+}
+
 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
                           struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
 {
@@ -1045,7 +1546,7 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state = rs_disconnected;
                                        return 0;
                                } else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
-                                       rs->state &= ~rs_connect_rd;
+                                       rs->state &= ~rs_readable;
                                }
                                break;
                        case RS_OP_WRITE:
@@ -1218,9 +1719,14 @@ static int rs_can_send(struct rsocket *rs)
               (rs->target_sgl[rs->target_sge].length != 0);
 }
 
+static int ds_can_send(struct rsocket *rs)
+{
+       return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
+}
+
 static int rs_conn_can_send(struct rsocket *rs)
 {
-       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+       return rs_can_send(rs) || !(rs->state & rs_writable);
 }
 
 static int rs_conn_can_send_ctrl(struct rsocket *rs)
@@ -1235,7 +1741,7 @@ static int rs_have_rdata(struct rsocket *rs)
 
 static int rs_conn_have_rdata(struct rsocket *rs)
 {
-       return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
+       return rs_have_rdata(rs) || !(rs->state & rs_readable);
 }
 
 static int rs_conn_all_sends_done(struct rsocket *rs)
@@ -1338,7 +1844,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        rs->rbuf_bytes_avail += rsize;
                }
 
-       } while (left && (flags & MSG_WAITALL) && (rs->state & rs_connect_rd));
+       } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable));
 
        fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
@@ -1390,14 +1896,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        struct rs_iomap iom;
        int ret;
 
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
        while (!dlist_empty(&rs->iomap_queue)) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
                                          rs_conn_can_send);
                        if (ret)
                                break;
-                       if (!(rs->state & rs_connect_wr)) {
+                       if (!(rs->state & rs_writable)) {
                                ret = ERR(ECONNRESET);
                                break;
                        }
@@ -1446,10 +1952,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
        }
 
        rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return ret;
 }
 
+static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
+                           int iovcnt, int flags)
+{
+       struct ds_udp_header hdr;
+       struct msghdr msg;
+       struct iovec miov[4];
+       struct ds_qp *qp;
+
+       if (iovcnt > 4)
+               return ERR(ENOTSUP);
+
+       qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
+       hdr.tag = htonll(DS_UDP_TAG);
+       hdr.version = 1;
+       if (qp) {
+               hdr.sl = qp->sl;
+               hdr.slid = htons(qp->lid);
+               hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
+       } else {
+               hdr.sl = 0;
+               hdr.slid = 0;
+               hdr.qpn = 0;
+       }
+
+       miov[0].iov_base = &hdr;
+       miov[0].iov_len = sizeof hdr;
+       memcpy(&miov[1], iov, sizeof *iov * iovcnt);
+
+       memset(&msg, 0, sizeof msg);
+       /* TODO: specify name if needed */
+       msg.msg_iov = miov;
+       msg.msg_iovlen = iovcnt + 1;
+       return sendmsg(rs->fd, msg, flags);
+}
+
+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+       struct iovec iov;
+       iov.iov_base = buf;
+       iov_iov_len = len;
+       return ds_sendv_udp(s, &iov, 1, flags);
+}
+
+static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
+{
+       struct ibv_sge sge;
+       int ret = 0;
+
+       if (!rs->conn_dest || !rs->conn_dest->ah)
+               return ds_send_udp(rs, buf, len, flags);
+
+       rs->sbytes_needed = len;
+       if (!ds_can_send(rs)) {
+               ret = rs_get_comp(rs, 1, ds_can_send);
+               if (ret)
+                       return ds_send_udp(rs, buf, len, flags);
+       }
+
+       if (len <= rs->sq_inline) {
+               sge.addr = (uintptr_t) buf;
+               sge.length = len;
+               sge.lkey = 0;
+               ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
+       } else if (len <= rs_sbuf_left(rs)) {
+               memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
+               rs->ssgl[0].length = len;
+               ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
+               if (len < rs_sbuf_left(rs))
+                       rs->ssgl[0].addr += len;
+               else
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+       } else {
+               rs->ssgl[0].length = rs_sbuf_left(rs);
+               memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+                       rs->ssgl[0].length);
+               rs->ssgl[1].length = len - rs->ssgl[0].length;
+               memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+               ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
+               rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+       }
+
+       return ret ? ret : len;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1463,6 +2053,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        int ret = 0;
 
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM) {
+               fastlock_acquire(&rs->slock);
+               ret = dsend(rs, buf, len, flags);
+               fastlock_release(&rs->slock);
+               return ret;
+       }
+
        if (rs->state & rs_opening) {
                ret = rs_do_connect(rs);
                if (ret) {
@@ -1484,7 +2081,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                                          rs_conn_can_send);
                        if (ret)
                                break;
-                       if (!(rs->state & rs_connect_wr)) {
+                       if (!(rs->state & rs_writable)) {
                                ret = ERR(ECONNRESET);
                                break;
                        }
@@ -1537,10 +2134,21 @@ out:
 ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
                const struct sockaddr *dest_addr, socklen_t addrlen)
 {
-       if (dest_addr || addrlen)
-               return ERR(EISCONN);
+       struct rsocket *rs;
+
+       rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_STREAM) {
+               if (dest_addr || addrlen)
+                       return ERR(EISCONN);
 
-       return rsend(socket, buf, len, flags);
+               return rsend(socket, buf, len, flags);
+       }
+
+       fastlock_acquire(&rs->slock);
+       ds_connect(rs, dest_addr, addrlen);
+       ret = dsend(rs, buf, len, flags);
+       fastlock_release(&rs->slock);
+       return ret;
 }
 
 static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
@@ -1599,7 +2207,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                                          rs_conn_can_send);
                        if (ret)
                                break;
-                       if (!(rs->state & rs_connect_wr)) {
+                       if (!(rs->state & rs_writable)) {
                                ret = ERR(ECONNRESET);
                                break;
                        }
@@ -1652,7 +2260,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
        if (msg->msg_control && msg->msg_controllen)
                return ERR(ENOTSUP);
 
-       return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
+       return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
 }
 
 ssize_t rwrite(int socket, const void *buf, size_t count)
@@ -1948,7 +2556,7 @@ int rshutdown(int socket, int how)
 
        rs = idm_at(&idm, socket);
        if (how == SHUT_RD) {
-               rs->state &= ~rs_connect_rd;
+               rs->state &= ~rs_readable;
                return 0;
        }
 
@@ -1958,10 +2566,10 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected) {
                if (how == SHUT_RDWR) {
                        ctrl = RS_CTRL_DISCONNECT;
-                       rs->state &= ~(rs_connect_rd | rs_connect_wr);
+                       rs->state &= ~(rs_readable | rs_writable);
                } else {
-                       rs->state &= ~rs_connect_wr;
-                       ctrl = (rs->state & rs_connect_rd) ?
+                       rs->state &= ~rs_writable;
+                       ctrl = (rs->state & rs_readable) ?
                                RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
                }
                if (!rs->ctrl_avail) {
@@ -2017,8 +2625,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
 
        rs = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
-       return 0;
+       if (rs->type == SOCK_STREAM) {
+               rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+               return 0;
+       } else {
+               return getpeername(rs->fs, addr, addrlen);
+       }
 }
 
 int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
@@ -2026,8 +2638,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
        struct rsocket *rs;
 
        rs = idm_at(&idm, socket);
-       rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
-       return 0;
+       if (rs->type == SOCK_STREAM) {
+               rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+               return 0;
+       } else {
+               return getsockname(rs->fd, addr, addrlen);
+       }
 }
 
 int rsetsockopt(int socket, int level, int optname,
@@ -2039,6 +2655,12 @@ int rsetsockopt(int socket, int level, int optname,
 
        ret = ERR(ENOTSUP);
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
+               ret = setsockopt(rs->fd, optname, optval, optlen);
+               if (ret)
+                       return ret;
+       }
+
        switch (level) {
        case SOL_SOCKET:
                opts = &rs->so_opts;
@@ -2156,6 +2778,9 @@ int rgetsockopt(int socket, int level, int optname,
        int ret = 0;
 
        rs = idm_at(&idm, socket);
+       if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
+               return getsockopt(rs->fd, level, optname, optval, optlen);
+
        switch (level) {
        case SOL_SOCKET:
                switch (optname) {
@@ -2314,7 +2939,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
        if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
                return ERR(EINVAL);
 
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
        if (prot & PROT_WRITE) {
                iomr = rs_get_iomap_mr(rs);
                access |= IBV_ACCESS_REMOTE_WRITE;
@@ -2348,7 +2973,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
                dlist_insert_tail(&iomr->entry, &rs->iomap_list);
        }
 out:
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return offset;
 }
 
@@ -2360,7 +2985,7 @@ int riounmap(int socket, void *buf, size_t len)
        int ret = 0;
 
        rs = idm_at(&idm, socket);
-       fastlock_acquire(&rs->iomap_lock);
+       fastlock_acquire(&rs->map_lock);
 
        for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
             entry = entry->next) {
@@ -2381,7 +3006,7 @@ int riounmap(int socket, void *buf, size_t len)
        }
        ret = ERR(EINVAL);
 out:
-       fastlock_release(&rs->iomap_lock);
+       fastlock_release(&rs->map_lock);
        return ret;
 }
 
@@ -2425,7 +3050,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
                                          rs_conn_can_send);
                        if (ret)
                                break;
-                       if (!(rs->state & rs_connect_wr)) {
+                       if (!(rs->state & rs_writable)) {
                                ret = ERR(ECONNRESET);
                                break;
                        }
@@ -2475,3 +3100,24 @@ out:
 
        return (ret && left == count) ? ret : count - left;
 }
+
+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
+                 struct sockaddr *src_addr, socklen_t *addrlen)
+{
+       int ret;
+
+       ret = rrecv(socket, buf, len, flags);
+       if (ret > 0 && src_addr)
+               rgetpeername(socket, src_addr, addrlen);
+
+       return ret;
+}
+
+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
+               const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+       if (dest_addr || addrlen)
+               return ERR(EISCONN);
+
+       return usend(socket, buf, len, flags);
+}