#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transferred
* for control messages:
+ * SGL, CTRL
* bits [28-0]: receive credits granted
+ * IOMAP_SGL
+ * bits [28-16]: reserved, bits [15-0]: index
*/
enum {
RS_OP_RSVD_DRA_MORE,
RS_OP_SGL,
RS_OP_RSVD,
- RS_OP_RSVD_DRA_SGL,
+ RS_OP_IOMAP_SGL,
RS_OP_CTRL
};
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
uint32_t length;
};
-#define RS_MIN_INLINE (sizeof(struct rs_sge))
-#define rs_host_is_net() (1 == htonl(1))
-#define RS_CONN_FLAG_NET 1
+/* Wire-format entry written into the peer's iomap table (RS_OP_IOMAP_SGL). */
+struct rs_iomap {
+	uint64_t offset;	/* I/O offset the mapping was registered at */
+	struct rs_sge sge;	/* addr/length/key of the remotely writable buffer */
+};
+
+/* Local bookkeeping for one riomap() registration. */
+struct rs_iomap_mr {
+	uint64_t offset;	/* I/O offset (riomap offset argument, or buf address) */
+	struct ibv_mr *mr;	/* verbs registration; NULL marks a free table slot */
+	dlist_entry entry;	/* linked into iomap_list or iomap_queue */
+	atomic_t refcnt;	/* dropped via rs_release_iomap_mr() */
+	int index; /* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE (sizeof(struct rs_sge))
+#define rs_host_is_net() (1 == htonl(1))
+#define RS_CONN_FLAG_NET (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
struct rs_conn_data {
uint8_t version;
uint8_t flags;
uint16_t credits;
- uint32_t reserved2;
+ uint8_t reserved[3];
+ uint8_t target_iomap_size;
struct rs_sge target_sgl;
struct rs_sge data_buf;
};
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
+ fastlock_t iomap_lock;
int opts;
long fd_flags;
int remote_sge;
struct rs_sge remote_sgl;
+ struct rs_sge remote_iomap;
+
+ struct rs_iomap_mr *remote_iomappings;
+ dlist_entry iomap_list;
+ dlist_entry iomap_queue;
+ int iomap_pending;
struct ibv_mr *target_mr;
int target_sge;
- volatile struct rs_sge target_sgl[RS_SGL_SIZE];
+ int target_iomap_size;
+ void *target_buffer_list;
+ volatile struct rs_sge *target_sgl;
+ volatile struct rs_iomap *target_iomap;
uint32_t rbuf_size;
struct ibv_mr *rmr;
uint8_t *sbuf;
};
/*
 * Compress @value into a @bits-wide scaled encoding: values up to
 * 2^(bits-1) are stored verbatim; larger values set the high bit and
 * store value >> bits.  Inverse of rs_scale_to_value().
 */
static int rs_value_to_scale(int value, int bits)
{
	int flag = 1 << (bits - 1);

	if (value <= flag)
		return value;
	return flag | (value >> bits);
}
+
/*
 * Expand a @bits-wide scaled encoding back to its value: the high bit
 * selects between a verbatim value and one shifted left by @bits.
 * Inverse of rs_value_to_scale().
 */
static int rs_scale_to_value(int value, int bits)
{
	int flag = 1 << (bits - 1);

	if (value <= flag)
		return value;
	return (value & ~flag) << bits;
}
+
void rs_configure(void)
{
FILE *f;
if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
fscanf(f, "%u", &def_wmem);
fclose(f);
+ if (def_wmem < RS_SNDLOWAT)
+ def_wmem = RS_SNDLOWAT << 1;
+ }
+
+	if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+ fscanf(f, "%hu", &def_iomap_size);
+ fclose(f);
- if (def_wmem < 1)
- def_wmem = 1;
+ /* round to supported values */
+ def_iomap_size = (uint8_t) rs_value_to_scale(def_iomap_size, 8);
+ def_iomap_size = (uint16_t) rs_scale_to_value(def_iomap_size, 8);
}
init = 1;
out:
rs->sq_size = inherited_rs->sq_size;
rs->rq_size = inherited_rs->rq_size;
rs->ctrl_avail = inherited_rs->ctrl_avail;
+ rs->target_iomap_size = inherited_rs->target_iomap_size;
} else {
rs->sbuf_size = def_wmem;
rs->rbuf_size = def_mem;
rs->sq_size = def_sqsize;
rs->rq_size = def_rqsize;
rs->ctrl_avail = RS_QP_CTRL_SIZE;
+ rs->target_iomap_size = def_iomap_size;
}
fastlock_init(&rs->slock);
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
+ fastlock_init(&rs->iomap_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
return rs;
}
static int rs_init_bufs(struct rsocket *rs)
{
+ size_t len;
+
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return -1;
if (!rs->smr)
return -1;
- rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
- sizeof(rs->target_sgl));
+ len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+ sizeof(*rs->target_iomap) * rs->target_iomap_size;
+ rs->target_buffer_list = malloc(len);
+ if (!rs->target_buffer_list)
+ return -1;
+
+ rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
return -1;
+ memset(rs->target_buffer_list, 0, len);
+ rs->target_sgl = rs->target_buffer_list;
+ if (rs->target_iomap_size)
+ rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
return -1;
return 0;
}
+/*
+static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...)
+{
+ TODO: write me
+}
+*/
+
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+ if (atomic_dec(&iomr->refcnt))
+ return;
+
+ dlist_remove(&iomr->entry);
+ ibv_dereg_mr(iomr->mr);
+ if (iomr->index >= 0)
+ iomr->mr = NULL;
+ else
+ free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+ struct rs_iomap_mr *iomr;
+
+ while (!dlist_empty(&rs->iomap_list)) {
+ iomr = container_of(rs->iomap_list.next,
+ struct rs_iomap_mr, entry);
+ riounmap(iomr->mr->addr, iomr->mr->length);
+ }
+ while (!dlist_empty(&rs->iomap_queue)) {
+ iomr = container_of(rs->iomap_queue.next,
+ struct rs_iomap_mr, entry);
+ riounmap(iomr->mr->addr, iomr->mr->length);
+ }
+}
+
static void rs_free(struct rsocket *rs)
{
if (rs->index >= 0)
free(rs->rbuf);
}
- if (rs->target_mr)
- rdma_dereg_mr(rs->target_mr);
+ if (rs->target_buffer_list) {
+ if (rs->target_mr)
+ rdma_dereg_mr(rs->target_mr);
+ free(rs->target_buffer_list);
+ }
if (rs->cm_id) {
+ rs_free_iomappings(rs);
if (rs->cm_id->qp)
rdma_destroy_qp(rs->cm_id);
rdma_destroy_id(rs->cm_id);
}
+ fastlock_destroy(&rs->iomap_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
struct rs_conn_data *conn)
{
conn->version = 1;
- conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+ conn->flags = RS_CONN_FLAG_IOMAP |
+ (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
conn->credits = htons(rs->rq_size);
- conn->reserved2 = 0;
+ memset(conn->reserved, 0, sizeof conn->reserved);
+ conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
conn->target_sgl.length = htonl(RS_SGL_SIZE);
(!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
rs->opts = RS_OPT_SWAP_SGL;
+ if (conn->flags & RS_CONN_FLAG_IOMAP) {
+ rs->remote_iomap.addr = rs->remote_sgl.addr +
+ sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+ rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+ rs->remote_iomap.key = rs->remote_sgl.key;
+ }
+
rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
rs->target_sgl[0].length = ntohl(conn->data_buf.length);
rs->target_sgl[0].key = ntohl(conn->data_buf.key);
flags, addr, rkey);
}
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+ struct ibv_sge *sgl, int nsge, int flags)
+{
+ uint64_t addr;
+
+ rs->sseq_no++;
+ rs->sqe_avail--;
+ rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+ addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+ flags, addr, rs->remote_iomap.key);
+}
+
static uint32_t rs_sbuf_left(struct rsocket *rs)
{
return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) -
*/
static int rs_can_send(struct rsocket *rs)
{
- return rs->sqe_avail && rs->sbuf_bytes_avail &&
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
(rs->sseq_no != rs->sseq_comp) &&
(rs->target_sgl[rs->target_sge].length != 0);
}
return rrecvv(socket, iov, iovcnt, 0);
}
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+ struct rs_iomap_mr *iomr;
+ struct ibv_sge sge;
+ struct rs_iomap iom;
+ int ret;
+
+ fastlock_acquire(&rs->iomap_lock);
+ while (!dlist_empty(&rs->iomap_queue)) {
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
+ if (ret)
+ break;
+ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+ }
+
+ iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+ if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+ iom.offset = iomr->offset;
+ iom.sge.addr = (uintptr_t) iomr->mr->addr;
+ iom.sge.length = iomr->mr->length;
+ iom.sge.key = iomr->mr->rkey;
+ } else {
+ iom.offset = bswap_64(iomr->offset);
+ iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+ iom.sge.length = bswap_32(iomr->mr->length);
+ iom.sge.key = bswap_32(iomr->mr->rkey);
+ }
+
+ if (rs->sq_inline >= sizeof iom) {
+ sge.addr = (uintptr_t) &iom;
+ sge.length = sizeof iom;
+ sge.lkey = 0;
+ ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+ } else if (rs_sbuf_left(rs) >= sizeof iom) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+ rs->ssgl[0].length = sizeof iom;
+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+ if (rs_sbuf_left(rs) > sizeof iom)
+ rs->ssgl[0].addr += sizeof iom;
+ else
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+ } else {
+ rs->ssgl[0].length = rs_sbuf_left(rs);
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+ memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+ rs->ssgl[1].length);
+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+ dlist_remove(&iomr->entry);
+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+ if (ret)
+ break;
+ }
+
+ rs->iomap_pending = dlist_empty(&rs->iomap_queue);
+ fastlock_release(&rs->iomap_lock);
+ return ret;
+}
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
}
fastlock_acquire(&rs->slock);
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
len += iov[i].iov_len;
fastlock_acquire(&rs->slock);
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
case SO_SNDBUF:
if (!rs->sbuf)
rs->sbuf_size = (*(uint32_t *) optval) << 1;
+ if (rs->sbuf_size < RS_SNDLOWAT)
+ rs->sbuf_size = RS_SNDLOWAT << 1;
ret = 0;
break;
case SO_LINGER:
va_end(args);
return ret;
}
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+ struct rs_iomap_mr *iomr;
+ int i;
+
+ if (!rs->remote_iomappings) {
+ rs->remote_iomappings = calloc(rs->remote_iomap.length,
+ sizeof(*rs->remote_iomappings));
+ if (!rs->remote_iomappings)
+ return NULL;
+
+ for (i = 0; i < rs->remote_iomap.length; i++)
+ rs->remote_iomappings[i].index = i;
+ }
+
+ for (i = 0; i < rs->remote_iomap.length; i++) {
+ if (!rs->remote_iomappings[i]->mr)
+ return &rs->remote_iomappings[i];
+ }
+ return NULL;
+}
+
+/*
+ * If an offset is given, we map to it. If offset is -1, then we map the
+ * offset to the address of buf. We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ int ret, access = IBV_ACCESS_LOCAL_WRITE;
+
+ rs = idm_at(&idm, socket);
+ if ((rs->state != rs_connect_rdwr) || (prot & ~(PROT_WRITE | PROT_NONE)))
+ return ERR(EINVAL);
+
+ fastlock_acquire(&rs->iomap_lock);
+ if (prot & PROT_WRITE) {
+ iomr = rs_get_iomap_mr(rs);
+ access |= IBV_ACCESS_REMOTE_WRITE;
+ } else {
+ iomr = calloc(1, sizeof *iomr);
+ iomr->index = -1;
+ }
+ if (!iomr) {
+ offset = ERR(ENOMEM);
+ goto out;
+ }
+
+ iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+ if (!iomr->mr) {
+ if (iomr->index < 0)
+ free(iomr);
+ offset = -1;
+ goto out;
+ }
+
+ if (offset == -1)
+ offset = (uintptr_t) buf;
+ iomr->offset = offset;
+ iomr->prot = prot;
+ atomic_init(&iomr->refcnt);
+ atomic_set(&iomr->refcnt, 1);
+
+ if (iomr->index >= 0) {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+ rs->iomap_pending = 1;
+ } else {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+ }
+out:
+ fastlock_release(&rs->iomap_lock);
+ return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ struct dlist_entry *entry;
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+ fastlock_acquire(&rs->iomap_lock);
+
+ for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+
+ for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+ ret = ERR(EINVAL);
+out:
+ fastlock_release(&rs->iomap_lock);
+ return ret;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+
+}