#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transferred
* for control messages:
+ * SGL, CTRL
* bits [28-0]: receive credits granted
+ * IOMAP_SGL
+ * bits [28-16]: reserved, bits [15-0]: index
*/
enum {
RS_OP_DATA,
RS_OP_RSVD_DATA_MORE,
- RS_OP_RSVD_DRA,
+ RS_OP_WRITE, /* opcode is not transmitted over the network */
RS_OP_RSVD_DRA_MORE,
RS_OP_SGL,
RS_OP_RSVD,
- RS_OP_RSVD_DRA_SGL,
+ RS_OP_IOMAP_SGL,
RS_OP_CTRL
};
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
uint32_t length;
};
-#define RS_MIN_INLINE (sizeof(struct rs_sge))
-#define rs_host_is_net() (1 == htonl(1))
-#define RS_CONN_FLAG_NET 1
+struct rs_iomap {
+ uint64_t offset;
+ struct rs_sge sge;
+};
+
+struct rs_iomap_mr {
+ uint64_t offset;
+ struct ibv_mr *mr;
+ dlist_entry entry;
+ atomic_t refcnt;
+ int index; /* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE (sizeof(struct rs_sge))
+#define rs_host_is_net() (1 == htonl(1))
+#define RS_CONN_FLAG_NET (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
struct rs_conn_data {
uint8_t version;
uint8_t flags;
uint16_t credits;
- uint32_t reserved2;
+ uint8_t reserved[3];
+ uint8_t target_iomap_size;
struct rs_sge target_sgl;
struct rs_sge data_buf;
};
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
+ fastlock_t iomap_lock;
int opts;
long fd_flags;
int remote_sge;
struct rs_sge remote_sgl;
+ struct rs_sge remote_iomap;
+
+ struct rs_iomap_mr *remote_iomappings;
+ dlist_entry iomap_list;
+ dlist_entry iomap_queue;
+ int iomap_pending;
struct ibv_mr *target_mr;
int target_sge;
- volatile struct rs_sge target_sgl[RS_SGL_SIZE];
+ int target_iomap_size;
+ void *target_buffer_list;
+ volatile struct rs_sge *target_sgl;
+ struct rs_iomap *target_iomap;
uint32_t rbuf_size;
struct ibv_mr *rmr;
uint8_t *sbuf;
};
+/*
+ * Encode an integer into a compact scaled form that fits a small wire
+ * field: values up to 2^(bits-1) are stored as-is; larger values set
+ * the top bit and store value >> bits (coarse, power-of-two scaled).
+ * Inverse of rs_scale_to_value().
+ */
+static int rs_value_to_scale(int value, int bits)
+{
+	return value <= (1 << (bits - 1)) ?
+		value : (1 << (bits - 1)) | (value >> bits);
+}
+
+/*
+ * Decode a value produced by rs_value_to_scale(): if the top bit is
+ * clear the value is literal, otherwise the remaining bits are shifted
+ * back up by 'bits'.  Round-tripping a large value loses the low bits.
+ */
+static int rs_scale_to_value(int value, int bits)
+{
+	return value <= (1 << (bits - 1)) ?
+		value : (value & ~(1 << (bits - 1))) << bits;
+}
+
void rs_configure(void)
{
FILE *f;
if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
fscanf(f, "%u", &def_wmem);
fclose(f);
+ if (def_wmem < RS_SNDLOWAT)
+ def_wmem = RS_SNDLOWAT << 1;
+ }
+
+ if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+ fscanf(f, "%hu", &def_iomap_size);
+ fclose(f);
- if (def_wmem < 1)
- def_wmem = 1;
+ /* round to supported values */
+ def_iomap_size = (uint8_t) rs_value_to_scale(
+ (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
}
init = 1;
out:
rs->sq_size = inherited_rs->sq_size;
rs->rq_size = inherited_rs->rq_size;
rs->ctrl_avail = inherited_rs->ctrl_avail;
+ rs->target_iomap_size = inherited_rs->target_iomap_size;
} else {
rs->sbuf_size = def_wmem;
rs->rbuf_size = def_mem;
rs->sq_size = def_sqsize;
rs->rq_size = def_rqsize;
rs->ctrl_avail = RS_QP_CTRL_SIZE;
+ rs->target_iomap_size = def_iomap_size;
}
fastlock_init(&rs->slock);
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
+ fastlock_init(&rs->iomap_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
return rs;
}
static int rs_init_bufs(struct rsocket *rs)
{
+	size_t len;
+
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return -1;
if (!rs->smr)
return -1;
-	rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
-				       sizeof(rs->target_sgl));
+	/* Single remote-writable region holds the target SGL followed by
+	 * the (optional) iomap table, registered with one MR. */
+	len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+	      sizeof(*rs->target_iomap) * rs->target_iomap_size;
+	rs->target_buffer_list = malloc(len);
+	if (!rs->target_buffer_list)
+		return -1;
+
+	rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
return -1;
+	memset(rs->target_buffer_list, 0, len);
+	rs->target_sgl = rs->target_buffer_list;
+	/* iomap table starts immediately after the RS_SGL_SIZE sgl entries */
+	if (rs->target_iomap_size)
+		rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
return -1;
return 0;
}
+/*
+ * Drop one reference on an iomap MR; on the last reference, unlink it
+ * and deregister the memory region.  Remote-visible entries (index >= 0)
+ * keep their slot struct for reuse and just mark it free via mr = NULL;
+ * local-only entries (index == -1) were heap-allocated per mapping and
+ * are freed outright.
+ * NOTE(review): appears to assume the caller holds rs->iomap_lock —
+ * confirm at call sites.
+ */
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+	if (atomic_dec(&iomr->refcnt))
+		return;
+
+	dlist_remove(&iomr->entry);
+	ibv_dereg_mr(iomr->mr);
+	if (iomr->index >= 0)
+		iomr->mr = NULL;
+	else
+		free(iomr);
+}
+
+/*
+ * Tear down every iomapping still attached to the rsocket, both the
+ * active list and the not-yet-announced queue.  riounmap() drops the
+ * final reference, which unlinks the entry and so advances the loop.
+ * NOTE(review): if any entry's refcnt were > 1 here, riounmap() would
+ * not unlink it and this would spin forever — presumably all refcnts
+ * are 1 by teardown time; verify.
+ */
+static void rs_free_iomappings(struct rsocket *rs)
+{
+	struct rs_iomap_mr *iomr;
+
+	while (!dlist_empty(&rs->iomap_list)) {
+		iomr = container_of(rs->iomap_list.next,
+				    struct rs_iomap_mr, entry);
+		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+	}
+	while (!dlist_empty(&rs->iomap_queue)) {
+		iomr = container_of(rs->iomap_queue.next,
+				    struct rs_iomap_mr, entry);
+		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+	}
+}
+
static void rs_free(struct rsocket *rs)
{
if (rs->index >= 0)
free(rs->rbuf);
}
- if (rs->target_mr)
- rdma_dereg_mr(rs->target_mr);
+ if (rs->target_buffer_list) {
+ if (rs->target_mr)
+ rdma_dereg_mr(rs->target_mr);
+ free(rs->target_buffer_list);
+ }
if (rs->cm_id) {
+ rs_free_iomappings(rs);
if (rs->cm_id->qp)
rdma_destroy_qp(rs->cm_id);
rdma_destroy_id(rs->cm_id);
}
+ fastlock_destroy(&rs->iomap_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
struct rs_conn_data *conn)
{
conn->version = 1;
- conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+ conn->flags = RS_CONN_FLAG_IOMAP |
+ (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
conn->credits = htons(rs->rq_size);
- conn->reserved2 = 0;
+ memset(conn->reserved, 0, sizeof conn->reserved);
+ conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
conn->target_sgl.length = htonl(RS_SGL_SIZE);
(!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
rs->opts = RS_OPT_SWAP_SGL;
+ if (conn->flags & RS_CONN_FLAG_IOMAP) {
+ rs->remote_iomap.addr = rs->remote_sgl.addr +
+ sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+ rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+ rs->remote_iomap.key = rs->remote_sgl.key;
+ }
+
rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
rs->target_sgl[0].length = ntohl(conn->data_buf.length);
rs->target_sgl[0].key = ntohl(conn->data_buf.key);
return rs_do_connect(rs);
}
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
struct ibv_sge *sgl, int nsge,
uint32_t imm_data, int flags,
uint64_t addr, uint32_t rkey)
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+/*
+ * Post a plain RDMA write (no immediate data) to the peer.  Unlike
+ * rs_post_write_msg(), this generates no receive completion on the
+ * remote side, so the peer is not notified — used for direct writes
+ * into riomap'ed memory.  wr_id carries the encoded op/length so the
+ * local send-completion handler can credit the right resources.
+ */
+static int rs_post_write(struct rsocket *rs,
+			 struct ibv_sge *sgl, int nsge,
+			 uint64_t wr_id, int flags,
+			 uint64_t addr, uint32_t rkey)
+{
+	struct ibv_send_wr wr, *bad;
+
+	wr.wr_id = wr_id;
+	wr.next = NULL;
+	wr.sg_list = sgl;
+	wr.num_sge = nsge;
+	wr.opcode = IBV_WR_RDMA_WRITE;
+	wr.send_flags = flags;
+	wr.wr.rdma.remote_addr = addr;
+	wr.wr.rdma.rkey = rkey;
+
+	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
rs->target_sge = 0;
}
- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
- flags, addr, rkey);
+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+ flags, addr, rkey);
+}
+
+/*
+ * Write data directly into a peer iomapped region at the given iomap
+ * offset.  Consumes a send queue entry and send-buffer credit, but
+ * deliberately does NOT advance sseq_no: no message is delivered to
+ * the remote side (RS_OP_WRITE is never transmitted; it only tags the
+ * local wr_id for completion accounting).
+ */
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+			   struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+	uint64_t addr;
+
+	rs->sqe_avail--;
+	rs->sbuf_bytes_avail -= length;
+
+	/* translate the user-visible offset into the peer's registered VA */
+	addr = iom->sge.addr + offset - iom->offset;
+	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+			     flags, addr, iom->sge.key);
+}
+
+/*
+ * Publish one of our iomap entries into the peer's iomap table slot
+ * iomr->index, notifying the peer via an RS_OP_IOMAP_SGL immediate.
+ * This is a real message (sseq_no advances) and consumes one SQE plus
+ * sizeof(struct rs_iomap) bytes of send-buffer credit, which the
+ * completion handler credits back symmetrically.
+ */
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+			  struct ibv_sge *sgl, int nsge, int flags)
+{
+	uint64_t addr;
+
+	rs->sseq_no++;
+	rs->sqe_avail--;
+	rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+	addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+				 flags, addr, rs->remote_iomap.key);
+}
static uint32_t rs_sbuf_left(struct rsocket *rs)
ibsge.lkey = 0;
ibsge.length = sizeof(sge);
- rs_post_write(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
- rs->remote_sge * sizeof(struct rs_sge),
- rs->remote_sgl.key);
+ rs_post_write_msg(rs, &ibsge, 1,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ IBV_SEND_INLINE,
+ rs->remote_sgl.addr +
+ rs->remote_sge * sizeof(struct rs_sge),
+ rs->remote_sgl.key);
rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
rs->rbuf_free_offset += rs->rbuf_size >> 1;
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
- rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+ rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ 0, 0, 0);
}
}
case RS_OP_SGL:
rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
break;
+ case RS_OP_IOMAP_SGL:
+ /* The iomap was updated, that's nice to know. */
+ break;
case RS_OP_CTRL:
if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
rs->state = rs_disconnected;
rs->state &= ~rs_connect_rd;
}
break;
+ case RS_OP_WRITE:
+ /* We really shouldn't be here. */
+ break;
default:
rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
rs->state = rs_disconnected;
break;
+ case RS_OP_IOMAP_SGL:
+ rs->sqe_avail++;
+ rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+ break;
default:
rs->sqe_avail++;
rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
*/
static int rs_can_send(struct rsocket *rs)
{
-	return rs->sqe_avail && rs->sbuf_bytes_avail &&
+	/* require a full low-water mark of send-buffer space, not just one
+	 * byte, so small transfers are not fragmented into tiny writes */
+	return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
(rs->sseq_no != rs->sseq_comp) &&
(rs->target_sgl[rs->target_sge].length != 0);
}
return rrecvv(socket, iov, iovcnt, 0);
}
+/*
+ * Flush queued iomap announcements to the peer.  Each queued entry is
+ * serialized as a struct rs_iomap (byte-swapped if the endianness of
+ * the two hosts differs) and written into the peer's iomap table via
+ * rs_write_iomap(), then moved from iomap_queue to iomap_list.
+ * Called with rs->slock held; takes iomap_lock itself.
+ * NOTE(review): 'ret' is only assigned inside the loop — callers gate
+ * on rs->iomap_pending so the queue is non-empty on entry, but an
+ * explicit 'ret = 0' initializer would be safer; confirm.
+ */
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+	struct rs_iomap_mr *iomr;
+	struct ibv_sge sge;
+	struct rs_iomap iom;
+	int ret;
+
+	fastlock_acquire(&rs->iomap_lock);
+	while (!dlist_empty(&rs->iomap_queue)) {
+		if (!rs_can_send(rs)) {
+			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+					  rs_conn_can_send);
+			if (ret)
+				break;
+			if (!(rs->state & rs_connect_wr)) {
+				ret = ERR(ECONNRESET);
+				break;
+			}
+		}
+
+		iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+		/* RS_OPT_SWAP_SGL: peer has opposite byte order, swap fields */
+		if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+			iom.offset = iomr->offset;
+			iom.sge.addr = (uintptr_t) iomr->mr->addr;
+			iom.sge.length = iomr->mr->length;
+			iom.sge.key = iomr->mr->rkey;
+		} else {
+			iom.offset = bswap_64(iomr->offset);
+			iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+			iom.sge.length = bswap_32(iomr->mr->length);
+			iom.sge.key = bswap_32(iomr->mr->rkey);
+		}
+
+		/* send inline if allowed; otherwise stage through the send
+		 * buffer, wrapping across its end with a 2-entry SGL */
+		if (rs->sq_inline >= sizeof iom) {
+			sge.addr = (uintptr_t) &iom;
+			sge.length = sizeof iom;
+			sge.lkey = 0;
+			ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+		} else if (rs_sbuf_left(rs) >= sizeof iom) {
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+			rs->ssgl[0].length = sizeof iom;
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+			if (rs_sbuf_left(rs) > sizeof iom)
+				rs->ssgl[0].addr += sizeof iom;
+			else
+				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+		} else {
+			rs->ssgl[0].length = rs_sbuf_left(rs);
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+				rs->ssgl[0].length);
+			rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+			memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+			       rs->ssgl[1].length);
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+		}
+		/* moved to iomap_list even when the post failed — the entry
+		 * is considered announced-or-dead either way */
+		dlist_remove(&iomr->entry);
+		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+		if (ret)
+			break;
+	}
+
+	rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+	fastlock_release(&rs->iomap_lock);
+	return ret;
+}
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
{
struct rsocket *rs;
struct ibv_sge sge;
- size_t left;
+ size_t left = len;
uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
int ret = 0;
}
fastlock_acquire(&rs->slock);
- for (left = len; left; left -= xfer_size, buf += xfer_size) {
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
rs_conn_can_send);
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
len = iov[0].iov_len;
for (i = 1; i < iovcnt; i++)
len += iov[i].iov_len;
+ left = len;
fastlock_acquire(&rs->slock);
- for (left = len; left; left -= xfer_size) {
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
rs_conn_can_send);
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
if ((rs->state & rs_connected) && rs->ctrl_avail) {
rs->ctrl_avail--;
- ret = rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ ret = rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
}
}
case SO_SNDBUF:
if (!rs->sbuf)
rs->sbuf_size = (*(uint32_t *) optval) << 1;
+ if (rs->sbuf_size < RS_SNDLOWAT)
+ rs->sbuf_size = RS_SNDLOWAT << 1;
ret = 0;
break;
case SO_LINGER:
if (rs->sq_inline < RS_MIN_INLINE)
rs->sq_inline = RS_MIN_INLINE;
break;
+ case RDMA_IOMAPSIZE:
+ rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+ (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+ break;
default:
break;
}
*((int *) optval) = rs->sq_inline;
*optlen = sizeof(int);
break;
+ case RDMA_IOMAPSIZE:
+ *((int *) optval) = rs->target_iomap_size;
+ *optlen = sizeof(int);
+ break;
default:
ret = ENOTSUP;
break;
va_end(args);
return ret;
}
+
+/*
+ * Return a free remote-visible iomap slot, or NULL if none is left.
+ * The slot array is sized by the peer's advertised iomap table length
+ * and allocated lazily on first use; each slot is permanently bound to
+ * its table index.  A slot is free when its mr pointer is NULL.
+ * Caller holds rs->iomap_lock (called from riomap()).
+ */
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+	int i;
+
+	if (!rs->remote_iomappings) {
+		rs->remote_iomappings = calloc(rs->remote_iomap.length,
+					       sizeof(*rs->remote_iomappings));
+		if (!rs->remote_iomappings)
+			return NULL;
+
+		for (i = 0; i < rs->remote_iomap.length; i++)
+			rs->remote_iomappings[i].index = i;
+	}
+
+	for (i = 0; i < rs->remote_iomap.length; i++) {
+		if (!rs->remote_iomappings[i].mr)
+			return &rs->remote_iomappings[i];
+	}
+	return NULL;
+}
+
+/*
+ * If an offset is given, we map to it. If offset is -1, then we map the
+ * offset to the address of buf. We do not check for conflicts, which must
+ * be fixed at some point.
+ *
+ * Registers [buf, buf+len) for remote I/O.  PROT_WRITE mappings take a
+ * remote-visible slot and are queued for announcement to the peer on
+ * the next send path; read-only (PROT_NONE) mappings stay local-only
+ * (index == -1).  Returns the mapped offset, or -1 / ERR() on failure.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+	struct rsocket *rs;
+	struct rs_iomap_mr *iomr;
+	int access = IBV_ACCESS_LOCAL_WRITE;
+
+	rs = idm_at(&idm, socket);
+	/* only PROT_WRITE / PROT_NONE are meaningful here */
+	if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+		return ERR(EINVAL);
+
+	fastlock_acquire(&rs->iomap_lock);
+	if (prot & PROT_WRITE) {
+		iomr = rs_get_iomap_mr(rs);
+		access |= IBV_ACCESS_REMOTE_WRITE;
+	} else {
+		iomr = calloc(1, sizeof *iomr);
+		/* NOTE(review): iomr is dereferenced here before the NULL
+		 * check below — a failed calloc would crash; the check
+		 * should come first. */
+		iomr->index = -1;
+	}
+	if (!iomr) {
+		offset = ERR(ENOMEM);
+		goto out;
+	}
+
+	iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+	if (!iomr->mr) {
+		if (iomr->index < 0)
+			free(iomr);
+		offset = -1;
+		goto out;
+	}
+
+	/* default mapping: the buffer's own virtual address */
+	if (offset == -1)
+		offset = (uintptr_t) buf;
+	iomr->offset = offset;
+	atomic_init(&iomr->refcnt);
+	atomic_set(&iomr->refcnt, 1);
+
+	if (iomr->index >= 0) {
+		/* remote-visible: announce lazily via rs_send_iomaps() */
+		dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+		rs->iomap_pending = 1;
+	} else {
+		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+	}
+out:
+	fastlock_release(&rs->iomap_lock);
+	return offset;
+}
+
+/*
+ * Undo a riomap(): find the mapping whose MR matches (buf, len) exactly
+ * — first among announced mappings, then among those still queued —
+ * and drop its reference.  Returns 0 on success, EINVAL if no such
+ * mapping exists.
+ */
+int riounmap(int socket, void *buf, size_t len)
+{
+	struct rsocket *rs;
+	struct rs_iomap_mr *iomr;
+	dlist_entry *entry;
+	int ret = 0;
+
+	rs = idm_at(&idm, socket);
+	fastlock_acquire(&rs->iomap_lock);
+
+	for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+	     entry = entry->next) {
+		iomr = container_of(entry, struct rs_iomap_mr, entry);
+		if (iomr->mr->addr == buf && iomr->mr->length == len) {
+			rs_release_iomap_mr(iomr);
+			goto out;
+		}
+	}
+
+	for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+	     entry = entry->next) {
+		iomr = container_of(entry, struct rs_iomap_mr, entry);
+		if (iomr->mr->addr == buf && iomr->mr->length == len) {
+			rs_release_iomap_mr(iomr);
+			goto out;
+		}
+	}
+	ret = ERR(EINVAL);
+out:
+	fastlock_release(&rs->iomap_lock);
+	return ret;
+}
+
+/*
+ * Linear search of the peer-written target iomap table for the entry
+ * covering 'offset' (half-open range [offset, offset + length)).
+ * Returns NULL when no entry covers the offset.
+ */
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+	int i;
+
+	for (i = 0; i < rs->target_iomap_size; i++) {
+		if (offset >= rs->target_iomap[i].offset &&
+		    offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
+			return &rs->target_iomap[i];
+	}
+	return NULL;
+}
+
+/*
+ * Write 'count' bytes from buf directly into the peer's iomapped
+ * memory at 'offset', as a sequence of RDMA writes with no receive-side
+ * notification.  Mirrors rsend(): starts with a small overlapped
+ * transfer (RS_OLAP_START_SIZE) and doubles up to RS_MAX_TRANSFER,
+ * staging through the send buffer unless the chunk fits sq_inline.
+ * Returns bytes written, or a negative error if nothing was written.
+ */
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+	struct rsocket *rs;
+	struct rs_iomap *iom = NULL;
+	struct ibv_sge sge;
+	size_t left = count;
+	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+	int ret = 0;
+
+	rs = idm_at(&idm, socket);
+	fastlock_acquire(&rs->slock);
+	/* make sure the peer's iomap table is current before writing */
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
+	for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+		/* NOTE(review): when offset lands exactly on the end of the
+		 * current mapping, 'offset > end' is false, the stale iom is
+		 * reused and xfer_size clamps to 0 below — a zero-progress
+		 * iteration; '>=' looks like the intended test.  Confirm. */
+		if (!iom || offset > iom->offset + iom->sge.length) {
+			iom = rs_find_iomap(rs, offset);
+			if (!iom)
+				break;
+		}
+
+		if (!rs_can_send(rs)) {
+			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+					  rs_conn_can_send);
+			if (ret)
+				break;
+			if (!(rs->state & rs_connect_wr)) {
+				ret = ERR(ECONNRESET);
+				break;
+			}
+		}
+
+		/* overlapped send: ramp the chunk size up each iteration */
+		if (olen < left) {
+			xfer_size = olen;
+			if (olen < RS_MAX_TRANSFER)
+				olen <<= 1;
+		} else {
+			xfer_size = left;
+		}
+
+		/* clamp to available send credit and to mapping bounds */
+		if (xfer_size > rs->sbuf_bytes_avail)
+			xfer_size = rs->sbuf_bytes_avail;
+		if (xfer_size > iom->offset + iom->sge.length - offset)
+			xfer_size = iom->offset + iom->sge.length - offset;
+
+		if (xfer_size <= rs->sq_inline) {
+			sge.addr = (uintptr_t) buf;
+			sge.length = xfer_size;
+			sge.lkey = 0;
+			ret = rs_write_direct(rs, iom, offset, &sge, 1,
+					      xfer_size, IBV_SEND_INLINE);
+		} else if (xfer_size <= rs_sbuf_left(rs)) {
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+			rs->ssgl[0].length = xfer_size;
+			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+			if (xfer_size < rs_sbuf_left(rs))
+				rs->ssgl[0].addr += xfer_size;
+			else
+				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+		} else {
+			/* chunk wraps the circular send buffer: 2-entry SGL */
+			rs->ssgl[0].length = rs_sbuf_left(rs);
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+				rs->ssgl[0].length);
+			rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+			memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+		}
+		if (ret)
+			break;
+	}
+out:
+	fastlock_release(&rs->slock);
+
+	/* partial progress wins over the error, matching send() semantics */
+	return (ret && left == count) ? ret : count - left;
+}