]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rename
authorSean Hefty <sean.hefty@intel.com>
Fri, 16 Nov 2012 18:42:04 +0000 (10:42 -0800)
committerSean Hefty <sean.hefty@intel.com>
Fri, 16 Nov 2012 18:42:04 +0000 (10:42 -0800)
meta
patches/dsocket [new file with mode: 0644]
patches/usocket [deleted file]

diff --git a/meta b/meta
index 9260340b3cdff65a297c96b078b106bebc84568c..ef037d166b762053cfe6955ab4999bea555425e4 100644 (file)
--- a/meta
+++ b/meta
@@ -1,8 +1,8 @@
 Version: 1
-Previous: c4ca7ea17d063404aa5deeb24fb21031d6810ae0
+Previous: 1a37348ae7c8dd4c895799ce3915f238a6d367cf
 Head: 158ea5a8c1a0fcf3ca61c642095023af16759c90
 Applied:
-  usocket: 158ea5a8c1a0fcf3ca61c642095023af16759c90
+  dsocket: 158ea5a8c1a0fcf3ca61c642095023af16759c90
 Unapplied:
   test-udp: f6c78ad2a26f452cf166aff1baa7b76160bd8bf7
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
diff --git a/patches/dsocket b/patches/dsocket
new file mode 100644 (file)
index 0000000..7e35c6f
--- /dev/null
@@ -0,0 +1,762 @@
+Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
+Top:    97a52629c221cba1033082bbd308ecfc4d4b6082
+Author: Sean Hefty <sean.hefty@intel.com>
+Date:   2012-11-09 10:26:38 -0800
+
+rsocket: Add datagram support
+
+Signed-off-by: Sean Hefty <sean.hefty@intel.com>
+
+
+---
+
+diff --git a/docs/rsocket b/docs/rsocket
+index 1484f65..03d49df 100644
+--- a/docs/rsocket
++++ b/docs/rsocket
+@@ -1,7 +1,7 @@
+-rsocket Protocol and Design Guide               9/10/2012
++rsocket Protocol and Design Guide               11/11/2012
+-Overview
+---------
++Data Streaming (TCP) Overview
++-----------------------------
+ Rsockets is a protocol over RDMA that supports a socket-level API
+ for applications.  For details on the current state of the
+ implementation, readers should refer to the rsocket man page.  This
+@@ -189,3 +189,47 @@ registered remote data buffer.
+ From host A's perspective, the transfer appears as a normal send/write
+ operation, with the data stream redirected directly into the receiving
+ application's buffer.
++
++
++
++Datagram Overview
++-----------------
++THe rsocket API supports datagram sockets.  Datagram support is handled through an
++entirely different protocol and internal implementation.  Unlike connected rsockets,
++datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
++socket may use any number of network (IP) addresses, including those which map to
++different RDMA devices.  As a result, a single datagram rsocket must support
++using multiple RDMA devices and ports, and a datagram rsocket references a single
++UDP socket, plus zero or more UD QPs.
++
++Rsockets uses headers inserted before user data sent over UDP sockets to resolve
++remote UD QP numbers.  When a user first attempts to send a datagram to a remote
++address (IP and UDP port), rsockets will take the following steps.  First, it
++will allocate and store the destination address into a lookup table for future
++use.  Then it will resolve which local network address should be used when sending
++to the specified destination.  It will allocate a UD QP on the RDMA device
++associated with the local address.  Finally, rsockets will send the user's
++datagram to the remote UDP socket, but inserting a header before the datagram.
++The header specifies the UD QP number associated with the local network address
++(IP and UDP port) of the send.
++
++Under many circumstances, the rsocket UDP header also provides enough path
++information for the receiver to send the reply using a UD QP.  However, certain
++fabric topologies may require contacting a subnet administrator.  Whenever
++rsockets lacks sufficient information to send data to a remote peer using
++a UD QP, it will automatically fall back to using a standard UDP socket.  This
++helps minimize the latency for performing a given send, while lenghty address
++and route resolution protocols complete.
++
++Currently, rsockets allocates a single UD QP, with an infrastructure is provided
++to support more.  Future enhancements may add support for multiple UD QPs, each
++associated with a single network (IP) address.  Multiplexing different network
++addresses over a single UD QP and using shared receive queues are other possible
++enhancements.
++
++Because rsockets may use multiple UD QPs, buffer management can become an issue.
++The total size of receive buffers is specified by the mem_default configuration
++files, but may be overridden with rsetsockopt.  The buffer is divided into
++MTU sized messages and distributed among the allocated QPs.  As new QPs are
++created, the receive buffers may be redistributed by posting loopback sends
++on a QP in order to free the receive buffers for reposting to a different QP.  
+\ No newline at end of file
+diff --git a/src/rsocket.c b/src/rsocket.c
+index 58fcb8e..99e638c 100644
+--- a/src/rsocket.c
++++ b/src/rsocket.c
+@@ -110,6 +110,12 @@ struct rs_msg {
+       uint32_t data;
+ };
++struct ds_qp;
++
++struct ds_msg {
++      struct ds_qp *qp;
++};
++
+ struct rs_sge {
+       uint64_t addr;
+       uint32_t key;
+@@ -169,13 +175,63 @@ enum rs_state {
+ #define RS_OPT_SWAP_SGL 1
+-struct rsocket {
++union socket_addr {
++      struct sockaddr         sa;
++      struct sockaddr_in      sin;
++      struct sockaddr_in6     sin6;
++};
++
++struct ds_qp {
+       struct rdma_cm_id *cm_id;
++      int               rbuf_cnt;
++      uint16_t          lid;
++      uint8_t           sl;
++};
++
++struct ds_dest {
++      union socket_addr addr;
++      struct ds_qp      *qp;
++      struct ibv_ah     *ah;
++      uint32_t           qpn;
++      atomic_t           refcnt;
++};
++
++struct rsocket {
++      int               type;
++      int               index;
+       fastlock_t        slock;
+       fastlock_t        rlock;
+       fastlock_t        cq_lock;
+       fastlock_t        cq_wait_lock;
+-      fastlock_t        iomap_lock;
++      fastlock_t        map_lock; /* acquire slock first if needed */
++
++      union {
++              /* data stream */
++              struct {
++                      struct rdma_cm_id *cm_id;
++
++                      int               remote_sge;
++                      struct rs_sge     remote_sgl;
++                      struct rs_sge     remote_iomap;
++
++                      struct ibv_mr     *target_mr;
++                      int               target_sge;
++                      int               target_iomap_size;
++                      void              *target_buffer_list;
++                      volatile struct rs_sge    *target_sgl;
++                      struct rs_iomap   *target_iomap;
++              };
++              /* datagram */
++              struct {
++                      struct ds_qp      *qp_array;
++                      void              *dest_map;
++                      struct ds_dest    *conn_dest;
++                      struct pollfd     *fds;
++                      nfds_t            nfds;
++                      int               fd;
++                      int               sbytes_needed;
++              };
++      };
+       int               opts;
+       long              fd_flags;
+@@ -186,7 +242,7 @@ struct rsocket {
+       int               cq_armed;
+       int               retries;
+       int               err;
+-      int               index;
++
+       int               ctrl_avail;
+       int               sqe_avail;
+       int               sbuf_bytes_avail;
+@@ -203,34 +259,37 @@ struct rsocket {
+       int               rbuf_offset;
+       int               rmsg_head;
+       int               rmsg_tail;
+-      struct rs_msg     *rmsg;
+-
+-      int               remote_sge;
+-      struct rs_sge     remote_sgl;
+-      struct rs_sge     remote_iomap;
++      union {
++              struct rs_msg     *rmsg;
++              struct ds_msg     *dmsg;
++      };
+       struct rs_iomap_mr *remote_iomappings;
+       dlist_entry       iomap_list;
+       dlist_entry       iomap_queue;
+       int               iomap_pending;
+-      struct ibv_mr    *target_mr;
+-      int               target_sge;
+-      int               target_iomap_size;
+-      void             *target_buffer_list;
+-      volatile struct rs_sge    *target_sgl;
+-      struct rs_iomap  *target_iomap;
+-
+       uint32_t          rbuf_size;
+-      struct ibv_mr    *rmr;
++      struct ibv_mr     *rmr;
+       uint8_t           *rbuf;
+       uint32_t          sbuf_size;
+-      struct ibv_mr    *smr;
++      struct ibv_mr     *smr;
+       struct ibv_sge    ssgl[2];
+       uint8_t           *sbuf;
+ };
++#define DS_UDP_TAG 0x5555555555555555ULL
++
++struct ds_udp_header {
++      uint64_t          tag;
++      uint8_t           version;
++      uint8_t           sl;
++      uint16_t          slid;
++      uint32_t          qpn;  /* upper 8-bits reserved */
++};
++
++
+ static int rs_value_to_scale(int value, int bits)
+ {
+       return value <= (1 << (bits - 1)) ?
+@@ -306,10 +365,10 @@ out:
+       pthread_mutex_unlock(&mut);
+ }
+-static int rs_insert(struct rsocket *rs)
++static int rs_insert(struct rsocket *rs, index)
+ {
+       pthread_mutex_lock(&mut);
+-      rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
++      rs->index = idm_set(&idm, index, rs);
+       pthread_mutex_unlock(&mut);
+       return rs->index;
+ }
+@@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs)
+       pthread_mutex_unlock(&mut);
+ }
+-static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
++static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
+ {
+       struct rsocket *rs;
+@@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+       if (!rs)
+               return NULL;
++      rs->type = type;
+       rs->index = -1;
++      rs->fd = -1;
+       if (inherited_rs) {
+               rs->sbuf_size = inherited_rs->sbuf_size;
+               rs->rbuf_size = inherited_rs->rbuf_size;
+@@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+       fastlock_init(&rs->rlock);
+       fastlock_init(&rs->cq_lock);
+       fastlock_init(&rs->cq_wait_lock);
+-      fastlock_init(&rs->iomap_lock);
++      fastlock_init(&rs->map_lock);
+       dlist_init(&rs->iomap_list);
+       dlist_init(&rs->iomap_queue);
+       return rs;
+@@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs)
+               rdma_destroy_id(rs->cm_id);
+       }
+-      fastlock_destroy(&rs->iomap_lock);
++      if (rs->fd >= 0)
++              close(rs->fd);
++      if (rs->fds)
++              free(rs->fds);
++
++      fastlock_destroy(&rs->map_lock);
+       fastlock_destroy(&rs->cq_wait_lock);
+       fastlock_destroy(&rs->cq_lock);
+       fastlock_destroy(&rs->rlock);
+@@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+       rs->sseq_comp = ntohs(conn->credits);
+ }
++static int ds_init(struct rsocket *rs, int domain)
++{
++      rs->fd = socket(domain, SOCK_DGRAM, 0);
++      if (rs->fd < 0)
++              return rs->fd;
++
++      rs->fds = calloc(1, sizeof *fds);
++      if (!rs->fds)
++              return ERR(ENOMEM);
++      rs->nfds = 1;
++
++      return 0;
++}
++
+ int rsocket(int domain, int type, int protocol)
+ {
+       struct rsocket *rs;
+-      int ret;
++      int index, ret;
+       if ((domain != PF_INET && domain != PF_INET6) ||
+-          (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
++          ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
++          (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
++          (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
+               return ERR(ENOTSUP);
+       rs_configure();
+-      rs = rs_alloc(NULL);
++      rs = rs_alloc(NULL, type);
+       if (!rs)
+               return ERR(ENOMEM);
+-      ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+-      if (ret)
+-              goto err;
++      if (type == SOCK_STREAM) {
++              ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
++              if (ret)
++                      goto err;
++
++              rs->cm_id->route.addr.src_addr.sa_family = domain;
++              index = rs->cm_id->channel->fd;
++      } else {
++              ret = ds_init(rs, domain);
++              if (ret)
++                      goto err;
+-      ret = rs_insert(rs);
++              index = rs->fd;
++      }
++
++      ret = rs_insert(rs, index);
+       if (ret < 0)
+               goto err;
+-      rs->cm_id->route.addr.src_addr.sa_family = domain;
+       return rs->index;
+ err:
+@@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+       int ret;
+       rs = idm_at(&idm, socket);
+-      ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
+-      if (!ret)
+-              rs->state = rs_bound;
++      if (rs->type == SOCK_STREAM) {
++              ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
++              if (!ret)
++                      rs->state = rs_bound;
++      } else {
++              ret = bind(rs->fd, addr, addrlen);
++      }
+       return ret;
+ }
+@@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+       int ret;
+       rs = idm_at(&idm, socket);
+-      new_rs = rs_alloc(rs);
++      new_rs = rs_alloc(rs, rs->type);
+       if (!new_rs)
+               return ERR(ENOMEM);
+@@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+       if (ret)
+               goto err;
+-      ret = rs_insert(new_rs);
++      ret = rs_insert(new_rs, rs->cm_id->channel->fd);
+       if (ret < 0)
+               goto err;
+@@ -854,6 +950,66 @@ connected:
+       return ret;
+ }
++static int ds_compare_dest(const void *dst1, const void *dst2)
++{
++      const struct sockaddr *sa1, *sa2;
++      size_t len;
++
++      sa1 = (const struct sockaddr *) dst1;
++      sa2 = (const struct sockaddr *) dst2;
++
++      len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
++            sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
++      return memcmp(dst1, dst2, len);
++}
++
++/* Caller must hold map_lock around accessing source address */
++static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
++                                        socklen_t dst_len, socklen_t *src_len)
++{
++      static union socket_addr src_addr;
++      int sock, ret;
++
++      sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
++      if (sock < 0)
++              return NULL;
++
++      ret = connect(sock, &dst_addr->sa, dst_len);
++      if (ret)
++              goto out;
++
++      *src_len = sizeof src_addr;
++      ret = getsockname(sock, &src_addr.sa, src_len);
++
++out:
++      close(sock);
++      return ret ? NULL : &src_addr;
++}
++
++static struct ds_qp *ds_get_qp(struct rsocket *rs,
++                             const struct sockaddr *src_addr, socklen_t addrlen)
++{
++      union socket_addr *addr;
++      socklen_t len;
++
++}
++
++static int ds_connect(struct rsocket *rs,
++                    const struct sockaddr *dest_addr, socklen_t addrlen)
++{
++      struct ds_dest **dest;
++
++      fastlock_acquire(&rs->map_lock);
++      dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
++      if (dest) {
++              rs->conn_dest = *dest;
++              goto out;
++      }
++out:
++      fastlock_release(&rs->map_lock);
++      return 0;
++}
++
+ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
+ {
+       struct rsocket *rs;
+@@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs,
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+ }
++static int ds_post_send(struct rsocket *rs,
++                      struct ibv_sge *sgl, int nsge,
++                      uint64_t wr_id, int flags)
++{
++      struct ibv_send_wr wr, *bad;
++
++      wr.wr_id = wr_id;
++      wr.next = NULL;
++      wr.sg_list = sgl;
++      wr.num_sge = nsge;
++      wr.opcode = IBV_WR_SEND;
++      wr.send_flags = flags;
++      wr.wr.ud.ah = rs->conn_dest->ah;
++      wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
++      wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
++
++      return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
++}
++
+ /*
+  * Update target SGE before sending data.  Otherwise the remote side may
+  * update the entry before we do.
+@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs,
+                                flags, addr, rkey);
+ }
++static int ds_send_data(struct rsocket *rs,
++                      struct ibv_sge *sgl, int nsge,
++                      uint32_t length, int flags)
++{
++      rs->sqe_avail--;
++      rs->sbuf_bytes_avail -= length;
++      return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
++}
++
+ static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+ {
+@@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs)
+              (rs->target_sgl[rs->target_sge].length != 0);
+ }
++static int ds_can_send(struct rsocket *rs)
++{
++      return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
++}
++
+ static int rs_conn_can_send(struct rsocket *rs)
+ {
+       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+@@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+       struct rs_iomap iom;
+       int ret;
+-      fastlock_acquire(&rs->iomap_lock);
++      fastlock_acquire(&rs->map_lock);
+       while (!dlist_empty(&rs->iomap_queue)) {
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+@@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+       }
+       rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+-      fastlock_release(&rs->iomap_lock);
++      fastlock_release(&rs->map_lock);
+       return ret;
+ }
++static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
++                          int iovcnt, int flags)
++{
++      struct ds_udp_header hdr;
++      struct msghdr msg;
++      struct iovec miov[4];
++      struct ds_qp *qp;
++
++      if (iovcnt > 4)
++              return ERR(ENOTSUP);
++
++      qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
++      hdr.tag = htonll(DS_UDP_TAG);
++      hdr.version = 1;
++      if (qp) {
++              hdr.sl = qp->sl;
++              hdr.slid = htons(qp->lid);
++              hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
++      } else {
++              hdr.sl = 0;
++              hdr.slid = 0;
++              hdr.qpn = 0;
++      }
++
++      miov[0].iov_base = &hdr;
++      miov[0].iov_len = sizeof hdr;
++      memcpy(&miov[1], iov, sizeof *iov * iovcnt);
++
++      memset(&msg, 0, sizeof msg);
++      /* TODO: specify name if needed */
++      msg.msg_iov = miov;
++      msg.msg_iovlen = iovcnt + 1;
++      return sendmsg(rs->fd, msg, flags);
++}
++
++static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
++{
++      struct iovec iov;
++      iov.iov_base = buf;
++      iov_iov_len = len;
++      return ds_sendv_udp(s, &iov, 1, flags);
++}
++
++static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
++{
++      struct ibv_sge sge;
++      int ret = 0;
++
++      if (!rs->conn_dest || !rs->conn_dest->ah)
++              return ds_send_udp(rs, buf, len, flags);
++
++      rs->sbytes_needed = len;
++      if (!ds_can_send(rs)) {
++              ret = rs_get_comp(rs, 1, ds_can_send);
++              if (ret)
++                      return ds_send_udp(rs, buf, len, flags);
++      }
++
++      if (len <= rs->sq_inline) {
++              sge.addr = (uintptr_t) buf;
++              sge.length = len;
++              sge.lkey = 0;
++              ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
++      } else if (len <= rs_sbuf_left(rs)) {
++              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
++              rs->ssgl[0].length = len;
++              ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
++              if (len < rs_sbuf_left(rs))
++                      rs->ssgl[0].addr += len;
++              else
++                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
++      } else {
++              rs->ssgl[0].length = rs_sbuf_left(rs);
++              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
++                      rs->ssgl[0].length);
++              rs->ssgl[1].length = len - rs->ssgl[0].length;
++              memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
++              ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
++              rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
++      }
++
++      return ret ? ret : len;
++}
++
+ /*
+  * We overlap sending the data, by posting a small work request immediately,
+  * then increasing the size of the send on each iteration.
+@@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+       int ret = 0;
+       rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_DGRAM) {
++              fastlock_acquire(&rs->slock);
++              ret = dsend(rs, buf, len, flags);
++              fastlock_release(&rs->slock);
++              return ret;
++      }
++
+       if (rs->state & rs_opening) {
+               ret = rs_do_connect(rs);
+               if (ret) {
+@@ -1537,10 +1817,21 @@ out:
+ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
+               const struct sockaddr *dest_addr, socklen_t addrlen)
+ {
+-      if (dest_addr || addrlen)
+-              return ERR(EISCONN);
++      struct rsocket *rs;
++
++      rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_STREAM) {
++              if (dest_addr || addrlen)
++                      return ERR(EISCONN);
++
++              return rsend(socket, buf, len, flags);
++      }
+-      return rsend(socket, buf, len, flags);
++      fastlock_acquire(&rs->slock);
++      ds_connect(rs, dest_addr, addrlen);
++      ret = dsend(rs, buf, len, flags);
++      fastlock_release(&rs->slock);
++      return ret;
+ }
+ static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
+@@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+       if (msg->msg_control && msg->msg_controllen)
+               return ERR(ENOTSUP);
+-      return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
++      return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
+ }
+ ssize_t rwrite(int socket, const void *buf, size_t count)
+@@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+       struct rsocket *rs;
+       rs = idm_at(&idm, socket);
+-      rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
+-      return 0;
++      if (rs->type == SOCK_STREAM) {
++              rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
++              return 0;
++      } else {
++              return getpeername(rs->fs, addr, addrlen);
++      }
+ }
+ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2026,8 +2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+       struct rsocket *rs;
+       rs = idm_at(&idm, socket);
+-      rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
+-      return 0;
++      if (rs->type == SOCK_STREAM) {
++              rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
++              return 0;
++      } else {
++              return getsockname(rs->fd, addr, addrlen);
++      }
+ }
+ int rsetsockopt(int socket, int level, int optname,
+@@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname,
+       ret = ERR(ENOTSUP);
+       rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
++              ret = setsockopt(rs->fd, optname, optval, optlen);
++              if (ret)
++                      return ret;
++      }
++
+       switch (level) {
+       case SOL_SOCKET:
+               opts = &rs->so_opts;
+@@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname,
+       int ret = 0;
+       rs = idm_at(&idm, socket);
++      if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
++              return getsockopt(rs->fd, level, optname, optval, optlen);
++
+       switch (level) {
+       case SOL_SOCKET:
+               switch (optname) {
+@@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+       if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+               return ERR(EINVAL);
+-      fastlock_acquire(&rs->iomap_lock);
++      fastlock_acquire(&rs->map_lock);
+       if (prot & PROT_WRITE) {
+               iomr = rs_get_iomap_mr(rs);
+               access |= IBV_ACCESS_REMOTE_WRITE;
+@@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+       }
+ out:
+-      fastlock_release(&rs->iomap_lock);
++      fastlock_release(&rs->map_lock);
+       return offset;
+ }
+@@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len)
+       int ret = 0;
+       rs = idm_at(&idm, socket);
+-      fastlock_acquire(&rs->iomap_lock);
++      fastlock_acquire(&rs->map_lock);
+       for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+            entry = entry->next) {
+@@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len)
+       }
+       ret = ERR(EINVAL);
+ out:
+-      fastlock_release(&rs->iomap_lock);
++      fastlock_release(&rs->map_lock);
+       return ret;
+ }
+@@ -2475,3 +2783,24 @@ out:
+       return (ret && left == count) ? ret : count - left;
+ }
++
++ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
++                struct sockaddr *src_addr, socklen_t *addrlen)
++{
++      int ret;
++
++      ret = rrecv(socket, buf, len, flags);
++      if (ret > 0 && src_addr)
++              rgetpeername(socket, src_addr, addrlen);
++
++      return ret;
++}
++
++ssize_t usendto(int socket, const void *buf, size_t len, int flags,
++              const struct sockaddr *dest_addr, socklen_t addrlen)
++{
++      if (dest_addr || addrlen)
++              return ERR(EISCONN);
++
++      return usend(socket, buf, len, flags);
++}
diff --git a/patches/usocket b/patches/usocket
deleted file mode 100644 (file)
index 7e35c6f..0000000
+++ /dev/null
@@ -1,762 +0,0 @@
-Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top:    97a52629c221cba1033082bbd308ecfc4d4b6082
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-11-09 10:26:38 -0800
-
-rsocket: Add datagram support
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/docs/rsocket b/docs/rsocket
-index 1484f65..03d49df 100644
---- a/docs/rsocket
-+++ b/docs/rsocket
-@@ -1,7 +1,7 @@
--rsocket Protocol and Design Guide               9/10/2012
-+rsocket Protocol and Design Guide               11/11/2012
--Overview
----------
-+Data Streaming (TCP) Overview
-+-----------------------------
- Rsockets is a protocol over RDMA that supports a socket-level API
- for applications.  For details on the current state of the
- implementation, readers should refer to the rsocket man page.  This
-@@ -189,3 +189,47 @@ registered remote data buffer.
- From host A's perspective, the transfer appears as a normal send/write
- operation, with the data stream redirected directly into the receiving
- application's buffer.
-+
-+
-+
-+Datagram Overview
-+-----------------
-+THe rsocket API supports datagram sockets.  Datagram support is handled through an
-+entirely different protocol and internal implementation.  Unlike connected rsockets,
-+datagram rsockets are not necessarily bound to a network (IP) address.  A datagram
-+socket may use any number of network (IP) addresses, including those which map to
-+different RDMA devices.  As a result, a single datagram rsocket must support
-+using multiple RDMA devices and ports, and a datagram rsocket references a single
-+UDP socket, plus zero or more UD QPs.
-+
-+Rsockets uses headers inserted before user data sent over UDP sockets to resolve
-+remote UD QP numbers.  When a user first attempts to send a datagram to a remote
-+address (IP and UDP port), rsockets will take the following steps.  First, it
-+will allocate and store the destination address into a lookup table for future
-+use.  Then it will resolve which local network address should be used when sending
-+to the specified destination.  It will allocate a UD QP on the RDMA device
-+associated with the local address.  Finally, rsockets will send the user's
-+datagram to the remote UDP socket, but inserting a header before the datagram.
-+The header specifies the UD QP number associated with the local network address
-+(IP and UDP port) of the send.
-+
-+Under many circumstances, the rsocket UDP header also provides enough path
-+information for the receiver to send the reply using a UD QP.  However, certain
-+fabric topologies may require contacting a subnet administrator.  Whenever
-+rsockets lacks sufficient information to send data to a remote peer using
-+a UD QP, it will automatically fall back to using a standard UDP socket.  This
-+helps minimize the latency for performing a given send, while lenghty address
-+and route resolution protocols complete.
-+
-+Currently, rsockets allocates a single UD QP, with an infrastructure is provided
-+to support more.  Future enhancements may add support for multiple UD QPs, each
-+associated with a single network (IP) address.  Multiplexing different network
-+addresses over a single UD QP and using shared receive queues are other possible
-+enhancements.
-+
-+Because rsockets may use multiple UD QPs, buffer management can become an issue.
-+The total size of receive buffers is specified by the mem_default configuration
-+files, but may be overridden with rsetsockopt.  The buffer is divided into
-+MTU sized messages and distributed among the allocated QPs.  As new QPs are
-+created, the receive buffers may be redistributed by posting loopback sends
-+on a QP in order to free the receive buffers for reposting to a different QP.  
-\ No newline at end of file
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..99e638c 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -110,6 +110,12 @@ struct rs_msg {
-       uint32_t data;
- };
-+struct ds_qp;
-+
-+struct ds_msg {
-+      struct ds_qp *qp;
-+};
-+
- struct rs_sge {
-       uint64_t addr;
-       uint32_t key;
-@@ -169,13 +175,63 @@ enum rs_state {
- #define RS_OPT_SWAP_SGL 1
--struct rsocket {
-+union socket_addr {
-+      struct sockaddr         sa;
-+      struct sockaddr_in      sin;
-+      struct sockaddr_in6     sin6;
-+};
-+
-+struct ds_qp {
-       struct rdma_cm_id *cm_id;
-+      int               rbuf_cnt;
-+      uint16_t          lid;
-+      uint8_t           sl;
-+};
-+
-+struct ds_dest {
-+      union socket_addr addr;
-+      struct ds_qp      *qp;
-+      struct ibv_ah     *ah;
-+      uint32_t           qpn;
-+      atomic_t           refcnt;
-+};
-+
-+struct rsocket {
-+      int               type;
-+      int               index;
-       fastlock_t        slock;
-       fastlock_t        rlock;
-       fastlock_t        cq_lock;
-       fastlock_t        cq_wait_lock;
--      fastlock_t        iomap_lock;
-+      fastlock_t        map_lock; /* acquire slock first if needed */
-+
-+      union {
-+              /* data stream */
-+              struct {
-+                      struct rdma_cm_id *cm_id;
-+
-+                      int               remote_sge;
-+                      struct rs_sge     remote_sgl;
-+                      struct rs_sge     remote_iomap;
-+
-+                      struct ibv_mr     *target_mr;
-+                      int               target_sge;
-+                      int               target_iomap_size;
-+                      void              *target_buffer_list;
-+                      volatile struct rs_sge    *target_sgl;
-+                      struct rs_iomap   *target_iomap;
-+              };
-+              /* datagram */
-+              struct {
-+                      struct ds_qp      *qp_array;
-+                      void              *dest_map;
-+                      struct ds_dest    *conn_dest;
-+                      struct pollfd     *fds;
-+                      nfds_t            nfds;
-+                      int               fd;
-+                      int               sbytes_needed;
-+              };
-+      };
-       int               opts;
-       long              fd_flags;
-@@ -186,7 +242,7 @@ struct rsocket {
-       int               cq_armed;
-       int               retries;
-       int               err;
--      int               index;
-+
-       int               ctrl_avail;
-       int               sqe_avail;
-       int               sbuf_bytes_avail;
-@@ -203,34 +259,37 @@ struct rsocket {
-       int               rbuf_offset;
-       int               rmsg_head;
-       int               rmsg_tail;
--      struct rs_msg     *rmsg;
--
--      int               remote_sge;
--      struct rs_sge     remote_sgl;
--      struct rs_sge     remote_iomap;
-+      union {
-+              struct rs_msg     *rmsg;
-+              struct ds_msg     *dmsg;
-+      };
-       struct rs_iomap_mr *remote_iomappings;
-       dlist_entry       iomap_list;
-       dlist_entry       iomap_queue;
-       int               iomap_pending;
--      struct ibv_mr    *target_mr;
--      int               target_sge;
--      int               target_iomap_size;
--      void             *target_buffer_list;
--      volatile struct rs_sge    *target_sgl;
--      struct rs_iomap  *target_iomap;
--
-       uint32_t          rbuf_size;
--      struct ibv_mr    *rmr;
-+      struct ibv_mr     *rmr;
-       uint8_t           *rbuf;
-       uint32_t          sbuf_size;
--      struct ibv_mr    *smr;
-+      struct ibv_mr     *smr;
-       struct ibv_sge    ssgl[2];
-       uint8_t           *sbuf;
- };
-+#define DS_UDP_TAG 0x5555555555555555ULL
-+
-+struct ds_udp_header {
-+      uint64_t          tag;
-+      uint8_t           version;
-+      uint8_t           sl;
-+      uint16_t          slid;
-+      uint32_t          qpn;  /* upper 8-bits reserved */
-+};
-+
-+
- static int rs_value_to_scale(int value, int bits)
- {
-       return value <= (1 << (bits - 1)) ?
-@@ -306,10 +365,10 @@ out:
-       pthread_mutex_unlock(&mut);
- }
--static int rs_insert(struct rsocket *rs)
-+static int rs_insert(struct rsocket *rs, index)
- {
-       pthread_mutex_lock(&mut);
--      rs->index = idm_set(&idm, rs->cm_id->channel->fd, rs);
-+      rs->index = idm_set(&idm, index, rs);
-       pthread_mutex_unlock(&mut);
-       return rs->index;
- }
-@@ -321,7 +380,7 @@ static void rs_remove(struct rsocket *rs)
-       pthread_mutex_unlock(&mut);
- }
--static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-+static struct rsocket *rs_alloc(struct rsocket *inherited_rs, int type)
- {
-       struct rsocket *rs;
-@@ -329,7 +388,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-       if (!rs)
-               return NULL;
-+      rs->type = type;
-       rs->index = -1;
-+      rs->fd = -1;
-       if (inherited_rs) {
-               rs->sbuf_size = inherited_rs->sbuf_size;
-               rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +412,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-       fastlock_init(&rs->rlock);
-       fastlock_init(&rs->cq_lock);
-       fastlock_init(&rs->cq_wait_lock);
--      fastlock_init(&rs->iomap_lock);
-+      fastlock_init(&rs->map_lock);
-       dlist_init(&rs->iomap_list);
-       dlist_init(&rs->iomap_queue);
-       return rs;
-@@ -581,7 +642,12 @@ static void rs_free(struct rsocket *rs)
-               rdma_destroy_id(rs->cm_id);
-       }
--      fastlock_destroy(&rs->iomap_lock);
-+      if (rs->fd >= 0)
-+              close(rs->fd);
-+      if (rs->fds)
-+              free(rs->fds);
-+
-+      fastlock_destroy(&rs->map_lock);
-       fastlock_destroy(&rs->cq_wait_lock);
-       fastlock_destroy(&rs->cq_lock);
-       fastlock_destroy(&rs->rlock);
-@@ -635,29 +701,55 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
-       rs->sseq_comp = ntohs(conn->credits);
- }
-+static int ds_init(struct rsocket *rs, int domain)
-+{
-+      rs->fd = socket(domain, SOCK_DGRAM, 0);
-+      if (rs->fd < 0)
-+              return rs->fd;
-+
-+      rs->fds = calloc(1, sizeof *fds);
-+      if (!rs->fds)
-+              return ERR(ENOMEM);
-+      rs->nfds = 1;
-+
-+      return 0;
-+}
-+
- int rsocket(int domain, int type, int protocol)
- {
-       struct rsocket *rs;
--      int ret;
-+      int index, ret;
-       if ((domain != PF_INET && domain != PF_INET6) ||
--          (type != SOCK_STREAM) || (protocol && protocol != IPPROTO_TCP))
-+          ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) ||
-+          (type == SOCK_STREAM && protocol && protocol != IPPROTO_TCP) ||
-+          (type == SOCK_DGRAM && protocol && protocol != IPPROTO_UDP))
-               return ERR(ENOTSUP);
-       rs_configure();
--      rs = rs_alloc(NULL);
-+      rs = rs_alloc(NULL, type);
-       if (!rs)
-               return ERR(ENOMEM);
--      ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
--      if (ret)
--              goto err;
-+      if (type == SOCK_STREAM) {
-+              ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
-+              if (ret)
-+                      goto err;
-+
-+              rs->cm_id->route.addr.src_addr.sa_family = domain;
-+              index = rs->cm_id->channel->fd;
-+      } else {
-+              ret = ds_init(rs, domain);
-+              if (ret)
-+                      goto err;
--      ret = rs_insert(rs);
-+              index = rs->fd;
-+      }
-+
-+      ret = rs_insert(rs, index);
-       if (ret < 0)
-               goto err;
--      rs->cm_id->route.addr.src_addr.sa_family = domain;
-       return rs->index;
- err:
-@@ -671,9 +763,13 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
-       int ret;
-       rs = idm_at(&idm, socket);
--      ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
--      if (!ret)
--              rs->state = rs_bound;
-+      if (rs->type == SOCK_STREAM) {
-+              ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr);
-+              if (!ret)
-+                      rs->state = rs_bound;
-+      } else {
-+              ret = bind(rs->fd, addr, addrlen);
-+      }
-       return ret;
- }
-@@ -709,7 +805,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
-       int ret;
-       rs = idm_at(&idm, socket);
--      new_rs = rs_alloc(rs);
-+      new_rs = rs_alloc(rs, rs->type);
-       if (!new_rs)
-               return ERR(ENOMEM);
-@@ -717,7 +813,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
-       if (ret)
-               goto err;
--      ret = rs_insert(new_rs);
-+      ret = rs_insert(new_rs, rs->cm_id->channel->fd);
-       if (ret < 0)
-               goto err;
-@@ -854,6 +950,66 @@ connected:
-       return ret;
- }
-+static int ds_compare_dest(const void *dst1, const void *dst2)
-+{
-+      const struct sockaddr *sa1, *sa2;
-+      size_t len;
-+
-+      sa1 = (const struct sockaddr *) dst1;
-+      sa2 = (const struct sockaddr *) dst2;
-+
-+      len = (sa1->sa_family == AF_INET6 && sa2->sa_family == AF_INET6) ?
-+            sizeof(struct sockaddr_in6) : sizeof(struct sockaddr);
-+      return memcmp(dst1, dst2, len);
-+}
-+
-+/* Caller must hold map_lock around accessing source address */
-+static union socket_addr *ds_get_src_addr(const struct sockaddr *dst_addr,
-+                                        socklen_t dst_len, socklen_t *src_len)
-+{
-+      static union socket_addr src_addr;
-+      int sock, ret;
-+
-+      sock = socket(dst_addr->sa.sa_family, SOCK_DGRAM, 0);
-+      if (sock < 0)
-+              return NULL;
-+
-+      ret = connect(sock, &dst_addr->sa, dst_len);
-+      if (ret)
-+              goto out;
-+
-+      *src_len = sizeof src_addr;
-+      ret = getsockname(sock, &src_addr.sa, src_len);
-+
-+out:
-+      close(sock);
-+      return ret ? NULL : &src_addr;
-+}
-+
-+static struct ds_qp *ds_get_qp(struct rsocket *rs,
-+                             const struct sockaddr *src_addr, socklen_t addrlen)
-+{
-+      union socket_addr *addr;
-+      socklen_t len;
-+
-+}
-+
-+static int ds_connect(struct rsocket *rs,
-+                    const struct sockaddr *dest_addr, socklen_t addrlen)
-+{
-+      struct ds_dest **dest;
-+
-+      fastlock_acquire(&rs->map_lock);
-+      dest = tfind(dest_addr, rs->dest_map, ds_compare_dest);
-+      if (dest) {
-+              rs->conn_dest = *dest;
-+              goto out;
-+      }
-+out:
-+      fastlock_release(&rs->map_lock);
-+      return 0;
-+}
-+
- int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- {
-       struct rsocket *rs;
-@@ -902,6 +1058,25 @@ static int rs_post_write(struct rsocket *rs,
-       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-+static int ds_post_send(struct rsocket *rs,
-+                      struct ibv_sge *sgl, int nsge,
-+                      uint64_t wr_id, int flags)
-+{
-+      struct ibv_send_wr wr, *bad;
-+
-+      wr.wr_id = wr_id;
-+      wr.next = NULL;
-+      wr.sg_list = sgl;
-+      wr.num_sge = nsge;
-+      wr.opcode = IBV_WR_SEND;
-+      wr.send_flags = flags;
-+      wr.wr.ud.ah = rs->conn_dest->ah;
-+      wr.wr.ud.remote_qpn = rs->conn_dest->qpn;
-+      wr.wr.ud.remote_qkey = RDMA_UDP_QKEY;
-+
-+      return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad));
-+}
-+
- /*
-  * Update target SGE before sending data.  Otherwise the remote side may
-  * update the entry before we do.
-@@ -932,6 +1107,15 @@ static int rs_write_data(struct rsocket *rs,
-                                flags, addr, rkey);
- }
-+static int ds_send_data(struct rsocket *rs,
-+                      struct ibv_sge *sgl, int nsge,
-+                      uint32_t length, int flags)
-+{
-+      rs->sqe_avail--;
-+      rs->sbuf_bytes_avail -= length;
-+      return ds_post_send(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), flags);
-+}
-+
- static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
- {
-@@ -1218,6 +1402,11 @@ static int rs_can_send(struct rsocket *rs)
-              (rs->target_sgl[rs->target_sge].length != 0);
- }
-+static int ds_can_send(struct rsocket *rs)
-+{
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= rs->sbytes_needed);
-+}
-+
- static int rs_conn_can_send(struct rsocket *rs)
- {
-       return rs_can_send(rs) || !(rs->state & rs_connect_wr);
-@@ -1390,7 +1579,7 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
-       struct rs_iomap iom;
-       int ret;
--      fastlock_acquire(&rs->iomap_lock);
-+      fastlock_acquire(&rs->map_lock);
-       while (!dlist_empty(&rs->iomap_queue)) {
-               if (!rs_can_send(rs)) {
-                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-@@ -1446,10 +1635,94 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
-       }
-       rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
--      fastlock_release(&rs->iomap_lock);
-+      fastlock_release(&rs->map_lock);
-       return ret;
- }
-+static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
-+                          int iovcnt, int flags)
-+{
-+      struct ds_udp_header hdr;
-+      struct msghdr msg;
-+      struct iovec miov[4];
-+      struct ds_qp *qp;
-+
-+      if (iovcnt > 4)
-+              return ERR(ENOTSUP);
-+
-+      qp = (rs->conn_dest) ? rs->conn_dest->qp : NULL;
-+      hdr.tag = htonll(DS_UDP_TAG);
-+      hdr.version = 1;
-+      if (qp) {
-+              hdr.sl = qp->sl;
-+              hdr.slid = htons(qp->lid);
-+              hdr.qpn = htonl(qp->cm_id->qp->qp_num & 0xFFFFFF);
-+      } else {
-+              hdr.sl = 0;
-+              hdr.slid = 0;
-+              hdr.qpn = 0;
-+      }
-+
-+      miov[0].iov_base = &hdr;
-+      miov[0].iov_len = sizeof hdr;
-+      memcpy(&miov[1], iov, sizeof *iov * iovcnt);
-+
-+      memset(&msg, 0, sizeof msg);
-+      /* TODO: specify name if needed */
-+      msg.msg_iov = miov;
-+      msg.msg_iovlen = iovcnt + 1;
-+      return sendmsg(rs->fd, msg, flags);
-+}
-+
-+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, int flags)
-+{
-+      struct iovec iov;
-+      iov.iov_base = buf;
-+      iov_iov_len = len;
-+      return ds_sendv_udp(s, &iov, 1, flags);
-+}
-+
-+static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags)
-+{
-+      struct ibv_sge sge;
-+      int ret = 0;
-+
-+      if (!rs->conn_dest || !rs->conn_dest->ah)
-+              return ds_send_udp(rs, buf, len, flags);
-+
-+      rs->sbytes_needed = len;
-+      if (!ds_can_send(rs)) {
-+              ret = rs_get_comp(rs, 1, ds_can_send);
-+              if (ret)
-+                      return ds_send_udp(rs, buf, len, flags);
-+      }
-+
-+      if (len <= rs->sq_inline) {
-+              sge.addr = (uintptr_t) buf;
-+              sge.length = len;
-+              sge.lkey = 0;
-+              ret = ds_send_data(rs, &sge, 1, len, IBV_SEND_INLINE);
-+      } else if (len <= rs_sbuf_left(rs)) {
-+              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, len);
-+              rs->ssgl[0].length = len;
-+              ret = ds_send_data(rs, rs->ssgl, 1, len, 0);
-+              if (len < rs_sbuf_left(rs))
-+                      rs->ssgl[0].addr += len;
-+              else
-+                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+      } else {
-+              rs->ssgl[0].length = rs_sbuf_left(rs);
-+              memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
-+                      rs->ssgl[0].length);
-+              rs->ssgl[1].length = len - rs->ssgl[0].length;
-+              memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-+              ret = ds_send_data(rs, rs->ssgl, 2, len, 0);
-+              rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+      }
-+
-+      return ret ? ret : len;
-+}
-+
- /*
-  * We overlap sending the data, by posting a small work request immediately,
-  * then increasing the size of the send on each iteration.
-@@ -1463,6 +1736,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-       int ret = 0;
-       rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM) {
-+              fastlock_acquire(&rs->slock);
-+              ret = dsend(rs, buf, len, flags);
-+              fastlock_release(&rs->slock);
-+              return ret;
-+      }
-+
-       if (rs->state & rs_opening) {
-               ret = rs_do_connect(rs);
-               if (ret) {
-@@ -1537,10 +1817,21 @@ out:
- ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
-               const struct sockaddr *dest_addr, socklen_t addrlen)
- {
--      if (dest_addr || addrlen)
--              return ERR(EISCONN);
-+      struct rsocket *rs;
-+
-+      rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_STREAM) {
-+              if (dest_addr || addrlen)
-+                      return ERR(EISCONN);
-+
-+              return rsend(socket, buf, len, flags);
-+      }
--      return rsend(socket, buf, len, flags);
-+      fastlock_acquire(&rs->slock);
-+      ds_connect(rs, dest_addr, addrlen);
-+      ret = dsend(rs, buf, len, flags);
-+      fastlock_release(&rs->slock);
-+      return ret;
- }
- static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1652,7 +1943,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
-       if (msg->msg_control && msg->msg_controllen)
-               return ERR(ENOTSUP);
--      return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, msg->msg_flags);
-+      return rsendv(socket, msg->msg_iov, (int) msg->msg_iovlen, flags);
- }
- ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -2017,8 +2308,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
-       struct rsocket *rs;
-       rs = idm_at(&idm, socket);
--      rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
--      return 0;
-+      if (rs->type == SOCK_STREAM) {
-+              rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen);
-+              return 0;
-+      } else {
-+              return getpeername(rs->fs, addr, addrlen);
-+      }
- }
- int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +2321,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-       struct rsocket *rs;
-       rs = idm_at(&idm, socket);
--      rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
--      return 0;
-+      if (rs->type == SOCK_STREAM) {
-+              rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen);
-+              return 0;
-+      } else {
-+              return getsockname(rs->fd, addr, addrlen);
-+      }
- }
- int rsetsockopt(int socket, int level, int optname,
-@@ -2039,6 +2338,12 @@ int rsetsockopt(int socket, int level, int optname,
-       ret = ERR(ENOTSUP);
-       rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM && level != SOL_RDMA) {
-+              ret = setsockopt(rs->fd, optname, optval, optlen);
-+              if (ret)
-+                      return ret;
-+      }
-+
-       switch (level) {
-       case SOL_SOCKET:
-               opts = &rs->so_opts;
-@@ -2156,6 +2461,9 @@ int rgetsockopt(int socket, int level, int optname,
-       int ret = 0;
-       rs = idm_at(&idm, socket);
-+      if (rs->type == SOCK_DGRAM && level != SOL_RDMA)
-+              return getsockopt(rs->fd, level, optname, optval, optlen);
-+
-       switch (level) {
-       case SOL_SOCKET:
-               switch (optname) {
-@@ -2314,7 +2622,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
-       if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
-               return ERR(EINVAL);
--      fastlock_acquire(&rs->iomap_lock);
-+      fastlock_acquire(&rs->map_lock);
-       if (prot & PROT_WRITE) {
-               iomr = rs_get_iomap_mr(rs);
-               access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +2656,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
-               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
-       }
- out:
--      fastlock_release(&rs->iomap_lock);
-+      fastlock_release(&rs->map_lock);
-       return offset;
- }
-@@ -2360,7 +2668,7 @@ int riounmap(int socket, void *buf, size_t len)
-       int ret = 0;
-       rs = idm_at(&idm, socket);
--      fastlock_acquire(&rs->iomap_lock);
-+      fastlock_acquire(&rs->map_lock);
-       for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
-            entry = entry->next) {
-@@ -2381,7 +2689,7 @@ int riounmap(int socket, void *buf, size_t len)
-       }
-       ret = ERR(EINVAL);
- out:
--      fastlock_release(&rs->iomap_lock);
-+      fastlock_release(&rs->map_lock);
-       return ret;
- }
-@@ -2475,3 +2783,24 @@ out:
-       return (ret && left == count) ? ret : count - left;
- }
-+
-+ssize_t urecvfrom(int socket, void *buf, size_t len, int flags,
-+                struct sockaddr *src_addr, socklen_t *addrlen)
-+{
-+      int ret;
-+
-+      ret = rrecv(socket, buf, len, flags);
-+      if (ret > 0 && src_addr)
-+              rgetpeername(socket, src_addr, addrlen);
-+
-+      return ret;
-+}
-+
-+ssize_t usendto(int socket, const void *buf, size_t len, int flags,
-+              const struct sockaddr *dest_addr, socklen_t addrlen)
-+{
-+      if (dest_addr || addrlen)
-+              return ERR(EISCONN);
-+
-+      return usend(socket, buf, len, flags);
-+}