]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsocket: Add APIs for direct data placement
authorSean Hefty <sean.hefty@intel.com>
Sun, 21 Oct 2012 21:16:03 +0000 (14:16 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sun, 21 Oct 2012 21:16:03 +0000 (14:16 -0700)
We introduce rsocket extensions for supporting direct
data placement (also known as zero copy).  Direct data
placement avoids data copies into network buffers when
sending or receiving data.  This patch implements zero
copies on the receive side, but adds some basic framework for
supporting it on the sending side.

Integrating zero copy support into the existing socket APIs
is difficult to achieve when the sockets are set as
nonblocking.  Any such implementation is likely to be unusable
in practice.  The problem stems from the fact that socket
operations are synchronous in nature.  Support for asynchronous
operations is limited to connection establishment.

Therefore we introduce new calls to handle direct data placement.
The use of the new calls is optional and does not affect the
use of the existing calls.  An attempt is made to have the new
routines integrate naturally with the existing APIs.  The new
functions are: riomap, riounmap, and riowrite.  The basic operation
can be described as follows:

1. App A calls riomap to register a data buffer with the local
   RDMA device.  Riomap returns an off_t offset value that
   corresponds to the registered data buffer.  The app may
   select the offset value.
2. Rsockets will transmit an internal message to the remote
   peer with information about the registration.  This exchange
   is hidden from the applications.
3. App A sends a notification message to app B indicating that
   the remote iomapped buffer is now available to receive data.
4. App B calls riowrite to transmit data directly into the
   riomapped data buffer.
5. App B sends a notification message to app A indicating that
   data is available in the mapped buffer.
6. After all transfers are complete, app A calls riounmap to
   deregister its data buffer.

Riomap and riounmap are functionally equivalent to RDMA
memory registration and deregistration routines.  They are loosely
based on the mmap and munmap APIs.

off_t riomap(int socket, void *buf, size_t len,
     int prot, int flags, off_t offset)

Riomap registers an application buffer with the RDMA hardware
associated with an rsocket.  The buffer is registered either for
local only access (PROT_NONE) or for remote write access (PROT_WRITE).
When registered for remote access, the buffer is mapped to a given
offset.  The offset is either provided by the user, or if the user
selects -1 for the offset, rsockets selects one.  The remote peer may
access an iomapped buffer directly by specifying the correct offset.
The mapping is not guaranteed to be available until after the remote
peer receives a data transfer initiated after riomap has completed.

int riounmap(int socket, void *buf, size_t len)

Riounmap removes the mapping between a buffer and an rsocket.

size_t riowrite(int socket, const void *buf, size_t count,
off_t offset, int flags)

Riowrite allows an application to transfer data over an rsocket
directly into a remotely iomapped buffer.  The remote buffer is specified
through an offset parameter, which corresponds to a remote iomapped buffer.
From the sender's perspective, riowrite behaves similar to rwrite.  From
a receiver's view, riowrite transfers are silently redirected into a pre-
determined data buffer.  Data is received automatically, and the receiver
is not informed of the transfer.  However, iowrite data is still considered
part of the data stream, such that iowrite data will be written before a
subsequent transfer is received.  A message sent immediately after
initiating an iowrite may be used to notify the receiver of the iowrite.

It should be noted that the current implementation primarily focused
on being functional for evaluation purposes.  Some checks have been
deferred for subsequent patches, and performance is currently limited
by linear lookups.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
docs/rsocket
include/rdma/rsocket.h
man/rsocket.7
src/indexer.h
src/librdmacm.map
src/rsocket.c

index 5399f6cbea686d2a196561c264c11d3eaadcae25..1484f65b7b5a8ee67e8a7d3b551363d924d64f95 100644 (file)
@@ -110,11 +110,11 @@ Bits    Message             Meaning of
 31:29    Type               Bits 28:0
 000    Data Transfer     bytes transfered
 001    reserved
-010    reserved
+010    reserved - used internally, available for future use
 011    reserved
 100    Credit Update     received credits granted
 101    reserved
-110    reserved
+110    Iomap Updated     index of updated entry
 111    Control           control message type
 
 Data Transfer
@@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in use.  This is done
 by tracking when a receive buffer referenced by a remote target SGL has been
 filled.
 
+Iomap Updated
+Used to indicate that a remote iomap entry was updated.  The updated entry
+contains the offset value associated with an address, length, and rkey.  Once
+an iomap has been updated, the local application can issue directed IO
+transfers against the corresponding remote buffer.
+
 Control Message - DISCONNECT
 Indicates that the rsocket connection has been fully disconnected and will no
 longer send or receive data.  Data received before the disconnect message was
@@ -142,3 +148,44 @@ Control Message - SHUTDOWN
 Indicates that the remote rsocket has shutdown the send side of its
 connection.  The recipient of a shutdown message will no longer accept
 incoming data, but may still transfer outbound data.
+
+
+Iomapped Buffers
+----------------
+Rsockets allows for zero-copy transfers using what it refers to as iomapped
+buffers.  Iomapping and direct data placement (zero-copy) transfers are done
+using rsocket specific extensions.  The general operation is similar to
+that used for normal data transfers described above.
+
+   host A                   host B
+                          remote iomap                      
+  target iomap  <-----------  [  ]
+     [  ] ------
+     [  ] --    ------  iomapped buffer(s)         
+            --        ----->  +--+
+              --              |  |
+                --            |  |
+                  --          |  |
+                    --        +--+
+                      --       
+                        --->  +--+
+                              |  |
+                              |  |
+                              +--+
+
+The remote iomap contains the address, size, and rkey of the target iomap.  As
+the applicaton maps buffers host B to a given rsocket, rsockets will issue an RDMA
+write against one of the entries in the target iomap on host A.  The
+updated entry will reference an available iomapped buffer.  Immediate data
+included with the RDMA write will indicate to host A that a target iomap
+has been updated.
+
+When host A wishes to transfer directly into an iomapped buffer, it will check
+its target iomap for an offset corresponding to a remotely mapped buffer.  A
+matching iomap entry will contain the address, size, and rkey of the target
+buffer on host B.  Host A will then issue an RDMA operation against the
+registered remote data buffer.
+
+From host A's perspective, the transfer appears as a normal send/write
+operation, with the data stream redirected directly into the receiving
+application's buffer.
index 65feda96a0ff93e4371759d881b420d337fb2f60..f220c134f306701971e32f72a42637a47620d55a 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -39,6 +39,7 @@
 #include <errno.h>
 #include <poll.h>
 #include <sys/select.h>
+#include <sys/mman.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
 enum {
        RDMA_SQSIZE,
        RDMA_RQSIZE,
-       RDMA_INLINE
+       RDMA_INLINE,
+       RDMA_IOMAPSIZE
 };
 
 int rsetsockopt(int socket, int level, int optname,
@@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname,
                void *optval, socklen_t *optlen);
 int rfcntl(int socket, int cmd, ... /* arg */ );
 
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset);
+int riounmap(int socket, void *buf, size_t len);
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags);
+
 #ifdef __cplusplus
 }
 #endif
index 2ed5ca43b2f6a0917c4b0a18d2e7032c03c16648..5ecebc2e1a58154ed32571600e53cf83a6f096b0 100644 (file)
@@ -6,7 +6,7 @@ rsocket \- RDMA socket API
 .SH "DESCRIPTION"
 RDMA socket API and protocol
 .SH "NOTES"
-rsockets is a protocol over RDMA that supports a socket-level API
+Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  rsocket APIs are intended to match the behavior
 of corresponding socket calls, except where noted.  rsocket
 functions match the name and function signature of socket calls,
@@ -30,7 +30,7 @@ rgetpeername, rgetsockname
 .P
 rsetsockopt, rgetsockopt, rfcntl
 .P
-Functions take the same parameters as that use for sockets.  The
+Functions take the same parameters as that used for sockets.  The
 follow capabilities and flags are supported at this time:
 .P
 PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG
@@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF
 .P
 O_NONBLOCK
 .P
+Rsockets provides extensions beyond normal socket routines that
+allow for direct placement of data into an application's buffer.
+This is also known as zero-copy support, since data is sent and
+received directly, bypassing copies into network controlled buffers.
+The following calls and options support direct data placement.
+.P
+riomap, riounmap, riowrite
+.TP
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+.TP
+Riomap registers an application buffer with the RDMA hardware
+associated with an rsocket.  The buffer is registered either for
+local only access (PROT_NONE) or for remote write access (PROT_WRITE).
+When registered for remote access, the buffer is mapped to a given
+offset.  The offset is either provided by the user, or if the user
+selects -1 for the offset, rsockets selects one.  The remote peer may
+access an iomapped buffer directly by specifying the correct offset.
+The mapping is not guaranteed to be available until after the remote
+peer receives a data transfer initiated after riomap has completed.
+.P
+riounmap
+.TP
+int riounmap(int socket, void *buf, size_t len)
+.TP
+Riounmap removes the mapping between a buffer and an rsocket.
+.P
+riowrite
+.TP
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+.TP
+Riowrite allows an application to transfer data over an rsocket
+directly into a remotely iomapped buffer.  The remote buffer is specified
+through an offset parameter, which corresponds to a remote iomapped buffer.
+From the sender's perspective, riowrite behaves similar to rwrite.  From
+a receiver's view, riowrite transfers are silently redirected into a pre-
+determined data buffer.  Data is received automatically, and the receiver
+is not informed of the transfer.  However, iowrite data is still considered
+part of the data stream, such that iowrite data will be written before a
+subsequent transfer is received.  A  message sent immediately after initiating
+an iowrite may be used to notify the receiver of the iowrite.
+.P
 In addition to standard socket options, rsockets supports options
 specific to RDMA devices and protocols.  These options are accessible
 through rsetsockopt using SOL_RDMA option level.
@@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue.
 RDMA_RQSIZE - Integer size of the underlying receive queue.
 .TP
 RDMA_INLINE - Integer size of inline data.
+.TP
+RDMA_IOMAPSIZE - Integer number of remote IO mappings supported
 .P
 Note that rsockets fd's cannot be passed into non-rsocket calls.  For
 applications which must mix rsocket fd's with standard socket fd's or
@@ -84,6 +127,8 @@ rqsize_default - default size of receive queue
 .P
 inline_default - default size of inline data
 .P
+iomap_size - default size of remote iomapping table
+.P
 If configuration files are not available, rsockets uses internal defaults.
 .SH "SEE ALSO"
 rdma_cm(7)
index 26e7f9866a180b0692fde374b259e32f0b23b7ca..0c5f3882673f696b4b36ee87b6325aa9095bbba7 100644 (file)
@@ -31,6 +31,9 @@
  *
  */
 
+#if !defined(INDEXER_H)
+#define INDEXER_H
+
 #if HAVE_CONFIG_H
 #  include <config.h>
 #endif /* HAVE_CONFIG_H */
@@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index)
        return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
                idm_at(idm, index) : NULL;
 }
+
+typedef struct _dlist_entry {
+       struct _dlist_entry     *next;
+       struct _dlist_entry     *prev;
+}      dlist_entry;
+
+static inline void dlist_init(dlist_entry *head)
+{
+       head->next = head;
+       head->prev = head;
+}
+
+static inline int dlist_empty(dlist_entry *head)
+{
+       return head->next == head;
+}
+
+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
+{
+       item->next = head->next;
+       item->prev = head;
+       head->next->prev = item;
+       head->next = item;
+}
+
+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
+{
+       dlist_insert_after(item, head->prev);
+}
+
+#define dlist_insert_head dlist_insert_after
+#define dlist_insert_tail dlist_insert_before
+
+static inline void dlist_remove(dlist_entry *item)
+{
+       item->prev->next = item->next;
+       item->next->prev = item->prev;
+}
+
+#endif /* INDEXER_H */
index 5c317a3e78704716f59eb4fe6df495a8d95da006..d5ef7363ca76caf088b081d8c6374f7daf69d724 100644 (file)
@@ -63,5 +63,8 @@ RDMACM_1.0 {
                rselect;
                rdma_get_src_port;
                rdma_get_dst_port;
+               riomap;
+               riounmap;
+               riowrite;
        local: *;
 };
index cc5effe1aba45a9b560cf786c5272055ee8e51d1..29b8c018386a3a906e7f39ed3d3b42420be13d5b 100644 (file)
@@ -55,6 +55,7 @@
 
 #define RS_OLAP_START_SIZE 2048
 #define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
 #define RS_QP_MAX_SIZE 0xFFFE
 #define RS_QP_CTRL_SIZE 4
 #define RS_CONN_RETRIES 6
@@ -62,6 +63,7 @@
 static struct index_map idm;
 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
 
+static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
 static uint16_t def_rqsize = 384;
@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
  * bit 29: more data, 0 - end of transfer, 1 - more data available
  *
  * for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transfered
  * for control messages:
+ * SGL, CTRL
  * bits [28-0]: receive credits granted
+ * IOMAP_SGL
+ * bits [28-16]: reserved, bits [15-0]: index
  */
 
 enum {
        RS_OP_DATA,
        RS_OP_RSVD_DATA_MORE,
-       RS_OP_RSVD_DRA,
+       RS_OP_WRITE, /* opcode is not transmitted over the network */
        RS_OP_RSVD_DRA_MORE,
        RS_OP_SGL,
        RS_OP_RSVD,
-       RS_OP_RSVD_DRA_SGL,
+       RS_OP_IOMAP_SGL,
        RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -111,15 +116,30 @@ struct rs_sge {
        uint32_t length;
 };
 
-#define RS_MIN_INLINE    (sizeof(struct rs_sge))
-#define rs_host_is_net() (1 == htonl(1))
-#define RS_CONN_FLAG_NET 1
+struct rs_iomap {
+       uint64_t offset;
+       struct rs_sge sge;
+};
+
+struct rs_iomap_mr {
+       uint64_t offset;
+       struct ibv_mr *mr;
+       dlist_entry entry;
+       atomic_t refcnt;
+       int index;      /* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE      (sizeof(struct rs_sge))
+#define rs_host_is_net()   (1 == htonl(1))
+#define RS_CONN_FLAG_NET   (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
 
 struct rs_conn_data {
        uint8_t           version;
        uint8_t           flags;
        uint16_t          credits;
-       uint32_t          reserved2;
+       uint8_t           reserved[3];
+       uint8_t           target_iomap_size;
        struct rs_sge     target_sgl;
        struct rs_sge     data_buf;
 };
@@ -155,6 +175,7 @@ struct rsocket {
        fastlock_t        rlock;
        fastlock_t        cq_lock;
        fastlock_t        cq_wait_lock;
+       fastlock_t        iomap_lock;
 
        int               opts;
        long              fd_flags;
@@ -186,10 +207,19 @@ struct rsocket {
 
        int               remote_sge;
        struct rs_sge     remote_sgl;
+       struct rs_sge     remote_iomap;
+
+       struct rs_iomap_mr *remote_iomappings;
+       dlist_entry       iomap_list;
+       dlist_entry       iomap_queue;
+       int               iomap_pending;
 
        struct ibv_mr    *target_mr;
        int               target_sge;
-       volatile struct rs_sge    target_sgl[RS_SGL_SIZE];
+       int               target_iomap_size;
+       void             *target_buffer_list;
+       volatile struct rs_sge    *target_sgl;
+       struct rs_iomap  *target_iomap;
 
        uint32_t          rbuf_size;
        struct ibv_mr    *rmr;
@@ -201,6 +231,18 @@ struct rsocket {
        uint8_t           *sbuf;
 };
 
+static int rs_value_to_scale(int value, int bits)
+{
+       return value <= (1 << (bits - 1)) ?
+              value : (1 << (bits - 1)) | (value >> bits);
+}
+
+static int rs_scale_to_value(int value, int bits)
+{
+       return value <= (1 << (bits - 1)) ?
+              value : (value & ~(1 << (bits - 1))) << bits;
+}
+
 void rs_configure(void)
 {
        FILE *f;
@@ -247,9 +289,17 @@ void rs_configure(void)
        if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
                fscanf(f, "%u", &def_wmem);
                fclose(f);
+               if (def_wmem < RS_SNDLOWAT)
+                       def_wmem = RS_SNDLOWAT << 1;
+       }
+
+       if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+               fscanf(f, "%hu", &def_iomap_size);
+               fclose(f);
 
-               if (def_wmem < 1)
-                       def_wmem = 1;
+               /* round to supported values */
+               def_iomap_size = (uint8_t) rs_value_to_scale(
+                       (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
        }
        init = 1;
 out:
@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = inherited_rs->sq_size;
                rs->rq_size = inherited_rs->rq_size;
                rs->ctrl_avail = inherited_rs->ctrl_avail;
+               rs->target_iomap_size = inherited_rs->target_iomap_size;
        } else {
                rs->sbuf_size = def_wmem;
                rs->rbuf_size = def_mem;
@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
                rs->sq_size = def_sqsize;
                rs->rq_size = def_rqsize;
                rs->ctrl_avail = RS_QP_CTRL_SIZE;
+               rs->target_iomap_size = def_iomap_size;
        }
        fastlock_init(&rs->slock);
        fastlock_init(&rs->rlock);
        fastlock_init(&rs->cq_lock);
        fastlock_init(&rs->cq_wait_lock);
+       fastlock_init(&rs->iomap_lock);
+       dlist_init(&rs->iomap_list);
+       dlist_init(&rs->iomap_queue);
        return rs;
 }
 
@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+       size_t len;
+
        rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
        if (!rs->rmsg)
                return -1;
@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
        if (!rs->smr)
                return -1;
 
-       rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
-                                      sizeof(rs->target_sgl));
+       len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+             sizeof(*rs->target_iomap) * rs->target_iomap_size;
+       rs->target_buffer_list = malloc(len);
+       if (!rs->target_buffer_list)
+               return -1;
+
+       rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
        if (!rs->target_mr)
                return -1;
 
+       memset(rs->target_buffer_list, 0, len);
+       rs->target_sgl = rs->target_buffer_list;
+       if (rs->target_iomap_size)
+               rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
        rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
        if (!rs->rbuf)
                return -1;
@@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs)
        return 0;
 }
 
+/*
+static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...)
+{
+       TODO: write me
+}
+*/
+
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+       if (atomic_dec(&iomr->refcnt))
+               return;
+
+       dlist_remove(&iomr->entry);
+       ibv_dereg_mr(iomr->mr);
+       if (iomr->index >= 0)
+               iomr->mr = NULL;
+       else
+               free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+       struct rs_iomap_mr *iomr;
+
+       while (!dlist_empty(&rs->iomap_list)) {
+               iomr = container_of(rs->iomap_list.next,
+                                   struct rs_iomap_mr, entry);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+       }
+       while (!dlist_empty(&rs->iomap_queue)) {
+               iomr = container_of(rs->iomap_queue.next,
+                                   struct rs_iomap_mr, entry);
+               riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+       }
+}
+
 static void rs_free(struct rsocket *rs)
 {
        if (rs->index >= 0)
@@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs)
                free(rs->rbuf);
        }
 
-       if (rs->target_mr)
-               rdma_dereg_mr(rs->target_mr);
+       if (rs->target_buffer_list) {
+               if (rs->target_mr)
+                       rdma_dereg_mr(rs->target_mr);
+               free(rs->target_buffer_list);
+       }
 
        if (rs->cm_id) {
+               rs_free_iomappings(rs);
                if (rs->cm_id->qp)
                        rdma_destroy_qp(rs->cm_id);
                rdma_destroy_id(rs->cm_id);
        }
 
+       fastlock_destroy(&rs->iomap_lock);
        fastlock_destroy(&rs->cq_wait_lock);
        fastlock_destroy(&rs->cq_lock);
        fastlock_destroy(&rs->rlock);
@@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
                             struct rs_conn_data *conn)
 {
        conn->version = 1;
-       conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+       conn->flags = RS_CONN_FLAG_IOMAP |
+                     (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
        conn->credits = htons(rs->rq_size);
-       conn->reserved2 = 0;
+       memset(conn->reserved, 0, sizeof conn->reserved);
+       conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
 
        conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
        conn->target_sgl.length = htonl(RS_SGL_SIZE);
@@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
            (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
                rs->opts = RS_OPT_SWAP_SGL;
 
+       if (conn->flags & RS_CONN_FLAG_IOMAP) {
+               rs->remote_iomap.addr = rs->remote_sgl.addr +
+                                       sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+               rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+               rs->remote_iomap.key = rs->remote_sgl.key;
+       }
+
        rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
        rs->target_sgl[0].length = ntohl(conn->data_buf.length);
        rs->target_sgl[0].key = ntohl(conn->data_buf.key);
@@ -753,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
        return rs_do_connect(rs);
 }
 
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
                         struct ibv_sge *sgl, int nsge,
                         uint32_t imm_data, int flags,
                         uint64_t addr, uint32_t rkey)
@@ -773,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
        return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int rs_post_write(struct rsocket *rs,
+                        struct ibv_sge *sgl, int nsge,
+                        uint64_t wr_id, int flags,
+                        uint64_t addr, uint32_t rkey)
+{
+       struct ibv_send_wr wr, *bad;
+
+       wr.wr_id = wr_id;
+       wr.next = NULL;
+       wr.sg_list = sgl;
+       wr.num_sge = nsge;
+       wr.opcode = IBV_WR_RDMA_WRITE;
+       wr.send_flags = flags;
+       wr.wr.rdma.remote_addr = addr;
+       wr.wr.rdma.rkey = rkey;
+
+       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -799,8 +935,35 @@ static int rs_write_data(struct rsocket *rs,
                        rs->target_sge = 0;
        }
 
-       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-                            flags, addr, rkey);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+                                flags, addr, rkey);
+}
+
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+                          struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+       uint64_t addr;
+
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= length;
+
+       addr = iom->sge.addr + offset - iom->offset;
+       return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+                            flags, addr, iom->sge.key);
+}
+
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+                         struct ibv_sge *sgl, int nsge, int flags)
+{
+       uint64_t addr;
+
+       rs->sseq_no++;
+       rs->sqe_avail--;
+       rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+       addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+       return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+                                flags, addr, rs->remote_iomap.key);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -831,12 +994,12 @@ static void rs_send_credits(struct rsocket *rs)
                ibsge.lkey = 0;
                ibsge.length = sizeof(sge);
 
-               rs_post_write(rs, &ibsge, 1,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-                             IBV_SEND_INLINE,
-                             rs->remote_sgl.addr +
-                             rs->remote_sge * sizeof(struct rs_sge),
-                             rs->remote_sgl.key);
+               rs_post_write_msg(rs, &ibsge, 1,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 IBV_SEND_INLINE,
+                                 rs->remote_sgl.addr +
+                                 rs->remote_sge * sizeof(struct rs_sge),
+                                 rs->remote_sgl.key);
 
                rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
                rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -845,8 +1008,9 @@ static void rs_send_credits(struct rsocket *rs)
                if (++rs->remote_sge == rs->remote_sgl.length)
                        rs->remote_sge = 0;
        } else {
-               rs_post_write(rs, NULL, 0,
-                             rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+               rs_post_write_msg(rs, NULL, 0,
+                                 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+                                 0, 0, 0);
        }
 }
 
@@ -880,6 +1044,9 @@ static int rs_poll_cq(struct rsocket *rs)
                        case RS_OP_SGL:
                                rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               /* The iomap was updated, that's nice to know. */
+                               break;
                        case RS_OP_CTRL:
                                if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
                                        rs->state = rs_disconnected;
@@ -888,6 +1055,9 @@ static int rs_poll_cq(struct rsocket *rs)
                                        rs->state &= ~rs_connect_rd;
                                }
                                break;
+                       case RS_OP_WRITE:
+                               /* We really shouldn't be here. */
+                               break;
                        default:
                                rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
                                rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
@@ -905,6 +1075,10 @@ static int rs_poll_cq(struct rsocket *rs)
                                if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
                                        rs->state = rs_disconnected;
                                break;
+                       case RS_OP_IOMAP_SGL:
+                               rs->sqe_avail++;
+                               rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+                               break;
                        default:
                                rs->sqe_avail++;
                                rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
@@ -1046,7 +1220,7 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-       return rs->sqe_avail && rs->sbuf_bytes_avail &&
+       return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
               (rs->sseq_no != rs->sseq_comp) &&
               (rs->target_sgl[rs->target_sge].length != 0);
 }
@@ -1216,6 +1390,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
        return rrecvv(socket, iov, iovcnt, 0);
 }
 
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+       struct rs_iomap_mr *iomr;
+       struct ibv_sge sge;
+       struct rs_iomap iom;
+       int ret;
+
+       fastlock_acquire(&rs->iomap_lock);
+       while (!dlist_empty(&rs->iomap_queue)) {
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+                       if (!(rs->state & rs_connect_wr)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+               }
+
+               iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+               if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+                       iom.offset = iomr->offset;
+                       iom.sge.addr = (uintptr_t) iomr->mr->addr;
+                       iom.sge.length = iomr->mr->length;
+                       iom.sge.key = iomr->mr->rkey;
+               } else {
+                       iom.offset = bswap_64(iomr->offset);
+                       iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+                       iom.sge.length = bswap_32(iomr->mr->length);
+                       iom.sge.key = bswap_32(iomr->mr->rkey);
+               }
+
+               if (rs->sq_inline >= sizeof iom) {
+                       sge.addr = (uintptr_t) &iom;
+                       sge.length = sizeof iom;
+                       sge.lkey = 0;
+                       ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+               } else if (rs_sbuf_left(rs) >= sizeof iom) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+                       rs->ssgl[0].length = sizeof iom;
+                       ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+                       if (rs_sbuf_left(rs) > sizeof iom)
+                               rs->ssgl[0].addr += sizeof iom;
+                       else
+                               rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+               } else {
+                       rs->ssgl[0].length = rs_sbuf_left(rs);
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+                              rs->ssgl[1].length);
+                       ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               dlist_remove(&iomr->entry);
+               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+               if (ret)
+                       break;
+       }
+
+       rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+       fastlock_release(&rs->iomap_lock);
+       return ret;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1224,7 +1465,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 {
        struct rsocket *rs;
        struct ibv_sge sge;
-       size_t left;
+       size_t left = len;
        uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
        int ret = 0;
 
@@ -1239,7 +1480,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
        }
 
        fastlock_acquire(&rs->slock);
-       for (left = len; left; left -= xfer_size, buf += xfer_size) {
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
                                          rs_conn_can_send);
@@ -1289,6 +1535,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                if (ret)
                        break;
        }
+out:
        fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
@@ -1345,9 +1592,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        len = iov[0].iov_len;
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
+       left = len;
 
        fastlock_acquire(&rs->slock);
-       for (left = len; left; left -= xfer_size) {
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
                                          rs_conn_can_send);
@@ -1395,6 +1648,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                if (ret)
                        break;
        }
+out:
        fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
@@ -1725,8 +1979,8 @@ int rshutdown(int socket, int how)
 
                if ((rs->state & rs_connected) && rs->ctrl_avail) {
                        rs->ctrl_avail--;
-                       ret = rs_post_write(rs, NULL, 0,
-                                           rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+                       ret = rs_post_write_msg(rs, NULL, 0,
+                                               rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
                }
        }
 
@@ -1814,6 +2068,8 @@ int rsetsockopt(int socket, int level, int optname,
                case SO_SNDBUF:
                        if (!rs->sbuf)
                                rs->sbuf_size = (*(uint32_t *) optval) << 1;
+                       if (rs->sbuf_size < RS_SNDLOWAT)
+                               rs->sbuf_size = RS_SNDLOWAT << 1;
                        ret = 0;
                        break;
                case SO_LINGER:
@@ -1878,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname,
                        if (rs->sq_inline < RS_MIN_INLINE)
                                rs->sq_inline = RS_MIN_INLINE;
                        break;
+               case RDMA_IOMAPSIZE:
+                       rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+                               (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+                       break;
                default:
                        break;
                }
@@ -1979,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname,
                        *((int *) optval) = rs->sq_inline;
                        *optlen = sizeof(int);
                        break;
+               case RDMA_IOMAPSIZE:
+                       *((int *) optval) = rs->target_iomap_size;
+                       *optlen = sizeof(int);
+                       break;
                default:
                        ret = ENOTSUP;
                        break;
@@ -2020,3 +2284,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
        va_end(args);
        return ret;
 }
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+       int i;
+
+       if (!rs->remote_iomappings) {
+               rs->remote_iomappings = calloc(rs->remote_iomap.length,
+                                              sizeof(*rs->remote_iomappings));
+               if (!rs->remote_iomappings)
+                       return NULL;
+
+               for (i = 0; i < rs->remote_iomap.length; i++)
+                       rs->remote_iomappings[i].index = i;
+       }
+
+       for (i = 0; i < rs->remote_iomap.length; i++) {
+               if (!rs->remote_iomappings[i].mr)
+                       return &rs->remote_iomappings[i];
+       }
+       return NULL;
+}
+
+/*
+ * If an offset is given, we map to it.  If offset is -1, then we map the
+ * offset to the address of buf.  We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+       struct rsocket *rs;
+       struct rs_iomap_mr *iomr;
+       int access = IBV_ACCESS_LOCAL_WRITE;
+
+       rs = idm_at(&idm, socket);
+       if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+               return ERR(EINVAL);
+
+       fastlock_acquire(&rs->iomap_lock);
+       if (prot & PROT_WRITE) {
+               iomr = rs_get_iomap_mr(rs);
+               access |= IBV_ACCESS_REMOTE_WRITE;
+       } else {
+               iomr = calloc(1, sizeof *iomr);
+               iomr->index = -1;
+       }
+       if (!iomr) {
+               offset = ERR(ENOMEM);
+               goto out;
+       }
+
+       iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+       if (!iomr->mr) {
+               if (iomr->index < 0)
+                       free(iomr);
+               offset = -1;
+               goto out;
+       }
+
+       if (offset == -1)
+               offset = (uintptr_t) buf;
+       iomr->offset = offset;
+       atomic_init(&iomr->refcnt);
+       atomic_set(&iomr->refcnt, 1);
+
+       if (iomr->index >= 0) {
+               dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+               rs->iomap_pending = 1;
+       } else {
+               dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+       }
+out:
+       fastlock_release(&rs->iomap_lock);
+       return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+       struct rsocket *rs;
+       struct rs_iomap_mr *iomr;
+       dlist_entry *entry;
+       int ret = 0;
+
+       rs = idm_at(&idm, socket);
+       fastlock_acquire(&rs->iomap_lock);
+
+       for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+            entry = entry->next) {
+               iomr = container_of(entry, struct rs_iomap_mr, entry);
+               if (iomr->mr->addr == buf && iomr->mr->length == len) {
+                       rs_release_iomap_mr(iomr);
+                       goto out;
+               }
+       }
+
+       for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+            entry = entry->next) {
+               iomr = container_of(entry, struct rs_iomap_mr, entry);
+               if (iomr->mr->addr == buf && iomr->mr->length == len) {
+                       rs_release_iomap_mr(iomr);
+                       goto out;
+               }
+       }
+       ret = ERR(EINVAL);
+out:
+       fastlock_release(&rs->iomap_lock);
+       return ret;
+}
+
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+       int i;
+
+       for (i = 0; i < rs->target_iomap_size; i++) {
+               if (offset >= rs->target_iomap[i].offset &&
+                   offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
+                       return &rs->target_iomap[i];
+       }
+       return NULL;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+       struct rsocket *rs;
+       struct rs_iomap *iom = NULL;
+       struct ibv_sge sge;
+       size_t left = count;
+       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+       int ret = 0;
+
+       rs = idm_at(&idm, socket);
+       fastlock_acquire(&rs->slock);
+       if (rs->iomap_pending) {
+               ret = rs_send_iomaps(rs, flags);
+               if (ret)
+                       goto out;
+       }
+       for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+               if (!iom || offset > iom->offset + iom->sge.length) {
+                       iom = rs_find_iomap(rs, offset);
+                       if (!iom)
+                               break;
+               }
+
+               if (!rs_can_send(rs)) {
+                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+                                         rs_conn_can_send);
+                       if (ret)
+                               break;
+                       if (!(rs->state & rs_connect_wr)) {
+                               ret = ERR(ECONNRESET);
+                               break;
+                       }
+               }
+
+               if (olen < left) {
+                       xfer_size = olen;
+                       if (olen < RS_MAX_TRANSFER)
+                               olen <<= 1;
+               } else {
+                       xfer_size = left;
+               }
+
+               if (xfer_size > rs->sbuf_bytes_avail)
+                       xfer_size = rs->sbuf_bytes_avail;
+               if (xfer_size > iom->offset + iom->sge.length - offset)
+                       xfer_size = iom->offset + iom->sge.length - offset;
+
+               if (xfer_size <= rs->sq_inline) {
+                       sge.addr = (uintptr_t) buf;
+                       sge.length = xfer_size;
+                       sge.lkey = 0;
+                       ret = rs_write_direct(rs, iom, offset, &sge, 1,
+                                             xfer_size, IBV_SEND_INLINE);
+               } else if (xfer_size <= rs_sbuf_left(rs)) {
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+                       rs->ssgl[0].length = xfer_size;
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+                       if (xfer_size < rs_sbuf_left(rs))
+                               rs->ssgl[0].addr += xfer_size;
+                       else
+                               rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+               } else {
+                       rs->ssgl[0].length = rs_sbuf_left(rs);
+                       memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+                               rs->ssgl[0].length);
+                       rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+                       memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+                       ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+                       rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+               }
+               if (ret)
+                       break;
+       }
+out:
+       fastlock_release(&rs->slock);
+
+       return (ret && left == count) ? ret : count - left;
+}