*
*/
+#if !defined(INDEXER_H)
+#define INDEXER_H
+
#if HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
idm_at(idm, index) : NULL;
}
+
+typedef struct _dlist_entry {
+ struct _dlist_entry *next;
+ struct _dlist_entry *prev;
+} dlist_entry;
+
+static inline void dlist_init(dlist_entry *head)
+{
+ head->next = head;
+ head->prev = head;
+}
+
+static inline int dlist_empty(dlist_entry *head)
+{
+ return head->next == head;
+}
+
+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
+{
+ item->next = head->next;
+ item->prev = head;
+ head->next->prev = item;
+ head->next = item;
+}
+
+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
+{
+ dlist_insert_after(item, head->prev);
+}
+
+#define dlist_insert_head dlist_insert_after
+#define dlist_insert_tail dlist_insert_before
+
+static inline void dlist_remove(dlist_entry *item)
+{
+ item->prev->next = item->next;
+ item->next->prev = item->prev;
+}
+
+#endif /* INDEXER_H */
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
#define RS_SGL_SIZE 2
-#define RS_MAX_IOMAP 128
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
struct rs_sge sge;
};
+struct rs_iomap_mr {
+ uint64_t offset;
+ struct ibv_mr *mr;
+ dlist_entry entry;
+ atomic_t refcnt;
+ int index; /* -1 if mapping is local and not in iomap_list */
+};
+
#define RS_MIN_INLINE (sizeof(struct rs_sge))
#define rs_host_is_net() (1 == htonl(1))
#define RS_CONN_FLAG_NET (1 << 0)
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
+ fastlock_t iomap_lock;
int opts;
long fd_flags;
int remote_sge;
struct rs_sge remote_sgl;
struct rs_sge remote_iomap;
- struct rs_iomap *remote_iomappings;
+
+ struct rs_iomap_mr *remote_iomappings;
+ dlist_entry iomap_list;
+ dlist_entry iomap_queue;
+ int iomap_pending;
struct ibv_mr *target_mr;
int target_sge;
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
+ fastlock_init(&rs->iomap_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
return rs;
}
return 0;
}
+/*
+static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...)
+{
+ TODO: write me
+}
+*/
+
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+ if (atomic_dec(&iomr->refcnt))
+ return;
+
+ dlist_remove(&iomr->entry);
+ ibv_dereg_mr(iomr->mr);
+ if (iomr->index >= 0)
+ iomr->mr = NULL;
+ else
+ free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+ struct rs_iomap_mr *iomr;
+
+ while (!dlist_empty(&rs->iomap_list)) {
+ iomr = container_of(rs->iomap_list.next,
+ struct rs_iomap_mr, entry);
+ riounmap(iomr->mr->addr, iomr->mr->length);
+ }
+ while (!dlist_empty(&rs->iomap_queue)) {
+ iomr = container_of(rs->iomap_queue.next,
+ struct rs_iomap_mr, entry);
+ riounmap(iomr->mr->addr, iomr->mr->length);
+ }
+}
+
static void rs_free(struct rsocket *rs)
{
if (rs->index >= 0)
}
if (rs->cm_id) {
+ rs_free_iomappings(rs);
if (rs->cm_id->qp)
rdma_destroy_qp(rs->cm_id);
rdma_destroy_id(rs->cm_id);
}
+ fastlock_destroy(&rs->iomap_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
va_end(args);
return ret;
}
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+ struct rs_iomap_mr *iomr;
+ int i;
+
+ if (!rs->remote_iomappings) {
+ rs->remote_iomappings = calloc(rs->remote_iomap.length,
+ sizeof(*rs->remote_iomappings));
+ if (!rs->remote_iomappings)
+ return NULL;
+
+ for (i = 0; i < rs->remote_iomap.length; i++)
+ rs->remote_iomappings[i].index = i;
+ }
+
+ for (i = 0; i < rs->remote_iomap.length; i++) {
+ if (!rs->remote_iomappings[i]->mr)
+ return &rs->remote_iomappings[i];
+ }
+ return NULL;
+}
+
+/*
+ * If an offset is given, we map to it. If offset is -1, then we map the
+ * offset to the address of buf. We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ int ret, access = IBV_ACCESS_LOCAL_WRITE;
+
+ rs = idm_at(&idm, socket);
+ if ((rs->state != rs_connect_rdwr) || (prot & ~(PROT_WRITE | PROT_NONE)))
+ return ERR(EINVAL);
+
+ fastlock_acquire(&rs->iomap_lock);
+ if (prot & PROT_WRITE) {
+ iomr = rs_get_iomap_mr(rs);
+ access |= IBV_ACCESS_REMOTE_WRITE;
+ } else {
+ iomr = calloc(1, sizeof *iomr);
+ iomr->index = -1;
+ }
+ if (!iomr) {
+ offset = ERR(ENOMEM);
+ goto out;
+ }
+
+ iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+ if (!iomr->mr) {
+ if (iomr->index < 0)
+ free(iomr);
+ offset = -1;
+ goto out;
+ }
+
+ if (offset == -1)
+ offset = (uintptr_t) buf;
+ iomr->offset = offset;
+ iomr->prot = prot;
+ atomic_init(&iomr->refcnt);
+ atomic_set(&iomr->refcnt, 1);
+
+ if (iomr->index >= 0) {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+ rs->iomap_pending = 1;
+ } else {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+ }
+out:
+ fastlock_release(&rs->iomap_lock);
+ return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ struct dlist_entry *entry;
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+ fastlock_acquire(&rs->iomap_lock);
+
+ for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+
+ for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+ ret = ERR(EINVAL);
+out:
+ fastlock_release(&rs->iomap_lock);
+ return ret;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+
+}