]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add direct data placement
authorSean Hefty <sean.hefty@intel.com>
Sun, 21 Oct 2012 21:16:03 +0000 (14:16 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sun, 21 Oct 2012 21:16:03 +0000 (14:16 -0700)
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
docs/rsocket
include/rdma/rsocket.h
man/rsocket.7
src/indexer.h
src/librdmacm.map
src/rsocket.c

index 5399f6cbea686d2a196561c264c11d3eaadcae25..1484f65b7b5a8ee67e8a7d3b551363d924d64f95 100644 (file)
@@ -110,11 +110,11 @@ Bits    Message             Meaning of
 31:29    Type               Bits 28:0
 000    Data Transfer     bytes transfered
 001    reserved
-010    reserved
+010    reserved - used internally, available for future use
 011    reserved
 100    Credit Update     received credits granted
 101    reserved
-110    reserved
+110    Iomap Updated     index of updated entry
 111    Control           control message type
 
 Data Transfer
@@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in use.  This is done
 by tracking when a receive buffer referenced by a remote target SGL has been
 filled.
 
+Iomap Updated
+Used to indicate that a remote iomap entry was updated.  The updated entry
+contains the offset value associated with an address, length, and rkey.  Once
+an iomap has been updated, the local application can issue directed IO
+transfers against the corresponding remote buffer.
+
 Control Message - DISCONNECT
 Indicates that the rsocket connection has been fully disconnected and will no
 longer send or receive data.  Data received before the disconnect message was
@@ -142,3 +148,44 @@ Control Message - SHUTDOWN
 Indicates that the remote rsocket has shutdown the send side of its
 connection.  The recipient of a shutdown message will no longer accept
 incoming data, but may still transfer outbound data.
+
+
+Iomapped Buffers
+----------------
+Rsockets allows for zero-copy transfers using what it refers to as iomapped
+buffers.  Iomapping and direct data placement (zero-copy) transfers are done
+using rsocket specific extensions.  The general operation is similar to
+that used for normal data transfers described above.
+
+   host A                   host B
+                          remote iomap                      
+  target iomap  <-----------  [  ]
+     [  ] ------
+     [  ] --    ------  iomapped buffer(s)         
+            --        ----->  +--+
+              --              |  |
+                --            |  |
+                  --          |  |
+                    --        +--+
+                      --       
+                        --->  +--+
+                              |  |
+                              |  |
+                              +--+
+
+The remote iomap contains the address, size, and rkey of the target iomap.  As
+the applicaton maps buffers host B to a given rsocket, rsockets will issue an RDMA
+write against one of the entries in the target iomap on host A.  The
+updated entry will reference an available iomapped buffer.  Immediate data
+included with the RDMA write will indicate to host A that a target iomap
+has been updated.
+
+When host A wishes to transfer directly into an iomapped buffer, it will check
+its target iomap for an offset corresponding to a remotely mapped buffer.  A
+matching iomap entry will contain the address, size, and rkey of the target
+buffer on host B.  Host A will then issue an RDMA operation against the
+registered remote data buffer.
+
+From host A's perspective, the transfer appears as a normal send/write
+operation, with the data stream redirected directly into the receiving
+application's buffer.
index 65feda96a0ff93e4371759d881b420d337fb2f60..f220c134f306701971e32f72a42637a47620d55a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -39,6 +39,7 @@
 #include <errno.h>
 #include <poll.h>
 #include <sys/select.h>
+#include <sys/mman.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
 enum {
        RDMA_SQSIZE,
        RDMA_RQSIZE,
-       RDMA_INLINE
+       RDMA_INLINE,
+       RDMA_IOMAPSIZE
 };
 
 int rsetsockopt(int socket, int level, int optname,
@@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname,
                void *optval, socklen_t *optlen);
 int rfcntl(int socket, int cmd, ... /* arg */ );
 
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset);
+int riounmap(int socket, void *buf, size_t len);
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags);
+
 #ifdef __cplusplus
 }
 #endif
index 2ed5ca43b2f6a0917c4b0a18d2e7032c03c16648..5ecebc2e1a58154ed32571600e53cf83a6f096b0 100644 (file)
@@ -6,7 +6,7 @@ rsocket \- RDMA socket API
 .SH "DESCRIPTION"
 RDMA socket API and protocol
 .SH "NOTES"
-rsockets is a protocol over RDMA that supports a socket-level API
+Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  rsocket APIs are intended to match the behavior
 of corresponding socket calls, except where noted.  rsocket
 functions match the name and function signature of socket calls,
@@ -30,7 +30,7 @@ rgetpeername, rgetsockname
 .P
 rsetsockopt, rgetsockopt, rfcntl
 .P
-Functions take the same parameters as that use for sockets.  The
+Functions take the same parameters as that used for sockets.  The
 follow capabilities and flags are supported at this time:
 .P
 PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG
@@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF
 .P
 O_NONBLOCK
 .P
+Rsockets provides extensions beyond normal socket routines that
+allow for direct placement of data into an application's buffer.
+This is also known as zero-copy support, since data is sent and
+received directly, bypassing copies into network controlled buffers.
+The following calls and options support direct data placement.
+.P
+riomap, riounmap, riowrite
+.TP
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+.TP
+Riomap registers an application buffer with the RDMA hardware
+associated with an rsocket.  The buffer is registered either for
+local only access (PROT_NONE) or for remote write access (PROT_WRITE).
+When registered for remote access, the buffer is mapped to a given
+offset.  The offset is either provided by the user, or if the user
+selects -1 for the offset, rsockets selects one.  The remote peer may
+access an iomapped buffer directly by specifying the correct offset.
+The mapping is not guaranteed to be available until after the remote
+peer receives a data transfer initiated after riomap has completed.
+.P
+riounmap
+.TP
+int riounmap(int socket, void *buf, size_t len)
+.TP
+Riounmap removes the mapping between a buffer and an rsocket.
+.P
+riowrite
+.TP
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+.TP
+Riowrite allows an application to transfer data over an rsocket
+directly into a remotely iomapped buffer.  The remote buffer is specified
+through an offset parameter, which corresponds to a remote iomapped buffer.
+From the sender's perspective, riowrite behaves similar to rwrite.  From
+a receiver's view, riowrite transfers are silently redirected into a pre-
+determined data buffer.  Data is received automatically, and the receiver
+is not informed of the transfer.  However, iowrite data is still considered
+part of the data stream, such that iowrite data will be written before a
+subsequent transfer is received.  A  message sent immediately after initiating
+an iowrite may be used to notify the receiver of the iowrite.
+.P
 In addition to standard socket options, rsockets supports options
 specific to RDMA devices and protocols.  These options are accessible
 through rsetsockopt using SOL_RDMA option level.
@@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue.
 RDMA_RQSIZE - Integer size of the underlying receive queue.
 .TP
 RDMA_INLINE - Integer size of inline data.
+.TP
+RDMA_IOMAPSIZE - Integer number of remote IO mappings supported
 .P
 Note that rsockets fd's cannot be passed into non-rsocket calls.  For
 applications which must mix rsocket fd's with standard socket fd's or
@@ -84,6 +127,8 @@ rqsize_default - default size of receive queue
 .P
 inline_default - default size of inline data
 .P
+iomap_size - default size of remote iomapping table
+.P
 If configuration files are not available, rsockets uses internal defaults.
 .SH "SEE ALSO"
 rdma_cm(7)
index 26e7f9866a180b0692fde374b259e32f0b23b7ca..0c5f3882673f696b4b36ee87b6325aa9095bbba7 100644 (file)
@@ -31,6 +31,9 @@
  *
  */
 
+#if !defined(INDEXER_H)
+#define INDEXER_H
+
 #if HAVE_CONFIG_H
 #  include <config.h>
 #endif /* HAVE_CONFIG_H */
@@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index)
        return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
                idm_at(idm, index) : NULL;
 }
+
+typedef struct _dlist_entry {
+       struct _dlist_entry     *next;
+       struct _dlist_entry     *prev;
+}      dlist_entry;
+
+static inline void dlist_init(dlist_entry *head)
+{
+       head->next = head;
+       head->prev = head;
+}
+
+static inline int dlist_empty(dlist_entry *head)
+{
+       return head->next == head;
+}
+
+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
+{
+       item->next = head->next;
+       item->prev = head;
+       head->next->prev = item;
+       head->next = item;
+}
+
+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
+{
+       dlist_insert_after(item, head->prev);
+}
+
+#define dlist_insert_head dlist_insert_after
+#define dlist_insert_tail dlist_insert_before
+
+static inline void dlist_remove(dlist_entry *item)
+{
+       item->prev->next = item->next;
+       item->next->prev = item->prev;
+}
+
+#endif /* INDEXER_H */
index 5c317a3e78704716f59eb4fe6df495a8d95da006..d5ef7363ca76caf088b081d8c6374f7daf69d724 100644 (file)
@@ -63,5 +63,8 @@ RDMACM_1.0 {
                rselect;
                rdma_get_src_port;
                rdma_get_dst_port;
+               riomap;
+               riounmap;
+               riowrite;
        local: *;
 };
index cc5effe1aba45a9b560cf786c5272055ee8e51d1..c534c61033e5c9ecf6e34c13b30b5a56159cabca 100644 (file)
@@ -55,6 +55,7 @@
 
 #define RS_OLAP_START_SIZE 2048
 #define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
 #define RS_QP_MAX_SIZE 0xFFFE
 #define RS_QP_CTRL_SIZE 4
 #define RS_CONN_RETRIES 6
@@ -62,6 +63,7 @@
 static struct index_map idm;
 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
 
+static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
 static uint16_t def_rqsize = 384;
@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
  * bit 29: more data, 0 - end of transfer, 1 - more data available
  *
  * for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transfered
  * for control messages:
+ * SGL, CTRL
  * bits [28-0]: receive credits granted
+ * IOMAP_SGL
+ * bits [28-16]: reserved, bits [15-0]: index
  */
 
 enum {
        RS_OP_DATA,
        RS_OP_RSVD_DATA_MORE,
-       RS_OP_RSVD_DRA,
+       RS_OP_WRITE, /* opcode is not transmitted over the network */
        RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
        RS_OP_RSVD,
-       RS_OP_RSVD_DRA_SGL,
+       RS_OP_IOMAP_SGL,
        RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -111,15 +116,30 @@ struct rs_sge {
        uint32_t length;
 };
 
-#define RS_MIN_INLINE    (sizeof(struct rs_sge))
-#define rs_host_is_net() (1 == htonl(1))
-#define RS_CONN_FLAG_NET 1
+struct rs_iomap {
+       uint64_t offset;
+       struct rs_sge sge;
+};
+
+struct rs_iomap_mr {
+       uint64_t offset;
+       struct ibv_mr *mr;
+       dlist_entry entry;
+       atomic_t refcnt;
+       int index;      /* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE      (sizeof(struct rs_sge))
+#define rs_host_is_net()   (1 == htonl(1))
+#define RS_CONN_FLAG_NET   (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
 
 struct rs_conn_data {
        uint8_t           version;
        uint8_t           flags;
        uint16_t          credits;
-       uint32_t          reserved2;
+       uint8_t           reserved[3];
+       uint8_t           target_iomap_size;
        struct rs_sge     target_sgl;
        struct rs_sge     data_buf;
 };
@@ -155,6 +175,7 @@ struct rsocket {
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
+       fastlock_t        iomap_lock;
 
        int               opts;
        long              fd_flags;
@@ -186,10 +207,19 @@ struct rsocket {
 
        int               remote_sge;
        struct rs_sge     remote_sgl;
+       struct rs_sge     remote_iomap;
+
+       struct rs_iomap_mr *remote_iomappings;
+       dlist_entry       iomap_list;
+       dlist_entry       iomap_queue;
+       int               iomap_pending;
 
        struct ibv_mr    *target_mr;
        int               target_sge;
-       volatile struct rs_sge    target_sgl[RS_SGL_SIZE];
+       int               target_iomap_size;
+       void             *target_buffer_list;
+       volatile struct rs_sge    *target_sgl;
+       struct rs_iomap  *target_iomap;
 
        uint32_t          rbuf_size;
        struct ibv_mr    *rmr;
@@ -201,6 +231,18 @@ struct rsocket {
        uint8_t           *sbuf;
 };
 
+static int rs_value_to_scale(int value, int bits)
+{
+       return value <= (1 << (bits - 1)) ?
+              value : (1 << (bits - 1)) | (value >> bits);
+}
+
+static int rs_scale_to_value(int value, int bits)
+{
+       return value <= (1 << (bits - 1)) ?
+              value : (value & ~(1 << (bits - 1))) << bits;
+}
+
 void rs_configure(void)
 {
        FILE *f;
@@ -247,9 +289,17 @@ void rs_configure(void)
        if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
                fscanf(f, "%u", &def_wmem);
                fclose(f);
+               if (def_wmem < RS_SNDLOWAT)
+                       def_wmem = RS_SNDLOWAT << 1;
+       }
+
+       if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+               fscanf(f, "%hu", &def_iomap_size);
+               fclose(f);
 
-               if (def_wmem < 1)
-                       def_wmem = 1;
+               /* round to supported values */
+               def_iomap_size = (uint8_t) rs_value_to_scale(
+                       (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
        }
        init = 1;
 out:
@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = inherited_rs->sq_size;
                rs->rq_size = inherited_rs->rq_size;
                rs->ctrl_avail = inherited_rs->ctrl_avail;
+               rs->target_iomap_size = inherited_rs->target_iomap_size;
        } else {
                rs->sbuf_size = def_wmem;
                rs->rbuf_size = def_mem;
@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = def_sqsize;
                rs->rq_size = def_rqsize;
                rs->ctrl_avail = RS_QP_CTRL_SIZE;
+               rs->target_iomap_size = def_iomap_size;
        }
        fastlock_init(&rs->slock);
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
+       fastlock_init(&rs->iomap_lock);
+       dlist_init(&rs->iomap_list);
+       dlist_init(&rs->iomap_queue);
        return rs;
 }
 
@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+       size_t len;
+
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
        if (!rs->rmsg)
                return -1;
@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
        if (!rs->smr)
                return -1;
 
-       rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
-                                      sizeof(rs->target_sgl));
+       len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+             sizeof(*rs->target_iomap) * rs->target_iomap_size;
+       rs->target_buffer_list = malloc(len);
+       if (!rs->target_buffer_list)
+               return -1;
+
+       rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
                return -1;
 
+       memset(rs->target_buffer_list, 0, len);
+       rs->target_sgl = rs->target_buffer_list;
+       if (rs->target_iomap_size)
+               rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
                return -1;
@@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs)
        return 0;
 }
 
+/*
+static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...)
+{
+       TODO: write me
+}
+*/
+
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+       if (atomic_dec(&iomr->refcnt))
+               return;
+
+       dlist_remove(&iomr->entry);
+       ibv_dereg_mr(iomr->mr);
+       if (iomr->index >= 0)
+               iomr->mr = NULL;
+       else
+               free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+       struct rs_iomap_mr *iomr;
+
+       while (!dlist_empty(&rs->iomap_list)) {
+               iomr = container_of(rs->iomap_list.next,
+                                   struct rs_iomap_mr, entry);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+       }
+       while (!dlist_empty(&rs->iomap_queue)) {
+               iomr = container_of(rs->iomap_queue.next,
+                                   struct rs_iomap_mr, entry);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+       }
+}
+
 static void rs_free(struct rsocket *rs)
 {
        if (rs->index >= 0)
@@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs)
                free(rs->rbuf);
        }
 
-       if (rs->target_mr)
-               rdma_dereg_mr(rs->target_mr);
+       if (rs->target_buffer_list) {
+               if (rs->target_mr)
+                       rdma_dereg_mr(rs->target_mr);
+               free(rs->target_buffer_list);
+       }
 
        if (rs->cm_id) {
+               rs_free_iomappings(rs);
                if (rs->cm_id->qp)
                        rdma_destroy_qp(rs->cm_id);
                rdma_destroy_id(rs->cm_id);
        }
 
+       fastlock_destroy(&rs->iomap_lock);
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
@@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
                             struct rs_conn_data *conn)
 {
        conn->version = 1;
-       conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+       conn->flags = RS_CONN_FLAG_IOMAP |
+                     (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
        conn->credits = htons(rs->rq_size);
-       conn->reserved2 = 0;
+       memset(conn->reserved, 0, sizeof conn->reserved);
+       conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
 
        conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
        conn->target_sgl.length = htonl(RS_SGL_SIZE);
@@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
            (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
                rs->opts = RS_OPT_SWAP_SGL;
 
+       if (conn->flags & RS_CONN_FLAG_IOMAP) {
+               rs->remote_iomap.addr = rs->remote_sgl.addr +
+                                       sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+               rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+               rs->remote_iomap.key = rs->remote_sgl.key;
+       }
+
        rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
        rs->target_sgl[0].length = ntohl(conn->data_buf.length);
        rs->target_sgl[0].key = ntohl(conn->data_buf.key);
@@ -753,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return rs_do_connect(rs);
 }
 
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
                         uint64_t addr, uint32_t rkey)
@@ -773,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int rs_post_write(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint64_t wr_id, int flags,
+                        uint64_t addr, uint32_t rkey)
+{
+       struct ibv_send_wr wr, *bad;
+
+       wr.wr_id = wr_id;
+       wr.next = NULL;
+       wr.sg_list = sgl;
+       wr.num_sge = nsge;
+       wr.opcode = IBV_WR_RDMA_WRITE;
+       wr.send_flags = flags;
+       wr.wr.rdma.remote_addr = addr;
+       wr.wr.rdma.rkey = rkey;
+
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -799,8 +935,36 @@ static int rs_write_data(struct rsocket *rs,
                        rs->target_sge = 0;
        }
 
-       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-                            flags, addr, rkey);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+                                flags, addr, rkey);
+}
+
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+       uint64_t addr;
+
+       rs->sseq_no++;
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= length;
+
+       addr = iom->sge.addr + offset - iom->offset;
+       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+                            flags, addr, iom->sge.key);
+}
+
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+                         struct ibv_sge *sgl, int nsge, int flags)
+{
+       uint64_t addr;
+
+       rs->sseq_no++;
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+       addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+                                flags, addr, rs->remote_iomap.key);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -831,12 +995,12 @@ static void rs_send_credits(struct rsocket *rs)
                ibsge.lkey = 0;
                ibsge.length = sizeof(sge);
 
-               rs_post_write(rs, &ibsge, 1,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                             IBV_SEND_INLINE,
-                             rs->remote_sgl.addr +
-                             rs->remote_sge * sizeof(struct rs_sge),
-                             rs->remote_sgl.key);
+               rs_post_write_msg(rs, &ibsge, 1,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 IBV_SEND_INLINE,
+                                 rs->remote_sgl.addr +
+                                 rs->remote_sge * sizeof(struct rs_sge),
+                                 rs->remote_sgl.key);
 
                rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
                rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -845,8 +1009,9 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write(rs, NULL, 0,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+               rs_post_write_msg(rs, NULL, 0,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 0, 0, 0);
        }
 }
 
@@ -880,6 +1045,9 @@ static int rs_poll_cq(struct rsocket *rs)
                        case RS_OP_SGL:
                                rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               /* The iomap was updated, that's nice to know. */
+                               break;
                        case RS_OP_CTRL:
                                if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
@@ -888,6 +1056,9 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state &= ~rs_connect_rd;
                                }
                                break;
+                       case RS_OP_WRITE:
+                               /* We really shouldn't be here. */
+                               break;
                        default:
                                rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
                                rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
@@ -905,6 +1076,10 @@ static int rs_poll_cq(struct rsocket *rs)
                                if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
                                        rs->state = rs_disconnected;
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               rs->sqe_avail++;
+                               rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+                               break;
                        default:
                                rs->sqe_avail++;
                                rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
@@ -1046,7 +1221,7 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-       return rs->sqe_avail && rs->sbuf_bytes_avail &&
+       return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
               (rs->sseq_no != rs->sseq_comp) &&
               (rs->target_sgl[rs->target_sge].length != 0);
 }
@@ -1216,6 +1391,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
        return rrecvv(socket, iov, iovcnt, 0);
 }
 
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+       struct rs_iomap_mr *iomr;
+       struct ibv_sge sge;
+       struct rs_iomap iom;
+       int ret;
+
+       fastlock_acquire(&rs->iomap_lock);
+       while (!dlist_empty(&rs->iomap_queue)) {
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+                       if (!(rs->state & rs_connect_wr)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+               }
+
+               iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+               if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+                       iom.offset = iomr->offset;
+                       iom.sge.addr = (uintptr_t) iomr->mr->addr;
+                       iom.sge.length = iomr->mr->length;
+                       iom.sge.key = iomr->mr->rkey;
+               } else {
+                       iom.offset = bswap_64(iomr->offset);
+                       iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+                       iom.sge.length = bswap_32(iomr->mr->length);
+                       iom.sge.key = bswap_32(iomr->mr->rkey);
+               }
+
+               if (rs->sq_inline >= sizeof iom) {
+                       sge.addr = (uintptr_t) &iom;
+                       sge.length = sizeof iom;
+                       sge.lkey = 0;
+                       ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+               } else if (rs_sbuf_left(rs) >= sizeof iom) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+                       rs->ssgl[0].length = sizeof iom;
+                       ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+                       if (rs_sbuf_left(rs) > sizeof iom)
+                               rs->ssgl[0].addr += sizeof iom;
+                       else
+                               rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+               } else {
+                       rs->ssgl[0].length = rs_sbuf_left(rs);
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+                              rs->ssgl[1].length);
+                       ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               dlist_remove(&iomr->entry);
+               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+               if (ret)
+                       break;
+       }
+
+       rs->iomap_pending = dlist_empty(&rs->iomap_queue);
+       fastlock_release(&rs->iomap_lock);
+       return ret;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1224,7 +1466,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 {
        struct rsocket *rs;
        struct ibv_sge sge;
-       size_t left;
+       size_t left = len;
        uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
        int ret = 0;
 
@@ -1239,7 +1481,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        }
 
        fastlock_acquire(&rs->slock);
-       for (left = len; left; left -= xfer_size, buf += xfer_size) {
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
                                          rs_conn_can_send);
@@ -1289,6 +1536,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                if (ret)
                        break;
        }
+out:
        fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
@@ -1345,9 +1593,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        len = iov[0].iov_len;
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
+       left = len;
 
        fastlock_acquire(&rs->slock);
-       for (left = len; left; left -= xfer_size) {
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
                                          rs_conn_can_send);
@@ -1395,6 +1649,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                if (ret)
                        break;
        }
+out:
        fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
@@ -1725,8 +1980,8 @@ int rshutdown(int socket, int how)
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
                        rs->ctrl_avail--;
-                       ret = rs_post_write(rs, NULL, 0,
-                                           rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+                       ret = rs_post_write_msg(rs, NULL, 0,
+                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
                }
        }
 
@@ -1814,6 +2069,8 @@ int rsetsockopt(int socket, int level, int optname,
                case SO_SNDBUF:
                        if (!rs->sbuf)
                                rs->sbuf_size = (*(uint32_t *) optval) << 1;
+                       if (rs->sbuf_size < RS_SNDLOWAT)
+                               rs->sbuf_size = RS_SNDLOWAT << 1;
                        ret = 0;
                        break;
                case SO_LINGER:
@@ -1878,6 +2135,10 @@ int rsetsockopt(int socket, int level, int optname,
                        if (rs->sq_inline < RS_MIN_INLINE)
                                rs->sq_inline = RS_MIN_INLINE;
                        break;
+               case RDMA_IOMAPSIZE:
+                       rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+                               (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+                       break;
                default:
                        break;
                }
@@ -1979,6 +2240,10 @@ int rgetsockopt(int socket, int level, int optname,
                        *((int *) optval) = rs->sq_inline;
                        *optlen = sizeof(int);
                        break;
+               case RDMA_IOMAPSIZE:
+                       *((int *) optval) = rs->target_iomap_size;
+                       *optlen = sizeof(int);
+                       break;
                default:
                        ret = ENOTSUP;
                        break;
@@ -2020,3 +2285,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
        va_end(args);
        return ret;
 }
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+       int i;
+
+       if (!rs->remote_iomappings) {
+               rs->remote_iomappings = calloc(rs->remote_iomap.length,
+                                              sizeof(*rs->remote_iomappings));
+               if (!rs->remote_iomappings)
+                       return NULL;
+
+               for (i = 0; i < rs->remote_iomap.length; i++)
+                       rs->remote_iomappings[i].index = i;
+       }
+
+       for (i = 0; i < rs->remote_iomap.length; i++) {
+               if (!rs->remote_iomappings[i].mr)
+                       return &rs->remote_iomappings[i];
+       }
+       return NULL;
+}
+
+/*
+ * If an offset is given, we map to it.  If offset is -1, then we map the
+ * offset to the address of buf.  We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+       struct rsocket *rs;
+       struct rs_iomap_mr *iomr;
+       int access = IBV_ACCESS_LOCAL_WRITE;
+
+       rs = idm_at(&idm, socket);
+       if ((rs->state != rs_connect_rdwr) || (prot & ~(PROT_WRITE | PROT_NONE)))
+               return ERR(EINVAL);
+
+       fastlock_acquire(&rs->iomap_lock);
+       if (prot & PROT_WRITE) {
+               iomr = rs_get_iomap_mr(rs);
+               access |= IBV_ACCESS_REMOTE_WRITE;
+       } else {
+               iomr = calloc(1, sizeof *iomr);
+               iomr->index = -1;
+       }
+       if (!iomr) {
+               offset = ERR(ENOMEM);
+               goto out;
+       }
+
+       iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+       if (!iomr->mr) {
+               if (iomr->index < 0)
+                       free(iomr);
+               offset = -1;
+               goto out;
+       }
+
+       if (offset == -1)
+               offset = (uintptr_t) buf;
+       iomr->offset = offset;
+       atomic_init(&iomr->refcnt);
+       atomic_set(&iomr->refcnt, 1);
+
+       if (iomr->index >= 0) {
+               dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+               rs->iomap_pending = 1;
+       } else {
+               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+       }
+out:
+       fastlock_release(&rs->iomap_lock);
+       return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+       struct rsocket *rs;
+       struct rs_iomap_mr *iomr;
+       dlist_entry *entry;
+       int ret = 0;
+
+       rs = idm_at(&idm, socket);
+       fastlock_acquire(&rs->iomap_lock);
+
+       for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+            entry = entry->next) {
+               iomr = container_of(entry, struct rs_iomap_mr, entry);
+               if (iomr->mr->addr == buf && iomr->mr->length == len) {
+                       rs_release_iomap_mr(iomr);
+                       goto out;
+               }
+       }
+
+       for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+            entry = entry->next) {
+               iomr = container_of(entry, struct rs_iomap_mr, entry);
+               if (iomr->mr->addr == buf && iomr->mr->length == len) {
+                       rs_release_iomap_mr(iomr);
+                       goto out;
+               }
+       }
+       ret = ERR(EINVAL);
+out:
+       fastlock_release(&rs->iomap_lock);
+       return ret;
+}
+
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+       int i;
+
+       for (i = 0; i < rs->target_iomap_size; i++) {
+               if (offset >= rs->target_iomap[i].offset &&
+                   offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
+                       return &rs->target_iomap[i];
+       }
+       return NULL;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+       struct rsocket *rs;
+       struct rs_iomap *iom = NULL;
+       struct ibv_sge sge;
+       size_t left = count;
+       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+       int ret = 0;
+
+       rs = idm_at(&idm, socket);
+       fastlock_acquire(&rs->slock);
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+               if (!iom || offset > iom->offset + iom->sge.length) {
+                       iom = rs_find_iomap(rs, offset);
+                       if (!iom)
+                               break;
+               }
+
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+                       if (!(rs->state & rs_connect_wr)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+               }
+
+               if (olen < left) {
+                       xfer_size = olen;
+                       if (olen < RS_MAX_TRANSFER)
+                               olen <<= 1;
+               } else {
+                       xfer_size = left;
+               }
+
+               if (xfer_size > rs->sbuf_bytes_avail)
+                       xfer_size = rs->sbuf_bytes_avail;
+               if (xfer_size > iom->offset + iom->sge.length - offset)
+                       xfer_size = iom->offset + iom->sge.length - offset;
+
+               if (xfer_size <= rs->sq_inline) {
+                       sge.addr = (uintptr_t) buf;
+                       sge.length = xfer_size;
+                       sge.lkey = 0;
+                       ret = rs_write_direct(rs, iom, offset, &sge, 1,
+                                             xfer_size, IBV_SEND_INLINE);
+               } else if (xfer_size <= rs_sbuf_left(rs)) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+                       rs->ssgl[0].length = xfer_size;
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+                       if (xfer_size < rs_sbuf_left(rs))
+                               rs->ssgl[0].addr += xfer_size;
+                       else
+                               rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+               } else {
+                       rs->ssgl[0].length = rs_sbuf_left(rs);
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               if (ret)
+                       break;
+       }
+out:
+       fastlock_release(&rs->slock);
+
+       return (ret && left == count) ? ret : count - left;
+}