From f0fc707aeebfbb6b383d6d5d471310e2a46a004b Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Tue, 23 Oct 2012 00:59:59 -0700 Subject: [PATCH] Refresh of rs-iomap --- include/rdma/rsocket.h | 6 +- src/indexer.h | 43 +++++++++++ src/rsocket.c | 171 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 217 insertions(+), 3 deletions(-) diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h index 65feda96..21477e41 100644 --- a/include/rdma/rsocket.h +++ b/include/rdma/rsocket.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011 Intel Corporation. All rights reserved. + * Copyright (c) 2011-2012 Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -85,6 +85,10 @@ int rgetsockopt(int socket, int level, int optname, void *optval, socklen_t *optlen); int rfcntl(int socket, int cmd, ... /* arg */ ); +off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset); +int riounmap(int socket, void *buf, size_t len); +size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags); + #ifdef __cplusplus } #endif diff --git a/src/indexer.h b/src/indexer.h index 26e7f986..0c5f3882 100644 --- a/src/indexer.h +++ b/src/indexer.h @@ -31,6 +31,9 @@ * */ +#if !defined(INDEXER_H) +#define INDEXER_H + #if HAVE_CONFIG_H # include #endif /* HAVE_CONFIG_H */ @@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index) return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ? 
idm_at(idm, index) : NULL; } + +typedef struct _dlist_entry { + struct _dlist_entry *next; + struct _dlist_entry *prev; +} dlist_entry; + +static inline void dlist_init(dlist_entry *head) +{ + head->next = head; + head->prev = head; +} + +static inline int dlist_empty(dlist_entry *head) +{ + return head->next == head; +} + +static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head) +{ + item->next = head->next; + item->prev = head; + head->next->prev = item; + head->next = item; +} + +static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head) +{ + dlist_insert_after(item, head->prev); +} + +#define dlist_insert_head dlist_insert_after +#define dlist_insert_tail dlist_insert_before + +static inline void dlist_remove(dlist_entry *item) +{ + item->prev->next = item->next; + item->next->prev = item->prev; +} + +#endif /* INDEXER_H */ diff --git a/src/rsocket.c b/src/rsocket.c index 40d94402..ed708d43 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -59,7 +59,6 @@ #define RS_QP_CTRL_SIZE 4 #define RS_CONN_RETRIES 6 #define RS_SGL_SIZE 2 -#define RS_MAX_IOMAP 128 static struct index_map idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; @@ -118,6 +117,14 @@ struct rs_iomap { struct rs_sge sge; }; +struct rs_iomap_mr { + uint64_t offset; + struct ibv_mr *mr; + dlist_entry entry; + atomic_t refcnt; + int index; /* -1 if mapping is local and not in iomap_list */ +}; + #define RS_MIN_INLINE (sizeof(struct rs_sge)) #define rs_host_is_net() (1 == htonl(1)) #define RS_CONN_FLAG_NET (1 << 0) @@ -164,6 +171,7 @@ struct rsocket { fastlock_t rlock; fastlock_t cq_lock; fastlock_t cq_wait_lock; + fastlock_t iomap_lock; int opts; long fd_flags; @@ -196,7 +204,11 @@ struct rsocket { int remote_sge; struct rs_sge remote_sgl; struct rs_sge remote_iomap; - struct rs_iomap *remote_iomappings; + + struct rs_iomap_mr *remote_iomappings; + dlist_entry iomap_list; + dlist_entry iomap_queue; + int iomap_pending; struct ibv_mr *target_mr; int 
target_sge; @@ -336,6 +348,9 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) fastlock_init(&rs->rlock); fastlock_init(&rs->cq_lock); fastlock_init(&rs->cq_wait_lock); + fastlock_init(&rs->iomap_lock); + dlist_init(&rs->iomap_list); + dlist_init(&rs->iomap_queue); return rs; } @@ -501,6 +516,42 @@ static int rs_create_ep(struct rsocket *rs) return 0; } +/* +static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...) +{ + TODO: write me +} +*/ + +static void rs_release_iomap_mr(struct rs_iomap_mr *iomr) +{ + if (atomic_dec(&iomr->refcnt)) + return; + + dlist_remove(&iomr->entry); + ibv_dereg_mr(iomr->mr); + if (iomr->index >= 0) + iomr->mr = NULL; + else + free(iomr); +} + +static void rs_free_iomappings(struct rsocket *rs) +{ + struct rs_iomap_mr *iomr; + + while (!dlist_empty(&rs->iomap_list)) { + iomr = container_of(rs->iomap_list.next, + struct rs_iomap_mr, entry); + rs_release_iomap_mr(iomr); + } + while (!dlist_empty(&rs->iomap_queue)) { + iomr = container_of(rs->iomap_queue.next, + struct rs_iomap_mr, entry); + rs_release_iomap_mr(iomr); + } +} + static void rs_free(struct rsocket *rs) { if (rs->index >= 0) @@ -528,11 +579,13 @@ static void rs_free(struct rsocket *rs) } if (rs->cm_id) { + rs_free_iomappings(rs); if (rs->cm_id->qp) rdma_destroy_qp(rs->cm_id); rdma_destroy_id(rs->cm_id); } + fastlock_destroy(&rs->iomap_lock); fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); @@ -2081,3 +2134,117 @@ int rfcntl(int socket, int cmd, ...
/* arg */ ) va_end(args); return ret; } + +static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs) +{ + struct rs_iomap_mr *iomr; + int i; + + if (!rs->remote_iomappings) { + rs->remote_iomappings = calloc(rs->remote_iomap.length, + sizeof(*rs->remote_iomappings)); + if (!rs->remote_iomappings) + return NULL; + + for (i = 0; i < rs->remote_iomap.length; i++) + rs->remote_iomappings[i].index = i; + } + + for (i = 0; i < rs->remote_iomap.length; i++) { + if (!rs->remote_iomappings[i].mr) + return &rs->remote_iomappings[i]; + } + return NULL; +} + +/* + * If an offset is given, we map to it. If offset is -1, then we map the + * offset to the address of buf. We do not check for conflicts, which must + * be fixed at some point. + */ +off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset) +{ + struct rsocket *rs; + struct rs_iomap_mr *iomr; + int access = IBV_ACCESS_LOCAL_WRITE; + + rs = idm_at(&idm, socket); + if ((rs->state != rs_connect_rdwr) || (prot & ~(PROT_WRITE | PROT_NONE))) + return ERR(EINVAL); + + fastlock_acquire(&rs->iomap_lock); + if (prot & PROT_WRITE) { + iomr = rs_get_iomap_mr(rs); + access |= IBV_ACCESS_REMOTE_WRITE; + } else { + iomr = calloc(1, sizeof *iomr); + if (iomr) + iomr->index = -1; + } + if (!iomr) { + offset = ERR(ENOMEM); + goto out; + } + + iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access); + if (!iomr->mr) { + if (iomr->index < 0) + free(iomr); + offset = -1; + goto out; + } + + if (offset == -1) + offset = (uintptr_t) buf; + iomr->offset = offset; + atomic_init(&iomr->refcnt); + atomic_set(&iomr->refcnt, 1); + + if (iomr->index >= 0) { + dlist_insert_tail(&iomr->entry, &rs->iomap_queue); + rs->iomap_pending = 1; + } else { + dlist_insert_tail(&iomr->entry, &rs->iomap_list); + } +out: + fastlock_release(&rs->iomap_lock); + return offset; +} + +int riounmap(int socket, void *buf, size_t len) +{ + struct rsocket *rs; + struct rs_iomap_mr *iomr; + dlist_entry *entry; + int ret = 
0; + + rs = idm_at(&idm, socket); + fastlock_acquire(&rs->iomap_lock); + + for (entry = rs->iomap_list.next; entry != &rs->iomap_list; + entry = entry->next) { + iomr = container_of(entry, struct rs_iomap_mr, entry); + if (iomr->mr->addr == buf && iomr->mr->length == len) { + rs_release_iomap_mr(iomr); + goto out; + } + } + + for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue; + entry = entry->next) { + iomr = container_of(entry, struct rs_iomap_mr, entry); + if (iomr->mr->addr == buf && iomr->mr->length == len) { + rs_release_iomap_mr(iomr); + goto out; + } + } + ret = ERR(EINVAL); +out: + fastlock_release(&rs->iomap_lock); + return ret; +} + +size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags) +{ + return ERR(ENOSYS); +} -- 2.41.0