]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
commit
authorSean Hefty <sean.hefty@intel.com>
Mon, 5 Nov 2012 20:02:59 +0000 (12:02 -0800)
committerSean Hefty <sean.hefty@intel.com>
Mon, 5 Nov 2012 20:02:59 +0000 (12:02 -0800)
meta
patches/fscanf [deleted file]
patches/riostream [deleted file]
patches/rs-iomap [deleted file]

diff --git a/meta b/meta
index 747a7ac7cfd0758080d1c30870e0e2d1f1f10fac..042e3916ac56e955e01bddd986622e5d635ded2e 100644 (file)
--- a/meta
+++ b/meta
@@ -1,10 +1,7 @@
 Version: 1
-Previous: bbec85153cdf0b6cce9cc248f080adefb3d0c76a
+Previous: 12a4515095a10c730e9a703af13cf060eeab9e44
 Head: ab0d488c1e3ba7658f61a4d8da022b5afc17737f
 Applied:
-  rs-iomap: bb9fcba81acdfe34ea5df3bb23a45e0a486207da
-  riostream: 7d92d0106f50e0371256e74863963a0e2e99a5c8
-  fscanf: ab0d488c1e3ba7658f61a4d8da022b5afc17737f
 Unapplied:
   iom-dbg: 88434072d07f8edc58f454ac954d78bd39441eed
   resv-rs-len: 7b6ff5c4894f54b221d877adcd709795dffb2fe9
diff --git a/patches/fscanf b/patches/fscanf
deleted file mode 100644 (file)
index 5d386ec..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-Bottom: 59dfa020a0be74bc45df6be3de00c8ca1f997348
-Top:    92d2aab8615c3d1003fee963587c4078b732e465
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-11-05 11:53:03 -0800
-
-rsocket: Remove fscanf build warnings
-
-Cast fscanf return values to (void) to indicate that we don't
-care if the call fails.  In the case of a failure, we simply
-fall back to using default values.
-
-Problem reported by Or Gerlitz <ogerlitz@mellanox.com>
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 74dbcc7..58fcb8e 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -256,12 +256,12 @@ void rs_configure(void)
-               goto out;
-       if ((f = fopen(RS_CONF_DIR "/polling_time", "r"))) {
--              fscanf(f, "%u", &polling_time);
-+              (void) fscanf(f, "%u", &polling_time);
-               fclose(f);
-       }
-       if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
--              fscanf(f, "%hu", &def_inline);
-+              (void) fscanf(f, "%hu", &def_inline);
-               fclose(f);
-               if (def_inline < RS_MIN_INLINE)
-@@ -269,17 +269,17 @@ void rs_configure(void)
-       }
-       if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
--              fscanf(f, "%hu", &def_sqsize);
-+              (void) fscanf(f, "%hu", &def_sqsize);
-               fclose(f);
-       }
-       if ((f = fopen(RS_CONF_DIR "/rqsize_default", "r"))) {
--              fscanf(f, "%hu", &def_rqsize);
-+              (void) fscanf(f, "%hu", &def_rqsize);
-               fclose(f);
-       }
-       if ((f = fopen(RS_CONF_DIR "/mem_default", "r"))) {
--              fscanf(f, "%u", &def_mem);
-+              (void) fscanf(f, "%u", &def_mem);
-               fclose(f);
-               if (def_mem < 1)
-@@ -287,14 +287,14 @@ void rs_configure(void)
-       }
-       if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
--              fscanf(f, "%u", &def_wmem);
-+              (void) fscanf(f, "%u", &def_wmem);
-               fclose(f);
-               if (def_wmem < RS_SNDLOWAT)
-                       def_wmem = RS_SNDLOWAT << 1;
-       }
-       if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
--              fscanf(f, "%hu", &def_iomap_size);
-+              (void) fscanf(f, "%hu", &def_iomap_size);
-               fclose(f);
-               /* round to supported values */
diff --git a/patches/riostream b/patches/riostream
deleted file mode 100644 (file)
index fe83fe7..0000000
+++ /dev/null
@@ -1,853 +0,0 @@
-Bottom: f577125f6a6e9da1cf989ea2c4e933c66a84cf05
-Top:    59dfa020a0be74bc45df6be3de00c8ca1f997348
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-10-24 10:23:52 -0700
-
-riostream: Add example program for using iomap routines.
-
-riostream is based on rstream, but uses the new riomap, riounmap,
-and riowrite calls instead.  It runs a series of latency and
-bandwidth tests using remote iomapped memory.
-
-riostream is limited to using zero copy transfers at the
-receiving side only at this time.
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/Makefile.am b/Makefile.am
-index 4a8b9bc..d72016d 100644
---- a/Makefile.am
-+++ b/Makefile.am
-@@ -28,7 +28,8 @@ src_librspreload_la_LIBADD = $(top_builddir)/src/librdmacm.la
- bin_PROGRAMS = examples/ucmatose examples/rping examples/udaddy examples/mckey \
-              examples/rdma_client examples/rdma_server examples/rdma_xclient \
--             examples/rdma_xserver examples/rstream examples/rcopy
-+             examples/rdma_xserver examples/rstream examples/rcopy \
-+             examples/riostream
- examples_ucmatose_SOURCES = examples/cmatose.c examples/common.c
- examples_ucmatose_LDADD = $(top_builddir)/src/librdmacm.la
- examples_rping_SOURCES = examples/rping.c
-@@ -47,6 +48,8 @@ examples_rdma_xserver_SOURCES = examples/rdma_xserver.c
- examples_rdma_xserver_LDADD = $(top_builddir)/src/librdmacm.la
- examples_rstream_SOURCES = examples/rstream.c
- examples_rstream_LDADD = $(top_builddir)/src/librdmacm.la
-+examples_riostream_SOURCES = examples/riostream.c
-+examples_riostream_LDADD = $(top_builddir)/src/librdmacm.la
- examples_rcopy_SOURCES = examples/rcopy.c
- examples_rcopy_LDADD = $(top_builddir)/src/librdmacm.la
-diff --git a/examples/riostream.c b/examples/riostream.c
-new file mode 100644
-index 0000000..dfb03e5
---- /dev/null
-+++ b/examples/riostream.c
-@@ -0,0 +1,735 @@
-+/*
-+ * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
-+ *
-+ * This software is available to you under the OpenIB.org BSD license
-+ * below:
-+ *
-+ *     Redistribution and use in source and binary forms, with or
-+ *     without modification, are permitted provided that the following
-+ *     conditions are met:
-+ *
-+ *      - Redistributions of source code must retain the above
-+ *        copyright notice, this list of conditions and the following
-+ *        disclaimer.
-+ *
-+ *      - Redistributions in binary form must reproduce the above
-+ *        copyright notice, this list of conditions and the following
-+ *        disclaimer in the documentation and/or other materials
-+ *        provided with the distribution.
-+ *
-+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
-+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
-+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
-+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-+ * SOFTWARE.
-+ */
-+
-+#include <stdio.h>
-+#include <stdlib.h>
-+#include <string.h>
-+#include <strings.h>
-+#include <errno.h>
-+#include <getopt.h>
-+#include <sys/types.h>
-+#include <sys/socket.h>
-+#include <sys/time.h>
-+#include <sys/wait.h>
-+#include <netdb.h>
-+#include <fcntl.h>
-+#include <unistd.h>
-+#include <netinet/in.h>
-+#include <netinet/tcp.h>
-+
-+#include <rdma/rdma_cma.h>
-+#include <rdma/rsocket.h>
-+
-+struct test_size_param {
-+      int size;
-+      int option;
-+};
-+
-+static struct test_size_param test_size[] = {
-+      { 1 <<  6, 0 },
-+      { 1 <<  7, 1 }, { (1 <<  7) + (1 <<  6), 1},
-+      { 1 <<  8, 1 }, { (1 <<  8) + (1 <<  7), 1},
-+      { 1 <<  9, 1 }, { (1 <<  9) + (1 <<  8), 1},
-+      { 1 << 10, 1 }, { (1 << 10) + (1 <<  9), 1},
-+      { 1 << 11, 1 }, { (1 << 11) + (1 << 10), 1},
-+      { 1 << 12, 0 }, { (1 << 12) + (1 << 11), 1},
-+      { 1 << 13, 1 }, { (1 << 13) + (1 << 12), 1},
-+      { 1 << 14, 1 }, { (1 << 14) + (1 << 13), 1},
-+      { 1 << 15, 1 }, { (1 << 15) + (1 << 14), 1},
-+      { 1 << 16, 0 }, { (1 << 16) + (1 << 15), 1},
-+      { 1 << 17, 1 }, { (1 << 17) + (1 << 16), 1},
-+      { 1 << 18, 1 }, { (1 << 18) + (1 << 17), 1},
-+      { 1 << 19, 1 }, { (1 << 19) + (1 << 18), 1},
-+      { 1 << 20, 0 }, { (1 << 20) + (1 << 19), 1},
-+      { 1 << 21, 1 }, { (1 << 21) + (1 << 20), 1},
-+      { 1 << 22, 1 }, { (1 << 22) + (1 << 21), 1},
-+};
-+#define TEST_CNT (sizeof test_size / sizeof test_size[0])
-+
-+enum rs_optimization {
-+      opt_mixed,
-+      opt_latency,
-+      opt_bandwidth
-+};
-+
-+static int rs, lrs;
-+static int use_async;
-+static int verify;
-+static int flags = MSG_DONTWAIT;
-+static int poll_timeout = 0;
-+static int custom;
-+static enum rs_optimization optimization;
-+static int size_option;
-+static int iterations = 1;
-+static int transfer_size = 1000;
-+static int transfer_count = 1000;
-+static int buffer_size;
-+static char test_name[10] = "custom";
-+static char *port = "7471";
-+static char *dst_addr;
-+static char *src_addr;
-+static struct timeval start, end;
-+static void *buf;
-+static volatile uint8_t *poll_byte;
-+
-+static void size_str(char *str, size_t ssize, long long size)
-+{
-+      long long base, fraction = 0;
-+      char mag;
-+
-+      if (size >= (1 << 30)) {
-+              base = 1 << 30;
-+              mag = 'g';
-+      } else if (size >= (1 << 20)) {
-+              base = 1 << 20;
-+              mag = 'm';
-+      } else if (size >= (1 << 10)) {
-+              base = 1 << 10;
-+              mag = 'k';
-+      } else {
-+              base = 1;
-+              mag = '\0';
-+      }
-+
-+      if (size / base < 10)
-+              fraction = (size % base) * 10 / base;
-+      if (fraction) {
-+              snprintf(str, ssize, "%lld.%lld%c", size / base, fraction, mag);
-+      } else {
-+              snprintf(str, ssize, "%lld%c", size / base, mag);
-+      }
-+}
-+
-+static void cnt_str(char *str, size_t ssize, long long cnt)
-+{
-+      if (cnt >= 1000000000)
-+              snprintf(str, ssize, "%lldb", cnt / 1000000000);
-+      else if (cnt >= 1000000)
-+              snprintf(str, ssize, "%lldm", cnt / 1000000);
-+      else if (cnt >= 1000)
-+              snprintf(str, ssize, "%lldk", cnt / 1000);
-+      else
-+              snprintf(str, ssize, "%lld", cnt);
-+}
-+
-+static void show_perf(void)
-+{
-+      char str[32];
-+      float usec;
-+      long long bytes;
-+
-+      usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
-+      bytes = (long long) iterations * transfer_count * transfer_size * 2;
-+
-+      /* name size transfers iterations bytes seconds Gb/sec usec/xfer */
-+      printf("%-10s", test_name);
-+      size_str(str, sizeof str, transfer_size);
-+      printf("%-8s", str);
-+      cnt_str(str, sizeof str, transfer_count);
-+      printf("%-8s", str);
-+      cnt_str(str, sizeof str, iterations);
-+      printf("%-8s", str);
-+      size_str(str, sizeof str, bytes);
-+      printf("%-8s", str);
-+      printf("%8.2fs%10.2f%11.2f\n",
-+              usec / 1000000., (bytes * 8) / (1000. * usec),
-+              (usec / iterations) / (transfer_count * 2));
-+}
-+
-+static int size_to_count(int size)
-+{
-+      if (size >= 1000000)
-+              return 100;
-+      else if (size >= 100000)
-+              return 1000;
-+      else if (size >= 10000)
-+              return 10000;
-+      else if (size >= 1000)
-+              return 100000;
-+      else
-+              return 1000000;
-+}
-+
-+static void init_latency_test(int size)
-+{
-+      char sstr[5];
-+
-+      size_str(sstr, sizeof sstr, size);
-+      snprintf(test_name, sizeof test_name, "%s_lat", sstr);
-+      transfer_count = 1;
-+      transfer_size = size;
-+      iterations = size_to_count(transfer_size);
-+}
-+
-+static void init_bandwidth_test(int size)
-+{
-+      char sstr[5];
-+
-+      size_str(sstr, sizeof sstr, size);
-+      snprintf(test_name, sizeof test_name, "%s_bw", sstr);
-+      iterations = 1;
-+      transfer_size = size;
-+      transfer_count = size_to_count(transfer_size);
-+}
-+
-+static void format_buf(void *buf, int size)
-+{
-+      uint8_t *array = buf;
-+      static uint8_t data;
-+      int i;
-+
-+      for (i = 0; i < size; i++)
-+              array[i] = data++;
-+}
-+
-+static int verify_buf(void *buf, int size)
-+{
-+      static long long total_bytes;
-+      uint8_t *array = buf;
-+      static uint8_t data;
-+      int i;
-+
-+      for (i = 0; i < size; i++, total_bytes++) {
-+              if (array[i] != data++) {
-+                      printf("data verification failed byte %lld\n", total_bytes);
-+                      return -1;
-+              }
-+      }
-+      return 0;
-+}
-+
-+static int do_poll(struct pollfd *fds)
-+{
-+      int ret;
-+
-+      do {
-+              ret = rpoll(fds, 1, poll_timeout);
-+      } while (!ret);
-+
-+      return ret == 1 ? 0 : ret;
-+}
-+
-+static int send_msg(int size)
-+{
-+      struct pollfd fds;
-+      int offset, ret;
-+
-+      if (verify)
-+              format_buf(buf, size);
-+
-+      if (use_async) {
-+              fds.fd = rs;
-+              fds.events = POLLOUT;
-+      }
-+
-+      for (offset = 0; offset < size; ) {
-+              if (use_async) {
-+                      ret = do_poll(&fds);
-+                      if (ret)
-+                              return ret;
-+              }
-+
-+              ret = rsend(rs, buf + offset, size - offset, flags);
-+              if (ret > 0) {
-+                      offset += ret;
-+              } else if (errno != EWOULDBLOCK && errno != EAGAIN) {
-+                      perror("rsend");
-+                      return ret;
-+              }
-+      }
-+
-+      return 0;
-+}
-+
-+static int send_xfer(int size)
-+{
-+      struct pollfd fds;
-+      int offset, ret;
-+
-+      if (verify)
-+              format_buf(buf, size - 1);
-+
-+      if (use_async) {
-+              fds.fd = rs;
-+              fds.events = POLLOUT;
-+      }
-+
-+      for (offset = 0; offset < size; ) {
-+              if (use_async) {
-+                      ret = do_poll(&fds);
-+                      if (ret)
-+                              return ret;
-+              }
-+
-+              ret = riowrite(rs, buf + offset, size - offset, offset, flags);
-+              if (ret > 0) {
-+                      offset += ret;
-+              } else if (errno != EWOULDBLOCK && errno != EAGAIN) {
-+                      perror("riowrite");
-+                      return ret;
-+              }
-+      }
-+
-+      return 0;
-+}
-+
-+static int recv_msg(int size)
-+{
-+      struct pollfd fds;
-+      int offset, ret;
-+
-+      if (use_async) {
-+              fds.fd = rs;
-+              fds.events = POLLIN;
-+      }
-+
-+      for (offset = 0; offset < size; ) {
-+              if (use_async) {
-+                      ret = do_poll(&fds);
-+                      if (ret)
-+                              return ret;
-+              }
-+
-+              ret = rrecv(rs, buf + offset, size - offset, flags);
-+              if (ret > 0) {
-+                      offset += ret;
-+              } else if (errno != EWOULDBLOCK && errno != EAGAIN) {
-+                      perror("rrecv");
-+                      return ret;
-+              }
-+      }
-+
-+      if (verify) {
-+              ret = verify_buf(buf, size);
-+              if (ret)
-+                      return ret;
-+      }
-+
-+      return 0;
-+}
-+
-+static int recv_xfer(int size, uint8_t marker)
-+{
-+      int ret;
-+
-+      while (*poll_byte != marker)
-+              ;
-+
-+      if (verify) {
-+              ret = verify_buf(buf, size - 1);
-+              if (ret)
-+                      return ret;
-+      }
-+
-+      return 0;
-+}
-+
-+static int sync_test(void)
-+{
-+      int ret;
-+
-+      ret = dst_addr ? send_msg(16) : recv_msg(16);
-+      if (ret)
-+              return ret;
-+
-+      return dst_addr ? recv_msg(16) : send_msg(16);
-+}
-+
-+static int run_test(void)
-+{
-+      int ret, i, t;
-+      off_t offset;
-+      uint8_t marker = 0;
-+
-+      poll_byte = buf + transfer_size - 1;
-+      *poll_byte = -1;
-+      offset = riomap(rs, buf, transfer_size, PROT_WRITE, 0, 0);
-+      if (offset ==  -1) {
-+              perror("riomap");
-+              ret = -1;
-+              goto out;
-+      }
-+      ret = sync_test();
-+      if (ret)
-+              goto out;
-+
-+      gettimeofday(&start, NULL);
-+      for (i = 0; i < iterations; i++) {
-+              if (dst_addr) {
-+                      for (t = 0; t < transfer_count - 1; t++) {
-+                              ret = send_xfer(transfer_size);
-+                              if (ret)
-+                                      goto out;
-+                      }
-+                      *poll_byte = (uint8_t) marker++;
-+                      ret = send_xfer(transfer_size);
-+                      if (ret)
-+                              goto out;
-+
-+                      ret = recv_xfer(transfer_size, marker++);
-+              } else {
-+                      ret = recv_xfer(transfer_size, marker++);
-+                      if (ret)
-+                              goto out;
-+
-+                      for (t = 0; t < transfer_count - 1; t++) {
-+                              ret = send_xfer(transfer_size);
-+                              if (ret)
-+                                      goto out;
-+                      }
-+                      *poll_byte = (uint8_t) marker++;
-+                      ret = send_xfer(transfer_size);
-+              }
-+              if (ret)
-+                      goto out;
-+      }
-+      gettimeofday(&end, NULL);
-+      show_perf();
-+      ret = riounmap(rs, buf, transfer_size);
-+
-+out:
-+      return ret;
-+}
-+
-+static void set_options(int rs)
-+{
-+      int val;
-+
-+      if (buffer_size) {
-+              rsetsockopt(rs, SOL_SOCKET, SO_SNDBUF, (void *) &buffer_size,
-+                          sizeof buffer_size);
-+              rsetsockopt(rs, SOL_SOCKET, SO_RCVBUF, (void *) &buffer_size,
-+                          sizeof buffer_size);
-+      } else {
-+              val = 1 << 19;
-+              rsetsockopt(rs, SOL_SOCKET, SO_SNDBUF, (void *) &val, sizeof val);
-+              rsetsockopt(rs, SOL_SOCKET, SO_RCVBUF, (void *) &val, sizeof val);
-+      }
-+
-+      val = 1;
-+      rsetsockopt(rs, IPPROTO_TCP, TCP_NODELAY, (void *) &val, sizeof(val));
-+      rsetsockopt(rs, SOL_RDMA, RDMA_IOMAPSIZE, (void *) &val, sizeof val);
-+
-+      if (flags & MSG_DONTWAIT)
-+              rfcntl(rs, F_SETFL, O_NONBLOCK);
-+
-+      /* Inline size based on experimental data */
-+      if (optimization == opt_latency) {
-+              val = 384;
-+              rsetsockopt(rs, SOL_RDMA, RDMA_INLINE, &val, sizeof val);
-+      } else if (optimization == opt_bandwidth) {
-+              val = 0;
-+              rsetsockopt(rs, SOL_RDMA, RDMA_INLINE, &val, sizeof val);
-+      }
-+}
-+
-+static int server_listen(void)
-+{
-+      struct addrinfo hints, *res;
-+      int val, ret;
-+
-+      memset(&hints, 0, sizeof hints);
-+      hints.ai_flags = RAI_PASSIVE;
-+      ret = getaddrinfo(src_addr, port, &hints, &res);
-+      if (ret) {
-+              perror("getaddrinfo");
-+              return ret;
-+      }
-+
-+      lrs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
-+      if (lrs < 0) {
-+              perror("rsocket");
-+              ret = lrs;
-+              goto free;
-+      }
-+
-+      val = 1;
-+      ret = rsetsockopt(lrs, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
-+      if (ret) {
-+              perror("rsetsockopt SO_REUSEADDR");
-+              goto close;
-+      }
-+
-+      ret = rbind(lrs, res->ai_addr, res->ai_addrlen);
-+      if (ret) {
-+              perror("rbind");
-+              goto close;
-+      }
-+
-+      ret = rlisten(lrs, 1);
-+      if (ret)
-+              perror("rlisten");
-+
-+close:
-+      if (ret)
-+              rclose(lrs);
-+free:
-+      freeaddrinfo(res);
-+      return ret;
-+}
-+
-+static int server_connect(void)
-+{
-+      struct pollfd fds;
-+      int ret = 0;
-+
-+      set_options(lrs);
-+      do {
-+              if (use_async) {
-+                      fds.fd = lrs;
-+                      fds.events = POLLIN;
-+
-+                      ret = do_poll(&fds);
-+                      if (ret) {
-+                              perror("rpoll");
-+                              return ret;
-+                      }
-+              }
-+
-+              rs = raccept(lrs, NULL, 0);
-+      } while (rs < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
-+      if (rs < 0) {
-+              perror("raccept");
-+              return rs;
-+      }
-+
-+      set_options(rs);
-+      return ret;
-+}
-+
-+static int client_connect(void)
-+{
-+      struct addrinfo *res;
-+      struct pollfd fds;
-+      int ret, err;
-+      socklen_t len;
-+
-+      ret = getaddrinfo(dst_addr, port, NULL, &res);
-+      if (ret) {
-+              perror("getaddrinfo");
-+              return ret;
-+      }
-+
-+      rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
-+      if (rs < 0) {
-+              perror("rsocket");
-+              ret = rs;
-+              goto free;
-+      }
-+
-+      set_options(rs);
-+      /* TODO: bind client to src_addr */
-+
-+      ret = rconnect(rs, res->ai_addr, res->ai_addrlen);
-+      if (ret && (errno != EINPROGRESS)) {
-+              perror("rconnect");
-+              goto close;
-+      }
-+
-+      if (ret && (errno == EINPROGRESS)) {
-+              fds.fd = rs;
-+              fds.events = POLLOUT;
-+              ret = do_poll(&fds);
-+              if (ret)
-+                      goto close;
-+
-+              len = sizeof err;
-+              ret = rgetsockopt(rs, SOL_SOCKET, SO_ERROR, &err, &len);
-+              if (ret)
-+                      goto close;
-+              if (err) {
-+                      ret = -1;
-+                      errno = err;
-+                      perror("async rconnect");
-+              }
-+      }
-+
-+close:
-+      if (ret)
-+              rclose(rs);
-+free:
-+      freeaddrinfo(res);
-+      return ret;
-+}
-+
-+static int run(void)
-+{
-+      int i, ret = 0;
-+
-+      buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
-+      if (!buf) {
-+              perror("malloc");
-+              return -1;
-+      }
-+
-+      if (!dst_addr) {
-+              ret = server_listen();
-+              if (ret)
-+                      goto free;
-+      }
-+
-+      printf("%-10s%-8s%-8s%-8s%-8s%8s %10s%13s\n",
-+             "name", "bytes", "xfers", "iters", "total", "time", "Gb/sec", "usec/xfer");
-+      if (!custom) {
-+              optimization = opt_latency;
-+              ret = dst_addr ? client_connect() : server_connect();
-+              if (ret)
-+                      goto free;
-+
-+              for (i = 0; i < TEST_CNT; i++) {
-+                      if (test_size[i].option > size_option)
-+                              continue;
-+                      init_latency_test(test_size[i].size);
-+                      run_test();
-+              }
-+              rshutdown(rs, SHUT_RDWR);
-+              rclose(rs);
-+
-+              optimization = opt_bandwidth;
-+              ret = dst_addr ? client_connect() : server_connect();
-+              if (ret)
-+                      goto free;
-+              for (i = 0; i < TEST_CNT; i++) {
-+                      if (test_size[i].option > size_option)
-+                              continue;
-+                      init_bandwidth_test(test_size[i].size);
-+                      run_test();
-+              }
-+      } else {
-+              ret = dst_addr ? client_connect() : server_connect();
-+              if (ret)
-+                      goto free;
-+
-+              ret = run_test();
-+      }
-+
-+      rshutdown(rs, SHUT_RDWR);
-+      rclose(rs);
-+free:
-+      free(buf);
-+      return ret;
-+}
-+
-+static int set_test_opt(char *optarg)
-+{
-+      if (strlen(optarg) == 1) {
-+              switch (optarg[0]) {
-+              case 'a':
-+                      use_async = 1;
-+                      break;
-+              case 'b':
-+                      flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
-+                      break;
-+              case 'n':
-+                      flags |= MSG_DONTWAIT;
-+                      break;
-+              case 'v':
-+                      verify = 1;
-+                      break;
-+              default:
-+                      return -1;
-+              }
-+      } else {
-+              if (!strncasecmp("async", optarg, 5)) {
-+                      use_async = 1;
-+              } else if (!strncasecmp("block", optarg, 5)) {
-+                      flags = (flags & ~MSG_DONTWAIT) | MSG_WAITALL;
-+              } else if (!strncasecmp("nonblock", optarg, 8)) {
-+                      flags |= MSG_DONTWAIT;
-+              } else if (!strncasecmp("verify", optarg, 6)) {
-+                      verify = 1;
-+              } else {
-+                      return -1;
-+              }
-+      }
-+      return 0;
-+}
-+
-+int main(int argc, char **argv)
-+{
-+      int op, ret;
-+
-+      while ((op = getopt(argc, argv, "s:b:B:I:C:S:p:T:")) != -1) {
-+              switch (op) {
-+              case 's':
-+                      dst_addr = optarg;
-+                      break;
-+              case 'b':
-+                      src_addr = optarg;
-+                      break;
-+              case 'B':
-+                      buffer_size = atoi(optarg);
-+                      break;
-+              case 'I':
-+                      custom = 1;
-+                      iterations = atoi(optarg);
-+                      break;
-+              case 'C':
-+                      custom = 1;
-+                      transfer_count = atoi(optarg);
-+                      break;
-+              case 'S':
-+                      if (!strncasecmp("all", optarg, 3)) {
-+                              size_option = 1;
-+                      } else {
-+                              custom = 1;
-+                              transfer_size = atoi(optarg);
-+                      }
-+                      break;
-+              case 'p':
-+                      port = optarg;
-+                      break;
-+              case 'T':
-+                      if (!set_test_opt(optarg))
-+                              break;
-+                      /* invalid option - fall through */
-+              default:
-+                      printf("usage: %s\n", argv[0]);
-+                      printf("\t[-s server_address]\n");
-+                      printf("\t[-b bind_address]\n");
-+                      printf("\t[-B buffer_size]\n");
-+                      printf("\t[-I iterations]\n");
-+                      printf("\t[-C transfer_count]\n");
-+                      printf("\t[-S transfer_size or all]\n");
-+                      printf("\t[-p port_number]\n");
-+                      printf("\t[-T test_option]\n");
-+                      printf("\t    a|async - asynchronous operation (use poll)\n");
-+                      printf("\t    b|blocking - use blocking calls\n");
-+                      printf("\t    n|nonblocking - use nonblocking calls\n");
-+                      printf("\t    v|verify - verify data\n");
-+                      exit(1);
-+              }
-+      }
-+
-+      if (!(flags & MSG_DONTWAIT))
-+              poll_timeout = -1;
-+
-+      ret = run();
-+      return ret;
-+}
-diff --git a/man/riostream.1 b/man/riostream.1
-new file mode 100644
-index 0000000..14e77fa
---- /dev/null
-+++ b/man/riostream.1
-@@ -0,0 +1,64 @@
-+.TH "RIOSTREAM" 1 "2012-10-24" "librdmacm" "librdmacm" librdmacm
-+.SH NAME
-+riostream \- zero-copy streaming over RDMA ping-pong test.
-+.SH SYNOPSIS
-+.sp
-+.nf
-+\fIriostream\fR [-s server_address] [-b bind_address] [-B buffer_size]
-+                      [-I iterations] [-C transfer_count]
-+                      [-S transfer_size] [-p server_port] [-T test_option]
-+.fi
-+.SH "DESCRIPTION"
-+Uses the streaming over RDMA protocol (rsocket) to connect and exchange
-+data between a client and server application.
-+.SH "OPTIONS"
-+.TP
-+\-s server_address
-+The network name or IP address of the server system listening for
-+connections.  The used name or address must route over an RDMA device.
-+This option must be specified by the client.
-+.TP
-+\-b bind_address
-+The local network address to bind to.
-+.TP
-+\-B buffer_size
-+Indicates the size of the send and receive network buffers.
-+.TP
-+\-I iterations
-+The number of times that the specified number of messages will be
-+exchanged between the client and server.  (default 1000)
-+.TP
-+\-C transfer_count
-+The number of messages to transfer from the client to the server and
-+back again on each iteration.  (default 1)
-+.TP
-+\-S transfer_size
-+The size of each send transfer, in bytes.  (default 1000)  If 'all'
-+is specified, rstream will run a series of tests of various sizes.
-+.TP
-+\-p server_port
-+The server's port number.
-+.TP
-+\-T test_option
-+Specifies test parameters.  Available options are:
-+.P
-+a | async - uses asynchronous operation (e.g. select / poll)
-+.P
-+b | blocking - uses blocking calls
-+.P
-+n | nonblocking - uses non-blocking calls
-+.P
-+v | verify - verifies data transfers
-+.SH "NOTES"
-+Basic usage is to start riostream on a server system, then run
-+riostream -s server_name on a client system.  By default, riostream
-+will run a series of latency and bandwidth performance tests.
-+Specifying a different iterations, transfer_count, or transfer_size
-+will run a user customized test using default values where none
-+have been specified.
-+.P
-+Because this test maps RDMA resources to userspace, users must ensure
-+that they have available system resources and permissions.  See the
-+libibverbs README file for additional details.
-+.SH "SEE ALSO"
-+rdma_cm(7) rstream(1)
diff --git a/patches/rs-iomap b/patches/rs-iomap
deleted file mode 100644 (file)
index 6fc026c..0000000
+++ /dev/null
@@ -1,1179 +0,0 @@
-Bottom: daf53db464152f40dc8d6f2c99844510b03f8567
-Top:    f577125f6a6e9da1cf989ea2c4e933c66a84cf05
-Author: Sean Hefty <sean.hefty@intel.com>
-Date:   2012-10-21 14:16:03 -0700
-
-rsocket: Add APIs for direct data placement
-
-We introduce rsocket extensions for supporting direct
-data placement (also known as zero copy).  Direct data
-placement avoids data copies into network buffers when
-sending or receiving data.  This patch implements zero
-copies on the receive side, but adds some basic framework for
-supporting it on the sending side.
-
-Integrating zero copy support into the existing socket APIs
-is difficult to achieve when the sockets are set as
-nonblocking.  Any such implementation is likely to be unusable
-in practice.  The problem stems from the fact that socket
-operations are synchronous in nature.  Support for asynchronous
-operations is limited to connection establishment.
-
-Therefore we introduce new calls to handle direct data placement.
-The use of the new calls is optional and does not affect the
-use of the existing calls.  An attempt is made to have the new
-routines integrate naturally with the existing APIs.  The new
-functions are: riomap, riounmap, and riowrite.  The basic operation
-can be described as follows:
-
-1. App A calls riomap to register a data buffer with the local
-   RDMA device.  Riomap returns an off_t offset value that
-   corresponds to the registered data buffer.  The app may
-   select the offset value.
-2. Rsockets will transmit an internal message to the remote
-   peer with information about the registration.  This exchange
-   is hidden from the applications.
-3. App A sends a notification message to app B indicating that
-   the remote iomapped buffer is now available to receive data.
-4. App B calls riowrite to transmit data directly into the
-   riomapped data buffer.
-5. App B sends a notification message to app A indicating that
-   data is available in the mapped buffer.
-6. After all transfers are complete, app A calls riounmap to
-   deregister its data buffer.
-
-Riomap and riounmap are functionally equivalent to RDMA
-memory registration and deregistration routines.  They are loosely
-based on the mmap and munmap APIs.
-
-off_t riomap(int socket, void *buf, size_t len,
-            int prot, int flags, off_t offset)
-
-Riomap registers an application buffer with the RDMA hardware
-associated with an rsocket.  The buffer is registered either for
-local only access (PROT_NONE) or for remote write access (PROT_WRITE).
-When registered for remote access, the buffer is mapped to a given
-offset.  The offset is either provided by the user, or if the user
-selects -1 for the offset, rsockets selects one.  The remote peer may
-access an iomapped buffer directly by specifying the correct offset.
-The mapping is not guaranteed to be available until after the remote
-peer receives a data transfer initiated after riomap has completed.
-
-int riounmap(int socket, void *buf, size_t len)
-
-Riounmap removes the mapping between a buffer and an rsocket.
-
-size_t riowrite(int socket, const void *buf, size_t count,
-               off_t offset, int flags)
-
-Riowrite allows an application to transfer data over an rsocket
-directly into a remotely iomapped buffer.  The remote buffer is specified
-through an offset parameter, which corresponds to a remote iomapped buffer.
-From the sender's perspective, riowrite behaves similar to rwrite.  From
-a receiver's view, riowrite transfers are silently redirected into a pre-
-determined data buffer.  Data is received automatically, and the receiver
-is not informed of the transfer.  However, iowrite data is still considered
-part of the data stream, such that iowrite data will be written before a
-subsequent transfer is received.  A message sent immediately after
-initiating an iowrite may be used to notify the receiver of the iowrite.
-
-It should be noted that the current implementation primarily focused
-on being functional for evaluation purposes.  Some checks have been
-deferred for subsequent patches, and performance is currently limited
-by linear lookups.
-
-Signed-off-by: Sean Hefty <sean.hefty@intel.com>
-
-
----
-
-diff --git a/docs/rsocket b/docs/rsocket
-index 5399f6c..1484f65 100644
---- a/docs/rsocket
-+++ b/docs/rsocket
-@@ -110,11 +110,11 @@ Bits    Message             Meaning of
- 31:29    Type               Bits 28:0
- 000    Data Transfer     bytes transfered
- 001    reserved
--010    reserved
-+010    reserved - used internally, available for future use
- 011    reserved
- 100    Credit Update     received credits granted
- 101    reserved
--110    reserved
-+110    Iomap Updated     index of updated entry
- 111    Control           control message type
- Data Transfer
-@@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in use.  This is done
- by tracking when a receive buffer referenced by a remote target SGL has been
- filled.
-+Iomap Updated
-+Used to indicate that a remote iomap entry was updated.  The updated entry
-+contains the offset value associated with an address, length, and rkey.  Once
-+an iomap has been updated, the local application can issue directed IO
-+transfers against the corresponding remote buffer.
-+
- Control Message - DISCONNECT
- Indicates that the rsocket connection has been fully disconnected and will no
- longer send or receive data.  Data received before the disconnect message was
-@@ -142,3 +148,44 @@ Control Message - SHUTDOWN
- Indicates that the remote rsocket has shutdown the send side of its
- connection.  The recipient of a shutdown message will no longer accept
- incoming data, but may still transfer outbound data.
-+
-+
-+Iomapped Buffers
-+----------------
-+Rsockets allows for zero-copy transfers using what it refers to as iomapped
-+buffers.  Iomapping and direct data placement (zero-copy) transfers are done
-+using rsocket specific extensions.  The general operation is similar to
-+that used for normal data transfers described above.
-+
-+   host A                   host B
-+                          remote iomap                      
-+  target iomap  <-----------  [  ]
-+     [  ] ------
-+     [  ] --    ------  iomapped buffer(s)         
-+            --        ----->  +--+
-+              --              |  |
-+                --            |  |
-+                  --          |  |
-+                    --        +--+
-+                      --       
-+                        --->  +--+
-+                              |  |
-+                              |  |
-+                              +--+
-+
-+The remote iomap contains the address, size, and rkey of the target iomap.  As
-+the applicaton maps buffers host B to a given rsocket, rsockets will issue an RDMA
-+write against one of the entries in the target iomap on host A.  The
-+updated entry will reference an available iomapped buffer.  Immediate data
-+included with the RDMA write will indicate to host A that a target iomap
-+has been updated.
-+
-+When host A wishes to transfer directly into an iomapped buffer, it will check
-+its target iomap for an offset corresponding to a remotely mapped buffer.  A
-+matching iomap entry will contain the address, size, and rkey of the target
-+buffer on host B.  Host A will then issue an RDMA operation against the
-+registered remote data buffer.
-+
-+From host A's perspective, the transfer appears as a normal send/write
-+operation, with the data stream redirected directly into the receiving
-+application's buffer.
-diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h
-index 65feda9..f220c13 100644
---- a/include/rdma/rsocket.h
-+++ b/include/rdma/rsocket.h
-@@ -1,5 +1,5 @@
- /*
-- * Copyright (c) 2011 Intel Corporation.  All rights reserved.
-+ * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
-  *
-  * This software is available to you under a choice of one of two
-  * licenses.  You may choose to be licensed under the terms of the GNU
-@@ -39,6 +39,7 @@
- #include <errno.h>
- #include <poll.h>
- #include <sys/select.h>
-+#include <sys/mman.h>
- #ifdef __cplusplus
- extern "C" {
-@@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
- enum {
-       RDMA_SQSIZE,
-       RDMA_RQSIZE,
--      RDMA_INLINE
-+      RDMA_INLINE,
-+      RDMA_IOMAPSIZE
- };
- int rsetsockopt(int socket, int level, int optname,
-@@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname,
-               void *optval, socklen_t *optlen);
- int rfcntl(int socket, int cmd, ... /* arg */ );
-+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset);
-+int riounmap(int socket, void *buf, size_t len);
-+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags);
-+
- #ifdef __cplusplus
- }
- #endif
-diff --git a/man/rsocket.7 b/man/rsocket.7
-index 2ed5ca4..bc5bb10 100644
---- a/man/rsocket.7
-+++ b/man/rsocket.7
-@@ -6,9 +6,9 @@ rsocket \- RDMA socket API
- .SH "DESCRIPTION"
- RDMA socket API and protocol
- .SH "NOTES"
--rsockets is a protocol over RDMA that supports a socket-level API
--for applications.  rsocket APIs are intended to match the behavior
--of corresponding socket calls, except where noted.  rsocket
-+Rsockets is a protocol over RDMA that supports a socket-level API
-+for applications.  Rsocket APIs are intended to match the behavior
-+of corresponding socket calls, except where noted.  Rsocket
- functions match the name and function signature of socket calls,
- with the exception that all function calls are prefixed with an 'r'.
- .P
-@@ -30,7 +30,7 @@ rgetpeername, rgetsockname
- .P
- rsetsockopt, rgetsockopt, rfcntl
- .P
--Functions take the same parameters as that use for sockets.  The
-+Functions take the same parameters as that used for sockets.  The
- follow capabilities and flags are supported at this time:
- .P
- PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG
-@@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF
- .P
- O_NONBLOCK
- .P
-+Rsockets provides extensions beyond normal socket routines that
-+allow for direct placement of data into an application's buffer.
-+This is also known as zero-copy support, since data is sent and
-+received directly, bypassing copies into network controlled buffers.
-+The following calls and options support direct data placement.
-+.P
-+riomap, riounmap, riowrite
-+.TP
-+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
-+.TP
-+Riomap registers an application buffer with the RDMA hardware
-+associated with an rsocket.  The buffer is registered either for
-+local only access (PROT_NONE) or for remote write access (PROT_WRITE).
-+When registered for remote access, the buffer is mapped to a given
-+offset.  The offset is either provided by the user, or if the user
-+selects -1 for the offset, rsockets selects one.  The remote peer may
-+access an iomapped buffer directly by specifying the correct offset.
-+The mapping is not guaranteed to be available until after the remote
-+peer receives a data transfer initiated after riomap has completed.
-+.P
-+riounmap
-+.TP
-+int riounmap(int socket, void *buf, size_t len)
-+.TP
-+Riounmap removes the mapping between a buffer and an rsocket.
-+.P
-+riowrite
-+.TP
-+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
-+.TP
-+Riowrite allows an application to transfer data over an rsocket
-+directly into a remotely iomapped buffer.  The remote buffer is specified
-+through an offset parameter, which corresponds to a remote iomapped buffer.
-+From the sender's perspective, riowrite behaves similar to rwrite.  From
-+a receiver's view, riowrite transfers are silently redirected into a pre-
-+determined data buffer.  Data is received automatically, and the receiver
-+is not informed of the transfer.  However, iowrite data is still considered
-+part of the data stream, such that iowrite data will be written before a
-+subsequent transfer is received.  A message sent immediately after initiating
-+an iowrite may be used to notify the receiver of the iowrite.
-+.P
- In addition to standard socket options, rsockets supports options
- specific to RDMA devices and protocols.  These options are accessible
- through rsetsockopt using SOL_RDMA option level.
-@@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue.
- RDMA_RQSIZE - Integer size of the underlying receive queue.
- .TP
- RDMA_INLINE - Integer size of inline data.
-+.TP
-+RDMA_IOMAPSIZE - Integer number of remote IO mappings supported
- .P
- Note that rsockets fd's cannot be passed into non-rsocket calls.  For
- applications which must mix rsocket fd's with standard socket fd's or
-@@ -84,6 +127,8 @@ rqsize_default - default size of receive queue
- .P
- inline_default - default size of inline data
- .P
-+iomap_size - default size of remote iomapping table
-+.P
- If configuration files are not available, rsockets uses internal defaults.
- .SH "SEE ALSO"
- rdma_cm(7)
-diff --git a/src/indexer.h b/src/indexer.h
-index 26e7f98..0c5f388 100644
---- a/src/indexer.h
-+++ b/src/indexer.h
-@@ -31,6 +31,9 @@
-  *
-  */
-+#if !defined(INDEXER_H)
-+#define INDEXER_H
-+
- #if HAVE_CONFIG_H
- #  include <config.h>
- #endif /* HAVE_CONFIG_H */
-@@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index)
-       return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
-               idm_at(idm, index) : NULL;
- }
-+
-+typedef struct _dlist_entry {
-+      struct _dlist_entry     *next;
-+      struct _dlist_entry     *prev;
-+}     dlist_entry;
-+
-+static inline void dlist_init(dlist_entry *head)
-+{
-+      head->next = head;
-+      head->prev = head;
-+}
-+
-+static inline int dlist_empty(dlist_entry *head)
-+{
-+      return head->next == head;
-+}
-+
-+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
-+{
-+      item->next = head->next;
-+      item->prev = head;
-+      head->next->prev = item;
-+      head->next = item;
-+}
-+
-+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
-+{
-+      dlist_insert_after(item, head->prev);
-+}
-+
-+#define dlist_insert_head dlist_insert_after
-+#define dlist_insert_tail dlist_insert_before
-+
-+static inline void dlist_remove(dlist_entry *item)
-+{
-+      item->prev->next = item->next;
-+      item->next->prev = item->prev;
-+}
-+
-+#endif /* INDEXER_H */
-diff --git a/src/librdmacm.map b/src/librdmacm.map
-index 5c317a3..d5ef736 100644
---- a/src/librdmacm.map
-+++ b/src/librdmacm.map
-@@ -63,5 +63,8 @@ RDMACM_1.0 {
-               rselect;
-               rdma_get_src_port;
-               rdma_get_dst_port;
-+              riomap;
-+              riounmap;
-+              riowrite;
-       local: *;
- };
-diff --git a/src/rsocket.c b/src/rsocket.c
-index cc5effe..74dbcc7 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -55,6 +55,7 @@
- #define RS_OLAP_START_SIZE 2048
- #define RS_MAX_TRANSFER 65536
-+#define RS_SNDLOWAT 64
- #define RS_QP_MAX_SIZE 0xFFFE
- #define RS_QP_CTRL_SIZE 4
- #define RS_CONN_RETRIES 6
-@@ -62,6 +63,7 @@
- static struct index_map idm;
- static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-+static uint16_t def_iomap_size = 0;
- static uint16_t def_inline = 64;
- static uint16_t def_sqsize = 384;
- static uint16_t def_rqsize = 384;
-@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
-  * bit 29: more data, 0 - end of transfer, 1 - more data available
-  *
-  * for data transfers:
-- * bits [28:0]: bytes transfered, 0 = 1 GB
-+ * bits [28:0]: bytes transferred
-  * for control messages:
-+ * SGL, CTRL
-  * bits [28-0]: receive credits granted
-+ * IOMAP_SGL
-+ * bits [28-16]: reserved, bits [15-0]: index
-  */
- enum {
-       RS_OP_DATA,
-       RS_OP_RSVD_DATA_MORE,
--      RS_OP_RSVD_DRA,
-+      RS_OP_WRITE, /* opcode is not transmitted over the network */
-       RS_OP_RSVD_DRA_MORE,
-       RS_OP_SGL,
-       RS_OP_RSVD,
--      RS_OP_RSVD_DRA_SGL,
-+      RS_OP_IOMAP_SGL,
-       RS_OP_CTRL
- };
- #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
-@@ -111,15 +116,30 @@ struct rs_sge {
-       uint32_t length;
- };
--#define RS_MIN_INLINE    (sizeof(struct rs_sge))
--#define rs_host_is_net() (1 == htonl(1))
--#define RS_CONN_FLAG_NET 1
-+struct rs_iomap {
-+      uint64_t offset;
-+      struct rs_sge sge;
-+};
-+
-+struct rs_iomap_mr {
-+      uint64_t offset;
-+      struct ibv_mr *mr;
-+      dlist_entry entry;
-+      atomic_t refcnt;
-+      int index;      /* -1 if mapping is local and not in iomap_list */
-+};
-+
-+#define RS_MIN_INLINE      (sizeof(struct rs_sge))
-+#define rs_host_is_net()   (1 == htonl(1))
-+#define RS_CONN_FLAG_NET   (1 << 0)
-+#define RS_CONN_FLAG_IOMAP (1 << 1)
- struct rs_conn_data {
-       uint8_t           version;
-       uint8_t           flags;
-       uint16_t          credits;
--      uint32_t          reserved2;
-+      uint8_t           reserved[3];
-+      uint8_t           target_iomap_size;
-       struct rs_sge     target_sgl;
-       struct rs_sge     data_buf;
- };
-@@ -155,6 +175,7 @@ struct rsocket {
-       fastlock_t        rlock;
-       fastlock_t        cq_lock;
-       fastlock_t        cq_wait_lock;
-+      fastlock_t        iomap_lock;
-       int               opts;
-       long              fd_flags;
-@@ -186,10 +207,19 @@ struct rsocket {
-       int               remote_sge;
-       struct rs_sge     remote_sgl;
-+      struct rs_sge     remote_iomap;
-+
-+      struct rs_iomap_mr *remote_iomappings;
-+      dlist_entry       iomap_list;
-+      dlist_entry       iomap_queue;
-+      int               iomap_pending;
-       struct ibv_mr    *target_mr;
-       int               target_sge;
--      volatile struct rs_sge    target_sgl[RS_SGL_SIZE];
-+      int               target_iomap_size;
-+      void             *target_buffer_list;
-+      volatile struct rs_sge    *target_sgl;
-+      struct rs_iomap  *target_iomap;
-       uint32_t          rbuf_size;
-       struct ibv_mr    *rmr;
-@@ -201,6 +231,18 @@ struct rsocket {
-       uint8_t           *sbuf;
- };
-+static int rs_value_to_scale(int value, int bits)
-+{
-+      return value <= (1 << (bits - 1)) ?
-+             value : (1 << (bits - 1)) | (value >> bits);
-+}
-+
-+static int rs_scale_to_value(int value, int bits)
-+{
-+      return value <= (1 << (bits - 1)) ?
-+             value : (value & ~(1 << (bits - 1))) << bits;
-+}
-+
- void rs_configure(void)
- {
-       FILE *f;
-@@ -247,9 +289,17 @@ void rs_configure(void)
-       if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
-               fscanf(f, "%u", &def_wmem);
-               fclose(f);
-+              if (def_wmem < RS_SNDLOWAT)
-+                      def_wmem = RS_SNDLOWAT << 1;
-+      }
-+
-+      if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
-+              fscanf(f, "%hu", &def_iomap_size);
-+              fclose(f);
--              if (def_wmem < 1)
--                      def_wmem = 1;
-+              /* round to supported values */
-+              def_iomap_size = (uint8_t) rs_value_to_scale(
-+                      (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
-       }
-       init = 1;
- out:
-@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-               rs->sq_size = inherited_rs->sq_size;
-               rs->rq_size = inherited_rs->rq_size;
-               rs->ctrl_avail = inherited_rs->ctrl_avail;
-+              rs->target_iomap_size = inherited_rs->target_iomap_size;
-       } else {
-               rs->sbuf_size = def_wmem;
-               rs->rbuf_size = def_mem;
-@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
-               rs->sq_size = def_sqsize;
-               rs->rq_size = def_rqsize;
-               rs->ctrl_avail = RS_QP_CTRL_SIZE;
-+              rs->target_iomap_size = def_iomap_size;
-       }
-       fastlock_init(&rs->slock);
-       fastlock_init(&rs->rlock);
-       fastlock_init(&rs->cq_lock);
-       fastlock_init(&rs->cq_wait_lock);
-+      fastlock_init(&rs->iomap_lock);
-+      dlist_init(&rs->iomap_list);
-+      dlist_init(&rs->iomap_queue);
-       return rs;
- }
-@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
- static int rs_init_bufs(struct rsocket *rs)
- {
-+      size_t len;
-+
-       rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
-       if (!rs->rmsg)
-               return -1;
-@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
-       if (!rs->smr)
-               return -1;
--      rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
--                                     sizeof(rs->target_sgl));
-+      len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
-+            sizeof(*rs->target_iomap) * rs->target_iomap_size;
-+      rs->target_buffer_list = malloc(len);
-+      if (!rs->target_buffer_list)
-+              return -1;
-+
-+      rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
-       if (!rs->target_mr)
-               return -1;
-+      memset(rs->target_buffer_list, 0, len);
-+      rs->target_sgl = rs->target_buffer_list;
-+      if (rs->target_iomap_size)
-+              rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
-+
-       rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
-       if (!rs->rbuf)
-               return -1;
-@@ -452,6 +519,35 @@ static int rs_create_ep(struct rsocket *rs)
-       return 0;
- }
-+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
-+{
-+      if (atomic_dec(&iomr->refcnt))
-+              return;
-+
-+      dlist_remove(&iomr->entry);
-+      ibv_dereg_mr(iomr->mr);
-+      if (iomr->index >= 0)
-+              iomr->mr = NULL;
-+      else
-+              free(iomr);
-+}
-+
-+static void rs_free_iomappings(struct rsocket *rs)
-+{
-+      struct rs_iomap_mr *iomr;
-+
-+      while (!dlist_empty(&rs->iomap_list)) {
-+              iomr = container_of(rs->iomap_list.next,
-+                                  struct rs_iomap_mr, entry);
-+              riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
-+      }
-+      while (!dlist_empty(&rs->iomap_queue)) {
-+              iomr = container_of(rs->iomap_queue.next,
-+                                  struct rs_iomap_mr, entry);
-+              riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
-+      }
-+}
-+
- static void rs_free(struct rsocket *rs)
- {
-       if (rs->index >= 0)
-@@ -472,15 +568,20 @@ static void rs_free(struct rsocket *rs)
-               free(rs->rbuf);
-       }
--      if (rs->target_mr)
--              rdma_dereg_mr(rs->target_mr);
-+      if (rs->target_buffer_list) {
-+              if (rs->target_mr)
-+                      rdma_dereg_mr(rs->target_mr);
-+              free(rs->target_buffer_list);
-+      }
-       if (rs->cm_id) {
-+              rs_free_iomappings(rs);
-               if (rs->cm_id->qp)
-                       rdma_destroy_qp(rs->cm_id);
-               rdma_destroy_id(rs->cm_id);
-       }
-+      fastlock_destroy(&rs->iomap_lock);
-       fastlock_destroy(&rs->cq_wait_lock);
-       fastlock_destroy(&rs->cq_lock);
-       fastlock_destroy(&rs->rlock);
-@@ -492,9 +593,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
-                            struct rs_conn_data *conn)
- {
-       conn->version = 1;
--      conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
-+      conn->flags = RS_CONN_FLAG_IOMAP |
-+                    (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
-       conn->credits = htons(rs->rq_size);
--      conn->reserved2 = 0;
-+      memset(conn->reserved, 0, sizeof conn->reserved);
-+      conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
-       conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
-       conn->target_sgl.length = htonl(RS_SGL_SIZE);
-@@ -518,6 +621,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
-           (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
-               rs->opts = RS_OPT_SWAP_SGL;
-+      if (conn->flags & RS_CONN_FLAG_IOMAP) {
-+              rs->remote_iomap.addr = rs->remote_sgl.addr +
-+                                      sizeof(rs->remote_sgl) * rs->remote_sgl.length;
-+              rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
-+              rs->remote_iomap.key = rs->remote_sgl.key;
-+      }
-+
-       rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
-       rs->target_sgl[0].length = ntohl(conn->data_buf.length);
-       rs->target_sgl[0].key = ntohl(conn->data_buf.key);
-@@ -753,7 +863,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
-       return rs_do_connect(rs);
- }
--static int rs_post_write(struct rsocket *rs,
-+static int rs_post_write_msg(struct rsocket *rs,
-                        struct ibv_sge *sgl, int nsge,
-                        uint32_t imm_data, int flags,
-                        uint64_t addr, uint32_t rkey)
-@@ -773,6 +883,25 @@ static int rs_post_write(struct rsocket *rs,
-       return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
- }
-+static int rs_post_write(struct rsocket *rs,
-+                       struct ibv_sge *sgl, int nsge,
-+                       uint64_t wr_id, int flags,
-+                       uint64_t addr, uint32_t rkey)
-+{
-+      struct ibv_send_wr wr, *bad;
-+
-+      wr.wr_id = wr_id;
-+      wr.next = NULL;
-+      wr.sg_list = sgl;
-+      wr.num_sge = nsge;
-+      wr.opcode = IBV_WR_RDMA_WRITE;
-+      wr.send_flags = flags;
-+      wr.wr.rdma.remote_addr = addr;
-+      wr.wr.rdma.rkey = rkey;
-+
-+      return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
-+}
-+
- /*
-  * Update target SGE before sending data.  Otherwise the remote side may
-  * update the entry before we do.
-@@ -799,8 +928,35 @@ static int rs_write_data(struct rsocket *rs,
-                       rs->target_sge = 0;
-       }
--      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
--                           flags, addr, rkey);
-+      return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-+                               flags, addr, rkey);
-+}
-+
-+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
-+                         struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
-+{
-+      uint64_t addr;
-+
-+      rs->sqe_avail--;
-+      rs->sbuf_bytes_avail -= length;
-+
-+      addr = iom->sge.addr + offset - iom->offset;
-+      return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
-+                           flags, addr, iom->sge.key);
-+}
-+
-+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
-+                        struct ibv_sge *sgl, int nsge, int flags)
-+{
-+      uint64_t addr;
-+
-+      rs->sseq_no++;
-+      rs->sqe_avail--;
-+      rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
-+
-+      addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
-+      return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-+                               flags, addr, rs->remote_iomap.key);
- }
- static uint32_t rs_sbuf_left(struct rsocket *rs)
-@@ -831,12 +987,12 @@ static void rs_send_credits(struct rsocket *rs)
-               ibsge.lkey = 0;
-               ibsge.length = sizeof(sge);
--              rs_post_write(rs, &ibsge, 1,
--                            rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
--                            IBV_SEND_INLINE,
--                            rs->remote_sgl.addr +
--                            rs->remote_sge * sizeof(struct rs_sge),
--                            rs->remote_sgl.key);
-+              rs_post_write_msg(rs, &ibsge, 1,
-+                                rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-+                                IBV_SEND_INLINE,
-+                                rs->remote_sgl.addr +
-+                                rs->remote_sge * sizeof(struct rs_sge),
-+                                rs->remote_sgl.key);
-               rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
-               rs->rbuf_free_offset += rs->rbuf_size >> 1;
-@@ -845,8 +1001,9 @@ static void rs_send_credits(struct rsocket *rs)
-               if (++rs->remote_sge == rs->remote_sgl.length)
-                       rs->remote_sge = 0;
-       } else {
--              rs_post_write(rs, NULL, 0,
--                            rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
-+              rs_post_write_msg(rs, NULL, 0,
-+                                rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-+                                0, 0, 0);
-       }
- }
-@@ -880,6 +1037,9 @@ static int rs_poll_cq(struct rsocket *rs)
-                       case RS_OP_SGL:
-                               rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
-                               break;
-+                      case RS_OP_IOMAP_SGL:
-+                              /* The iomap was updated, that's nice to know. */
-+                              break;
-                       case RS_OP_CTRL:
-                               if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
-                                       rs->state = rs_disconnected;
-@@ -888,6 +1048,9 @@ static int rs_poll_cq(struct rsocket *rs)
-                                       rs->state &= ~rs_connect_rd;
-                               }
-                               break;
-+                      case RS_OP_WRITE:
-+                              /* We really shouldn't be here. */
-+                              break;
-                       default:
-                               rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
-                               rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
-@@ -905,6 +1068,10 @@ static int rs_poll_cq(struct rsocket *rs)
-                               if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
-                                       rs->state = rs_disconnected;
-                               break;
-+                      case RS_OP_IOMAP_SGL:
-+                              rs->sqe_avail++;
-+                              rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
-+                              break;
-                       default:
-                               rs->sqe_avail++;
-                               rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
-@@ -1046,7 +1213,7 @@ static int rs_poll_all(struct rsocket *rs)
-  */
- static int rs_can_send(struct rsocket *rs)
- {
--      return rs->sqe_avail && rs->sbuf_bytes_avail &&
-+      return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
-              (rs->sseq_no != rs->sseq_comp) &&
-              (rs->target_sgl[rs->target_sge].length != 0);
- }
-@@ -1216,6 +1383,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
-       return rrecvv(socket, iov, iovcnt, 0);
- }
-+static int rs_send_iomaps(struct rsocket *rs, int flags)
-+{
-+      struct rs_iomap_mr *iomr;
-+      struct ibv_sge sge;
-+      struct rs_iomap iom;
-+      int ret;
-+
-+      fastlock_acquire(&rs->iomap_lock);
-+      while (!dlist_empty(&rs->iomap_queue)) {
-+              if (!rs_can_send(rs)) {
-+                      ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-+                                        rs_conn_can_send);
-+                      if (ret)
-+                              break;
-+                      if (!(rs->state & rs_connect_wr)) {
-+                              ret = ERR(ECONNRESET);
-+                              break;
-+                      }
-+              }
-+
-+              iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
-+              if (!(rs->opts & RS_OPT_SWAP_SGL)) {
-+                      iom.offset = iomr->offset;
-+                      iom.sge.addr = (uintptr_t) iomr->mr->addr;
-+                      iom.sge.length = iomr->mr->length;
-+                      iom.sge.key = iomr->mr->rkey;
-+              } else {
-+                      iom.offset = bswap_64(iomr->offset);
-+                      iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
-+                      iom.sge.length = bswap_32(iomr->mr->length);
-+                      iom.sge.key = bswap_32(iomr->mr->rkey);
-+              }
-+
-+              if (rs->sq_inline >= sizeof iom) {
-+                      sge.addr = (uintptr_t) &iom;
-+                      sge.length = sizeof iom;
-+                      sge.lkey = 0;
-+                      ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
-+              } else if (rs_sbuf_left(rs) >= sizeof iom) {
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
-+                      rs->ssgl[0].length = sizeof iom;
-+                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
-+                      if (rs_sbuf_left(rs) > sizeof iom)
-+                              rs->ssgl[0].addr += sizeof iom;
-+                      else
-+                              rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+              } else {
-+                      rs->ssgl[0].length = rs_sbuf_left(rs);
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
-+                              rs->ssgl[0].length);
-+                      rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
-+                      memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
-+                             rs->ssgl[1].length);
-+                      ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
-+                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+              }
-+              dlist_remove(&iomr->entry);
-+              dlist_insert_tail(&iomr->entry, &rs->iomap_list);
-+              if (ret)
-+                      break;
-+      }
-+
-+      rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
-+      fastlock_release(&rs->iomap_lock);
-+      return ret;
-+}
-+
- /*
-  * We overlap sending the data, by posting a small work request immediately,
-  * then increasing the size of the send on each iteration.
-@@ -1224,7 +1458,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- {
-       struct rsocket *rs;
-       struct ibv_sge sge;
--      size_t left;
-+      size_t left = len;
-       uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
-       int ret = 0;
-@@ -1239,7 +1473,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-       }
-       fastlock_acquire(&rs->slock);
--      for (left = len; left; left -= xfer_size, buf += xfer_size) {
-+      if (rs->iomap_pending) {
-+              ret = rs_send_iomaps(rs, flags);
-+              if (ret)
-+                      goto out;
-+      }
-+      for (; left; left -= xfer_size, buf += xfer_size) {
-               if (!rs_can_send(rs)) {
-                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-                                         rs_conn_can_send);
-@@ -1289,6 +1528,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
-               if (ret)
-                       break;
-       }
-+out:
-       fastlock_release(&rs->slock);
-       return (ret && left == len) ? ret : len - left;
-@@ -1345,9 +1585,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-       len = iov[0].iov_len;
-       for (i = 1; i < iovcnt; i++)
-               len += iov[i].iov_len;
-+      left = len;
-       fastlock_acquire(&rs->slock);
--      for (left = len; left; left -= xfer_size) {
-+      if (rs->iomap_pending) {
-+              ret = rs_send_iomaps(rs, flags);
-+              if (ret)
-+                      goto out;
-+      }
-+      for (; left; left -= xfer_size) {
-               if (!rs_can_send(rs)) {
-                       ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-                                         rs_conn_can_send);
-@@ -1395,6 +1641,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
-               if (ret)
-                       break;
-       }
-+out:
-       fastlock_release(&rs->slock);
-       return (ret && left == len) ? ret : len - left;
-@@ -1725,8 +1972,8 @@ int rshutdown(int socket, int how)
-               if ((rs->state & rs_connected) && rs->ctrl_avail) {
-                       rs->ctrl_avail--;
--                      ret = rs_post_write(rs, NULL, 0,
--                                          rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-+                      ret = rs_post_write_msg(rs, NULL, 0,
-+                                              rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
-               }
-       }
-@@ -1814,6 +2061,8 @@ int rsetsockopt(int socket, int level, int optname,
-               case SO_SNDBUF:
-                       if (!rs->sbuf)
-                               rs->sbuf_size = (*(uint32_t *) optval) << 1;
-+                      if (rs->sbuf_size < RS_SNDLOWAT)
-+                              rs->sbuf_size = RS_SNDLOWAT << 1;
-                       ret = 0;
-                       break;
-               case SO_LINGER:
-@@ -1878,6 +2127,10 @@ int rsetsockopt(int socket, int level, int optname,
-                       if (rs->sq_inline < RS_MIN_INLINE)
-                               rs->sq_inline = RS_MIN_INLINE;
-                       break;
-+              case RDMA_IOMAPSIZE:
-+                      rs->target_iomap_size = (uint16_t) rs_scale_to_value(
-+                              (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
-+                      break;
-               default:
-                       break;
-               }
-@@ -1979,6 +2232,10 @@ int rgetsockopt(int socket, int level, int optname,
-                       *((int *) optval) = rs->sq_inline;
-                       *optlen = sizeof(int);
-                       break;
-+              case RDMA_IOMAPSIZE:
-+                      *((int *) optval) = rs->target_iomap_size;
-+                      *optlen = sizeof(int);
-+                      break;
-               default:
-                       ret = ENOTSUP;
-                       break;
-@@ -2020,3 +2277,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
-       va_end(args);
-       return ret;
- }
-+
-+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
-+{
-+      int i;
-+
-+      if (!rs->remote_iomappings) {
-+              rs->remote_iomappings = calloc(rs->remote_iomap.length,
-+                                             sizeof(*rs->remote_iomappings));
-+              if (!rs->remote_iomappings)
-+                      return NULL;
-+
-+              for (i = 0; i < rs->remote_iomap.length; i++)
-+                      rs->remote_iomappings[i].index = i;
-+      }
-+
-+      for (i = 0; i < rs->remote_iomap.length; i++) {
-+              if (!rs->remote_iomappings[i].mr)
-+                      return &rs->remote_iomappings[i];
-+      }
-+      return NULL;
-+}
-+
-+/*
-+ * If an offset is given, we map to it.  If offset is -1, then we map the
-+ * offset to the address of buf.  We do not check for conflicts, which must
-+ * be fixed at some point.
-+ */
-+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
-+{
-+      struct rsocket *rs;
-+      struct rs_iomap_mr *iomr;
-+      int access = IBV_ACCESS_LOCAL_WRITE;
-+
-+      rs = idm_at(&idm, socket);
-+      if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
-+              return ERR(EINVAL);
-+
-+      fastlock_acquire(&rs->iomap_lock);
-+      if (prot & PROT_WRITE) {
-+              iomr = rs_get_iomap_mr(rs);
-+              access |= IBV_ACCESS_REMOTE_WRITE;
-+      } else {
-+              iomr = calloc(1, sizeof *iomr);
-+              iomr->index = -1;
-+      }
-+      if (!iomr) {
-+              offset = ERR(ENOMEM);
-+              goto out;
-+      }
-+
-+      iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
-+      if (!iomr->mr) {
-+              if (iomr->index < 0)
-+                      free(iomr);
-+              offset = -1;
-+              goto out;
-+      }
-+
-+      if (offset == -1)
-+              offset = (uintptr_t) buf;
-+      iomr->offset = offset;
-+      atomic_init(&iomr->refcnt);
-+      atomic_set(&iomr->refcnt, 1);
-+
-+      if (iomr->index >= 0) {
-+              dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
-+              rs->iomap_pending = 1;
-+      } else {
-+              dlist_insert_tail(&iomr->entry, &rs->iomap_list);
-+      }
-+out:
-+      fastlock_release(&rs->iomap_lock);
-+      return offset;
-+}
-+
-+int riounmap(int socket, void *buf, size_t len)
-+{
-+      struct rsocket *rs;
-+      struct rs_iomap_mr *iomr;
-+      dlist_entry *entry;
-+      int ret = 0;
-+
-+      rs = idm_at(&idm, socket);
-+      fastlock_acquire(&rs->iomap_lock);
-+
-+      for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
-+           entry = entry->next) {
-+              iomr = container_of(entry, struct rs_iomap_mr, entry);
-+              if (iomr->mr->addr == buf && iomr->mr->length == len) {
-+                      rs_release_iomap_mr(iomr);
-+                      goto out;
-+              }
-+      }
-+
-+      for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
-+           entry = entry->next) {
-+              iomr = container_of(entry, struct rs_iomap_mr, entry);
-+              if (iomr->mr->addr == buf && iomr->mr->length == len) {
-+                      rs_release_iomap_mr(iomr);
-+                      goto out;
-+              }
-+      }
-+      ret = ERR(EINVAL);
-+out:
-+      fastlock_release(&rs->iomap_lock);
-+      return ret;
-+}
-+
-+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
-+{
-+      int i;
-+
-+      for (i = 0; i < rs->target_iomap_size; i++) {
-+              if (offset >= rs->target_iomap[i].offset &&
-+                  offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
-+                      return &rs->target_iomap[i];
-+      }
-+      return NULL;
-+}
-+
-+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
-+{
-+      struct rsocket *rs;
-+      struct rs_iomap *iom = NULL;
-+      struct ibv_sge sge;
-+      size_t left = count;
-+      uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
-+      int ret = 0;
-+
-+      rs = idm_at(&idm, socket);
-+      fastlock_acquire(&rs->slock);
-+      if (rs->iomap_pending) {
-+              ret = rs_send_iomaps(rs, flags);
-+              if (ret)
-+                      goto out;
-+      }
-+      for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
-+              if (!iom || offset > iom->offset + iom->sge.length) {
-+                      iom = rs_find_iomap(rs, offset);
-+                      if (!iom)
-+                              break;
-+              }
-+
-+              if (!rs_can_send(rs)) {
-+                      ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
-+                                        rs_conn_can_send);
-+                      if (ret)
-+                              break;
-+                      if (!(rs->state & rs_connect_wr)) {
-+                              ret = ERR(ECONNRESET);
-+                              break;
-+                      }
-+              }
-+
-+              if (olen < left) {
-+                      xfer_size = olen;
-+                      if (olen < RS_MAX_TRANSFER)
-+                              olen <<= 1;
-+              } else {
-+                      xfer_size = left;
-+              }
-+
-+              if (xfer_size > rs->sbuf_bytes_avail)
-+                      xfer_size = rs->sbuf_bytes_avail;
-+              if (xfer_size > iom->offset + iom->sge.length - offset)
-+                      xfer_size = iom->offset + iom->sge.length - offset;
-+
-+              if (xfer_size <= rs->sq_inline) {
-+                      sge.addr = (uintptr_t) buf;
-+                      sge.length = xfer_size;
-+                      sge.lkey = 0;
-+                      ret = rs_write_direct(rs, iom, offset, &sge, 1,
-+                                            xfer_size, IBV_SEND_INLINE);
-+              } else if (xfer_size <= rs_sbuf_left(rs)) {
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
-+                      rs->ssgl[0].length = xfer_size;
-+                      ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
-+                      if (xfer_size < rs_sbuf_left(rs))
-+                              rs->ssgl[0].addr += xfer_size;
-+                      else
-+                              rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
-+              } else {
-+                      rs->ssgl[0].length = rs_sbuf_left(rs);
-+                      memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
-+                              rs->ssgl[0].length);
-+                      rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
-+                      memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
-+                      ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
-+                      rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
-+              }
-+              if (ret)
-+                      break;
-+      }
-+out:
-+      fastlock_release(&rs->slock);
-+
-+      return (ret && left == count) ? ret : count - left;
-+}