From 5a6fcf8af7162d16274556981d27be74f8a43295 Mon Sep 17 00:00:00 2001
From: Sean Hefty
Date: Sun, 21 Oct 2012 14:16:03 -0700
Subject: [PATCH] rsocket: Add APIs for direct data placement

We introduce rsocket extensions for supporting direct data placement
(also known as zero copy).  Direct data placement avoids data copies
into network buffers when sending or receiving data.  This patch
implements zero copy on the receive side and adds some basic framework
for supporting it on the send side.

Integrating zero-copy support into the existing socket APIs is
difficult to achieve when the sockets are set as nonblocking, and any
such implementation is likely to be unusable in practice.  The problem
stems from the fact that socket operations are synchronous in nature;
support for asynchronous operation is limited to connection
establishment.  We therefore introduce new calls to handle direct data
placement.  Use of the new calls is optional and does not affect the
use of the existing calls.  An attempt is made to have the new routines
integrate naturally with the existing APIs.

The new functions are: riomap, riounmap, and riowrite.  The basic
operation can be described as follows:

1. App A calls riomap to register a data buffer with the local RDMA
   device.  Riomap returns an off_t offset value that corresponds to
   the registered data buffer.  The app may select the offset value.
2. Rsockets transmits an internal message to the remote peer with
   information about the registration.  This exchange is hidden from
   the applications.
3. App A sends a notification message to app B indicating that the
   remote iomapped buffer is now available to receive data.
4. App B calls riowrite to transmit data directly into the iomapped
   data buffer.
5. App B sends a notification message to app A indicating that data is
   available in the mapped buffer.
6. After all transfers are complete, app A calls riounmap to
   deregister its data buffer.

Riomap and riounmap are functionally equivalent to RDMA memory
registration and deregistration routines.  They are loosely based on
the mmap and munmap APIs.

off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)

Riomap registers an application buffer with the RDMA hardware
associated with an rsocket.  The buffer is registered either for
local-only access (PROT_NONE) or for remote write access (PROT_WRITE).
When registered for remote access, the buffer is mapped to a given
offset.  The offset is either provided by the user, or, if the user
selects -1 for the offset, rsockets selects one.  The remote peer may
access an iomapped buffer directly by specifying the correct offset.
The mapping is not guaranteed to be available until after the remote
peer receives a data transfer initiated after riomap has completed.

int riounmap(int socket, void *buf, size_t len)

Riounmap removes the mapping between a buffer and an rsocket.

size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)

Riowrite allows an application to transfer data over an rsocket
directly into a remotely iomapped buffer.  The remote buffer is
specified through the offset parameter, which corresponds to a remote
iomapped buffer.  From the sender's perspective, riowrite behaves
similarly to rwrite.  From the receiver's view, riowrite transfers are
silently redirected into a predetermined data buffer.  Data is received
automatically, and the receiver is not informed of the transfer.
However, riowrite data is still considered part of the data stream,
such that riowrite data will be written before a subsequent transfer is
received.  A message sent immediately after initiating a riowrite may
be used to notify the receiver of the transfer.
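To illustrate the intended usage, a minimal sketch follows (example
only, not part of the patch; rs_a and rs_b stand for already connected
rsockets on hosts A and B, and error handling is omitted):

    #include <sys/types.h>
    #include <sys/mman.h>
    #include <rdma/rsocket.h>

    /* Host A: expose a buffer for direct placement, wait for data. */
    static void host_a(int rs_a)
    {
        static char buf[65536];
        size_t done;
        off_t off;

        /* Step 1: register buf for remote write access; passing -1
         * lets rsockets pick the offset. */
        off = riomap(rs_a, buf, sizeof buf, PROT_WRITE, 0, -1);

        /* Step 3: tell B which offset to write to. */
        rsend(rs_a, &off, sizeof off, 0);

        /* Step 5, receive side: the notification arrives in order,
         * after the riowrite data has been placed into buf. */
        rrecv(rs_a, &done, sizeof done, 0);

        /* Step 6: remove the mapping. */
        riounmap(rs_a, buf, sizeof buf);
    }

    /* Host B: write directly into A's buffer, then notify A. */
    static void host_b(int rs_b, const void *data, size_t len)
    {
        off_t off;

        rrecv(rs_b, &off, sizeof off, 0);
        riowrite(rs_b, data, len, off, 0);   /* step 4 */
        rsend(rs_b, &len, sizeof len, 0);    /* step 5 */
    }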
It should be noted that the current implementation is primarily focused
on being functional for evaluation purposes.  Some checks have been
deferred to subsequent patches, and performance is currently limited by
linear lookups.

Signed-off-by: Sean Hefty
---
 docs/rsocket           |  51 +++-
 include/rdma/rsocket.h |  10 +-
 man/rsocket.7          |  49 +++-
 src/indexer.h          |  43 ++++
 src/librdmacm.map      |   3 +
 src/rsocket.c          | 528 ++++++++++++++++++++++++++++++++++++++---
 6 files changed, 645 insertions(+), 39 deletions(-)

diff --git a/docs/rsocket b/docs/rsocket
index 5399f6cb..1484f65b 100644
--- a/docs/rsocket
+++ b/docs/rsocket
@@ -110,11 +110,11 @@ Bits  Message       Meaning of
 31:29 Type          Bits 28:0
 000   Data Transfer bytes transfered
 001   reserved
-010   reserved
+010   reserved - used internally, available for future use
 011   reserved
 100   Credit Update received credits granted
 101   reserved
-110   reserved
+110   Iomap Updated index of updated entry
 111   Control       control message type
 
 Data Transfer
@@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in use.  This is
 done by tracking when a receive buffer referenced by a remote target SGL
 has been filled.
 
+Iomap Updated
+Used to indicate that a remote iomap entry was updated.  The updated entry
+contains the offset value associated with an address, length, and rkey.  Once
+an iomap has been updated, the local application can issue directed IO
+transfers against the corresponding remote buffer.
+
 Control Message - DISCONNECT
 Indicates that the rsocket connection has been fully disconnected and will
 no longer send or receive data.  Data received before the disconnect message was
@@ -142,3 +148,44 @@ Control Message - SHUTDOWN
 Indicates that the remote rsocket has shutdown the send side of its
 connection.  The recipient of a shutdown message will no longer accept
 incoming data, but may still transfer outbound data.
+
+
+Iomapped Buffers
+----------------
+Rsockets allows for zero-copy transfers using what it refers to as iomapped
+buffers.  Iomapping and direct data placement (zero-copy) transfers are done
+using rsocket specific extensions.  The general operation is similar to
+that used for normal data transfers described above.
+
+     host A                                host B
+                       remote iomap
+  target iomap  <-----------  [ ]
+     [ ]  ------
+     [ ]  --    ------                iomapped buffer(s)
+            --        ----->  +--+
+             --               |  |
+              --              |  |
+               --             |  |
+                --            +--+
+                 --
+                  --->  +--+
+                        |  |
+                        |  |
+                        +--+
+
+The remote iomap contains the address, size, and rkey of the target iomap.  As
+the application on host B maps buffers to a given rsocket, rsockets will issue
+an RDMA write against one of the entries in the target iomap on host A.  The
+updated entry will reference an available iomapped buffer.  Immediate data
+included with the RDMA write will indicate to host A that a target iomap
+has been updated.
+
+When host A wishes to transfer directly into an iomapped buffer, it will check
+its target iomap for an offset corresponding to a remotely mapped buffer.  A
+matching iomap entry will contain the address, size, and rkey of the target
+buffer on host B.  Host A will then issue an RDMA operation against the
+registered remote data buffer.
+
+From host A's perspective, the transfer appears as a normal send/write
+operation, with the data stream redirected directly into the receiving
+application's buffer.
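As an illustration of the lookup described above, the following sketch
(illustrative only, not part of the patch) shows the translation an
initiator performs when resolving a caller-supplied offset against its
target iomap; it mirrors the rs_find_iomap()/rs_write_direct() logic
added to src/rsocket.c below, including the linear scan noted as a
current performance limit:

    #include <stdint.h>
    #include <sys/types.h>

    /* Local mirrors of struct rs_sge/struct rs_iomap from this patch. */
    struct sge { uint64_t addr; uint32_t key; uint32_t length; };
    struct iomap { uint64_t offset; struct sge sge; };

    /* Return the remote address covering [offset, offset + count), or 0
     * if no iomap entry maps that range; *rkey gets the matching key. */
    static uint64_t iomap_to_addr(const struct iomap *tbl, int size,
                                  off_t offset, size_t count, uint32_t *rkey)
    {
        int i;

        for (i = 0; i < size; i++) {    /* linear lookup, as in the patch */
            if (offset >= tbl[i].offset &&
                offset + count <= tbl[i].offset + tbl[i].sge.length) {
                *rkey = tbl[i].sge.key;
                return tbl[i].sge.addr + offset - tbl[i].offset;
            }
        }
        return 0;
    }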
diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h
index 65feda96..f220c134 100644
--- a/include/rdma/rsocket.h
+++ b/include/rdma/rsocket.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011 Intel Corporation.  All rights reserved.
+ * Copyright (c) 2011-2012 Intel Corporation.  All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -39,6 +39,7 @@
 #include
 #include
 #include
+#include
 
 #ifdef __cplusplus
 extern "C" {
@@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
 enum {
 	RDMA_SQSIZE,
 	RDMA_RQSIZE,
-	RDMA_INLINE
+	RDMA_INLINE,
+	RDMA_IOMAPSIZE
 };
 
 int rsetsockopt(int socket, int level, int optname,
@@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname,
 		void *optval, socklen_t *optlen);
 int rfcntl(int socket, int cmd, ... /* arg */ );
 
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset);
+int riounmap(int socket, void *buf, size_t len);
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/man/rsocket.7 b/man/rsocket.7
index 2ed5ca43..5ecebc2e 100644
--- a/man/rsocket.7
+++ b/man/rsocket.7
@@ -6,7 +6,7 @@ rsocket \- RDMA socket API
 .SH "DESCRIPTION"
 RDMA socket API and protocol
 .SH "NOTES"
-rsockets is a protocol over RDMA that supports a socket-level API
+Rsockets is a protocol over RDMA that supports a socket-level API
 for applications.  rsocket APIs are intended to match the behavior
 of corresponding socket calls, except where noted.  rsocket
 functions match the name and function signature of socket calls,
@@ -30,7 +30,7 @@ rgetpeername, rgetsockname
 .P
 rsetsockopt, rgetsockopt, rfcntl
 .P
-Functions take the same parameters as that use for sockets.  The
+Functions take the same parameters as those used for sockets.  The
 follow capabilities and flags are supported at this time:
 .P
 PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG
@@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF
 .P
 O_NONBLOCK
 .P
+Rsockets provides extensions beyond normal socket routines that
+allow for direct placement of data into an application's buffer.
+This is also known as zero-copy support, since data is sent and
+received directly, bypassing copies into network controlled buffers.
+The following calls and options support direct data placement.
+.P
+riomap, riounmap, riowrite
+.TP
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+.TP
+Riomap registers an application buffer with the RDMA hardware
+associated with an rsocket.  The buffer is registered either for
+local-only access (PROT_NONE) or for remote write access (PROT_WRITE).
+When registered for remote access, the buffer is mapped to a given
+offset.  The offset is either provided by the user, or, if the user
+selects -1 for the offset, rsockets selects one.  The remote peer may
+access an iomapped buffer directly by specifying the correct offset.
+The mapping is not guaranteed to be available until after the remote
+peer receives a data transfer initiated after riomap has completed.
+.P
+riounmap
+.TP
+int riounmap(int socket, void *buf, size_t len)
+.TP
+Riounmap removes the mapping between a buffer and an rsocket.
+.P
+riowrite
+.TP
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+.TP
+Riowrite allows an application to transfer data over an rsocket
+directly into a remotely iomapped buffer.  The remote buffer is specified
+through an offset parameter, which corresponds to a remote iomapped buffer.
+From the sender's perspective, riowrite behaves similarly to rwrite.  From
+a receiver's view, riowrite transfers are silently redirected into a
+predetermined data buffer.  Data is received automatically, and the receiver
+is not informed of the transfer.  However, riowrite data is still considered
+part of the data stream, such that riowrite data will be written before a
+subsequent transfer is received.  A message sent immediately after initiating
+a riowrite may be used to notify the receiver of the transfer.
+.P
 In addition to standard socket options, rsockets supports options
 specific to RDMA devices and protocols.  These options are accessible
 through rsetsockopt using SOL_RDMA option level.
@@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue.
 .TP
 RDMA_RQSIZE - Integer size of the underlying receive queue.
 .TP
 RDMA_INLINE - Integer size of inline data.
+.TP
+RDMA_IOMAPSIZE - Integer number of remote IO mappings supported.
 .P
 Note that rsockets fd's cannot be passed into non-rsocket calls.
 For applications which must mix rsocket fd's with standard socket fd's or
@@ -84,6 +127,8 @@ rqsize_default - default size of receive queue
 .P
 inline_default - default size of inline data
 .P
+iomap_size - default size of remote iomapping table
+.P
 If configuration files are not available, rsockets uses internal defaults.
 .SH "SEE ALSO"
 rdma_cm(7)
diff --git a/src/indexer.h b/src/indexer.h
index 26e7f986..0c5f3882 100644
--- a/src/indexer.h
+++ b/src/indexer.h
@@ -31,6 +31,9 @@
  *
  */
 
+#if !defined(INDEXER_H)
+#define INDEXER_H
+
 #if HAVE_CONFIG_H
 #  include <config.h>
 #endif /* HAVE_CONFIG_H */
@@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index)
 	return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
 		idm_at(idm, index) : NULL;
 }
+
+typedef struct _dlist_entry {
+	struct _dlist_entry	*next;
+	struct _dlist_entry	*prev;
+} dlist_entry;
+
+static inline void dlist_init(dlist_entry *head)
+{
+	head->next = head;
+	head->prev = head;
+}
+
+static inline int dlist_empty(dlist_entry *head)
+{
+	return head->next == head;
+}
+
+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
+{
+	item->next = head->next;
+	item->prev = head;
+	head->next->prev = item;
+	head->next = item;
+}
+
+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
+{
+	dlist_insert_after(item, head->prev);
+}
+
+#define dlist_insert_head dlist_insert_after
+#define dlist_insert_tail dlist_insert_before
+
+static inline void dlist_remove(dlist_entry *item)
+{
+	item->prev->next = item->next;
+	item->next->prev = item->prev;
+}
+
+#endif /* INDEXER_H */
diff --git a/src/librdmacm.map b/src/librdmacm.map
index 5c317a3e..d5ef7363 100644
--- a/src/librdmacm.map
+++ b/src/librdmacm.map
@@ -63,5 +63,8 @@ RDMACM_1.0 {
 	rselect;
 	rdma_get_src_port;
 	rdma_get_dst_port;
+	riomap;
+	riounmap;
+	riowrite;
 local: *;
 };
diff --git a/src/rsocket.c b/src/rsocket.c
index cc5effe1..29b8c018 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -55,6 +55,7 @@
 
 #define RS_OLAP_START_SIZE 2048
 #define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
 #define RS_QP_MAX_SIZE 0xFFFE
 #define RS_QP_CTRL_SIZE 4
 #define RS_CONN_RETRIES 6
@@ -62,6 +63,7 @@
 
 static struct index_map idm;
 static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
 static uint16_t def_rqsize = 384;
@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
  * bit 29: more data, 0 - end of transfer, 1 - more data available
  *
  * for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transferred
  * for control messages:
+ *   SGL, CTRL
  *   bits [28-0]: receive credits granted
+ *   IOMAP_SGL
+ *   bits [28-16]: reserved, bits [15-0]: index
  */
 enum {
 	RS_OP_DATA,
 	RS_OP_RSVD_DATA_MORE,
-	RS_OP_RSVD_DRA,
+	RS_OP_WRITE, /* opcode is not transmitted over the network */
 	RS_OP_RSVD_DRA_MORE,
 	RS_OP_SGL,
 	RS_OP_RSVD,
-	RS_OP_RSVD_DRA_SGL,
+	RS_OP_IOMAP_SGL,
 	RS_OP_CTRL
 };
 #define rs_msg_set(op, data)  ((op << 29) | (uint32_t) (data))
@@ -111,15 +116,30 @@ struct rs_sge {
 	uint32_t length;
 };
 
-#define RS_MIN_INLINE      (sizeof(struct rs_sge))
-#define rs_host_is_net()   (1 == htonl(1))
-#define RS_CONN_FLAG_NET   1
+struct rs_iomap {
+	uint64_t offset;
+	struct rs_sge sge;
+};
+
+struct rs_iomap_mr {
+	uint64_t offset;
+	struct ibv_mr *mr;
+	dlist_entry entry;
+	atomic_t refcnt;
+	int index;	/* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE      (sizeof(struct rs_sge))
+#define rs_host_is_net()   (1 == htonl(1))
+#define RS_CONN_FLAG_NET   (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
 
 struct rs_conn_data {
 	uint8_t		  version;
 	uint8_t		  flags;
 	uint16_t	  credits;
-	uint32_t	  reserved2;
+	uint8_t		  reserved[3];
+	uint8_t		  target_iomap_size;
 	struct rs_sge	  target_sgl;
 	struct rs_sge	  data_buf;
 };
@@ -155,6 +175,7 @@ struct rsocket {
 	fastlock_t	  rlock;
 	fastlock_t	  cq_lock;
 	fastlock_t	  cq_wait_lock;
+	fastlock_t	  iomap_lock;
 	int		  opts;
 	long		  fd_flags;
@@ -186,10 +207,19 @@ struct rsocket {
 	int		  remote_sge;
 	struct rs_sge	  remote_sgl;
+	struct rs_sge	  remote_iomap;
+
+	struct rs_iomap_mr *remote_iomappings;
+	dlist_entry	  iomap_list;
+	dlist_entry	  iomap_queue;
+	int		  iomap_pending;
 
 	struct ibv_mr	 *target_mr;
 	int		  target_sge;
-	volatile struct rs_sge target_sgl[RS_SGL_SIZE];
+	int		  target_iomap_size;
+	void		 *target_buffer_list;
+	volatile struct rs_sge *target_sgl;
+	struct rs_iomap  *target_iomap;
 
 	uint32_t	  rbuf_size;
 	struct ibv_mr	 *rmr;
@@ -201,6 +231,18 @@ struct rsocket {
 	uint8_t		  *sbuf;
 };
 
+static int rs_value_to_scale(int value, int bits)
+{
+	return value <= (1 << (bits - 1)) ?
+	       value : (1 << (bits - 1)) | (value >> bits);
+}
+
+static int rs_scale_to_value(int value, int bits)
+{
+	return value <= (1 << (bits - 1)) ?
+	       value : (value & ~(1 << (bits - 1))) << bits;
+}
+
 void rs_configure(void)
 {
 	FILE *f;
@@ -247,9 +289,17 @@ void rs_configure(void)
 	if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
 		fscanf(f, "%u", &def_wmem);
 		fclose(f);
+		if (def_wmem < RS_SNDLOWAT)
+			def_wmem = RS_SNDLOWAT << 1;
+	}
+
+	if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+		fscanf(f, "%hu", &def_iomap_size);
+		fclose(f);
 
-		if (def_wmem < 1)
-			def_wmem = 1;
+		/* round to supported values */
+		def_iomap_size = (uint8_t) rs_value_to_scale(
+			(uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
 	}
 	init = 1;
 out:
@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
 		rs->sq_size = inherited_rs->sq_size;
 		rs->rq_size = inherited_rs->rq_size;
 		rs->ctrl_avail = inherited_rs->ctrl_avail;
+		rs->target_iomap_size = inherited_rs->target_iomap_size;
 	} else {
 		rs->sbuf_size = def_wmem;
 		rs->rbuf_size = def_mem;
@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
 		rs->sq_size = def_sqsize;
 		rs->rq_size = def_rqsize;
 		rs->ctrl_avail = RS_QP_CTRL_SIZE;
+		rs->target_iomap_size = def_iomap_size;
 	}
 	fastlock_init(&rs->slock);
 	fastlock_init(&rs->rlock);
 	fastlock_init(&rs->cq_lock);
 	fastlock_init(&rs->cq_wait_lock);
+	fastlock_init(&rs->iomap_lock);
+	dlist_init(&rs->iomap_list);
+	dlist_init(&rs->iomap_queue);
 	return rs;
 }
 
@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
+	size_t len;
+
 	rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
 	if (!rs->rmsg)
 		return -1;
@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
 	if (!rs->smr)
 		return -1;
 
-	rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
-				       sizeof(rs->target_sgl));
+	len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+	      sizeof(*rs->target_iomap) * rs->target_iomap_size;
+	rs->target_buffer_list = malloc(len);
+	if (!rs->target_buffer_list)
+		return -1;
+
+	rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
 	if (!rs->target_mr)
 		return -1;
 
+	memset(rs->target_buffer_list, 0, len);
+	rs->target_sgl = rs->target_buffer_list;
+	if (rs->target_iomap_size)
+		rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
 	rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
 	if (!rs->rbuf)
 		return -1;
@@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs)
 	return 0;
 }
 
+/*
+static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...)
+{
+	TODO: write me
+}
+*/
+
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+	if (atomic_dec(&iomr->refcnt))
+		return;
+
+	dlist_remove(&iomr->entry);
+	ibv_dereg_mr(iomr->mr);
+	if (iomr->index >= 0)
+		iomr->mr = NULL;
+	else
+		free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+	struct rs_iomap_mr *iomr;
+
+	while (!dlist_empty(&rs->iomap_list)) {
+		iomr = container_of(rs->iomap_list.next,
+				    struct rs_iomap_mr, entry);
+		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+	}
+	while (!dlist_empty(&rs->iomap_queue)) {
+		iomr = container_of(rs->iomap_queue.next,
+				    struct rs_iomap_mr, entry);
+		riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+	}
+}
+
 static void rs_free(struct rsocket *rs)
 {
 	if (rs->index >= 0)
@@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs)
 		free(rs->rbuf);
 	}
 
-	if (rs->target_mr)
-		rdma_dereg_mr(rs->target_mr);
+	if (rs->target_buffer_list) {
+		if (rs->target_mr)
+			rdma_dereg_mr(rs->target_mr);
+		free(rs->target_buffer_list);
+	}
 
 	if (rs->cm_id) {
+		rs_free_iomappings(rs);
 		if (rs->cm_id->qp)
 			rdma_destroy_qp(rs->cm_id);
 		rdma_destroy_id(rs->cm_id);
 	}
 
+	fastlock_destroy(&rs->iomap_lock);
 	fastlock_destroy(&rs->cq_wait_lock);
 	fastlock_destroy(&rs->cq_lock);
 	fastlock_destroy(&rs->rlock);
@@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
 			     struct rs_conn_data *conn)
 {
 	conn->version = 1;
-	conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+	conn->flags = RS_CONN_FLAG_IOMAP |
+		      (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
 	conn->credits = htons(rs->rq_size);
-	conn->reserved2 = 0;
+	memset(conn->reserved, 0, sizeof conn->reserved);
+	conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
 
 	conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
 	conn->target_sgl.length = htonl(RS_SGL_SIZE);
@@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
 	    (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
 		rs->opts = RS_OPT_SWAP_SGL;
 
+	if (conn->flags & RS_CONN_FLAG_IOMAP) {
+		rs->remote_iomap.addr = rs->remote_sgl.addr +
+					sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+		rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+		rs->remote_iomap.key = rs->remote_sgl.key;
+	}
+
 	rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
 	rs->target_sgl[0].length = ntohl(conn->data_buf.length);
 	rs->target_sgl[0].key = ntohl(conn->data_buf.key);
@@ -753,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 	return rs_do_connect(rs);
 }
 
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
 			 struct ibv_sge *sgl, int nsge,
 			 uint32_t imm_data, int flags,
 			 uint64_t addr, uint32_t rkey)
@@ -773,6 +890,25 @@ static int rs_post_write(struct rsocket *rs,
 	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
 }
 
+static int rs_post_write(struct rsocket *rs,
+			 struct ibv_sge *sgl, int nsge,
+			 uint64_t wr_id, int flags,
+			 uint64_t addr, uint32_t rkey)
+{
+	struct ibv_send_wr wr, *bad;
+
+	wr.wr_id = wr_id;
+	wr.next = NULL;
+	wr.sg_list = sgl;
+	wr.num_sge = nsge;
+	wr.opcode = IBV_WR_RDMA_WRITE;
+	wr.send_flags = flags;
+	wr.wr.rdma.remote_addr = addr;
+	wr.wr.rdma.rkey = rkey;
+
+	return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
 /*
  * Update target SGE before sending data.  Otherwise the remote side may
  * update the entry before we do.
@@ -799,8 +935,35 @@ static int rs_write_data(struct rsocket *rs,
 		rs->target_sge = 0;
 	}
 
-	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
-			     flags, addr, rkey);
+	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+				 flags, addr, rkey);
+}
+
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+			   struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+	uint64_t addr;
+
+	rs->sqe_avail--;
+	rs->sbuf_bytes_avail -= length;
+
+	addr = iom->sge.addr + offset - iom->offset;
+	return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+			     flags, addr, iom->sge.key);
+}
+
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+			  struct ibv_sge *sgl, int nsge, int flags)
+{
+	uint64_t addr;
+
+	rs->sseq_no++;
+	rs->sqe_avail--;
+	rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+	addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+				 flags, addr, rs->remote_iomap.key);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -831,12 +994,12 @@ static void rs_send_credits(struct rsocket *rs)
 		ibsge.lkey = 0;
 		ibsge.length = sizeof(sge);
 
-		rs_post_write(rs, &ibsge, 1,
-			      rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-			      IBV_SEND_INLINE,
-			      rs->remote_sgl.addr +
-			      rs->remote_sge * sizeof(struct rs_sge),
-			      rs->remote_sgl.key);
+		rs_post_write_msg(rs, &ibsge, 1,
+				  rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+				  IBV_SEND_INLINE,
+				  rs->remote_sgl.addr +
+				  rs->remote_sge * sizeof(struct rs_sge),
+				  rs->remote_sgl.key);
 
 		rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
 		rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -845,8 +1008,9 @@ static void rs_send_credits(struct rsocket *rs)
 		if (++rs->remote_sge == rs->remote_sgl.length)
 			rs->remote_sge = 0;
 	} else {
-		rs_post_write(rs, NULL, 0,
-			      rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+		rs_post_write_msg(rs, NULL, 0,
+				  rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+				  0, 0, 0);
 	}
 }
 
@@ -880,6 +1044,9 @@ static int rs_poll_cq(struct rsocket *rs)
 			case RS_OP_SGL:
 				rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
 				break;
+			case RS_OP_IOMAP_SGL:
+				/* The iomap was updated, that's nice to know. */
+				break;
 			case RS_OP_CTRL:
 				if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
 					rs->state = rs_disconnected;
@@ -888,6 +1055,9 @@ static int rs_poll_cq(struct rsocket *rs)
 					rs->state &= ~rs_connect_rd;
 				}
 				break;
+			case RS_OP_WRITE:
+				/* We really shouldn't be here. */
+				break;
 			default:
 				rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
 				rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
@@ -905,6 +1075,10 @@ static int rs_poll_cq(struct rsocket *rs)
 				if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
 					rs->state = rs_disconnected;
 				break;
+			case RS_OP_IOMAP_SGL:
+				rs->sqe_avail++;
+				rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+				break;
 			default:
 				rs->sqe_avail++;
 				rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
@@ -1046,7 +1220,7 @@ static int rs_poll_all(struct rsocket *rs)
  */
 static int rs_can_send(struct rsocket *rs)
 {
-	return rs->sqe_avail && rs->sbuf_bytes_avail &&
+	return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
 	       (rs->sseq_no != rs->sseq_comp) &&
 	       (rs->target_sgl[rs->target_sge].length != 0);
 }
@@ -1216,6 +1390,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
 	return rrecvv(socket, iov, iovcnt, 0);
 }
 
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+	struct rs_iomap_mr *iomr;
+	struct ibv_sge sge;
+	struct rs_iomap iom;
+	int ret;
+
+	fastlock_acquire(&rs->iomap_lock);
+	while (!dlist_empty(&rs->iomap_queue)) {
+		if (!rs_can_send(rs)) {
+			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+					  rs_conn_can_send);
+			if (ret)
+				break;
+			if (!(rs->state & rs_connect_wr)) {
+				ret = ERR(ECONNRESET);
+				break;
+			}
+		}
+
+		iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+		if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+			iom.offset = iomr->offset;
+			iom.sge.addr = (uintptr_t) iomr->mr->addr;
+			iom.sge.length = iomr->mr->length;
+			iom.sge.key = iomr->mr->rkey;
+		} else {
+			iom.offset = bswap_64(iomr->offset);
+			iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+			iom.sge.length = bswap_32(iomr->mr->length);
+			iom.sge.key = bswap_32(iomr->mr->rkey);
+		}
+
+		if (rs->sq_inline >= sizeof iom) {
+			sge.addr = (uintptr_t) &iom;
+			sge.length = sizeof iom;
+			sge.lkey = 0;
+			ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+		} else if (rs_sbuf_left(rs) >= sizeof iom) {
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+			rs->ssgl[0].length = sizeof iom;
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+			if (rs_sbuf_left(rs) > sizeof iom)
+				rs->ssgl[0].addr += sizeof iom;
+			else
+				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+		} else {
+			rs->ssgl[0].length = rs_sbuf_left(rs);
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+			       rs->ssgl[0].length);
+			rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+			memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+			       rs->ssgl[1].length);
+			ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+		}
+		dlist_remove(&iomr->entry);
+		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+		if (ret)
+			break;
+	}
+
+	rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+	fastlock_release(&rs->iomap_lock);
+	return ret;
+}
+
 /*
  * We overlap sending the data, by posting a small work request immediately,
  * then increasing the size of the send on each iteration.
@@ -1224,7 +1465,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 {
 	struct rsocket *rs;
 	struct ibv_sge sge;
-	size_t left;
+	size_t left = len;
 	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
 	int ret = 0;
 
@@ -1239,7 +1480,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 	}
 
 	fastlock_acquire(&rs->slock);
-	for (left = len; left; left -= xfer_size, buf += xfer_size) {
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
+	for (; left; left -= xfer_size, buf += xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
 					  rs_conn_can_send);
@@ -1289,6 +1535,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		if (ret)
 			break;
 	}
+out:
 	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
@@ -1345,9 +1592,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 	len = iov[0].iov_len;
 	for (i = 1; i < iovcnt; i++)
 		len += iov[i].iov_len;
+	left = len;
 
 	fastlock_acquire(&rs->slock);
-	for (left = len; left; left -= xfer_size) {
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
+	for (; left; left -= xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
 					  rs_conn_can_send);
@@ -1395,6 +1648,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 		if (ret)
 			break;
 	}
+out:
 	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
@@ -1725,8 +1979,8 @@ int rshutdown(int socket, int how)
 
 	if ((rs->state & rs_connected) && rs->ctrl_avail) {
 		rs->ctrl_avail--;
-		ret = rs_post_write(rs, NULL, 0,
-				    rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+		ret = rs_post_write_msg(rs, NULL, 0,
+					rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
 	}
 }
 
@@ -1814,6 +2068,8 @@ int rsetsockopt(int socket, int level, int optname,
 		case SO_SNDBUF:
 			if (!rs->sbuf)
 				rs->sbuf_size = (*(uint32_t *) optval) << 1;
+			if (rs->sbuf_size < RS_SNDLOWAT)
+				rs->sbuf_size = RS_SNDLOWAT << 1;
 			ret = 0;
 			break;
 		case SO_LINGER:
@@ -1878,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname,
 			if (rs->sq_inline < RS_MIN_INLINE)
 				rs->sq_inline = RS_MIN_INLINE;
 			break;
+		case RDMA_IOMAPSIZE:
+			rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+				(uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+			break;
 		default:
 			break;
 		}
@@ -1979,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname,
 			*((int *) optval) = rs->sq_inline;
 			*optlen = sizeof(int);
 			break;
+		case RDMA_IOMAPSIZE:
+			*((int *) optval) = rs->target_iomap_size;
+			*optlen = sizeof(int);
+			break;
 		default:
 			ret = ENOTSUP;
 			break;
@@ -2020,3 +2284,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
 	va_end(args);
 	return ret;
 }
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+	int i;
+
+	if (!rs->remote_iomappings) {
+		rs->remote_iomappings = calloc(rs->remote_iomap.length,
+					       sizeof(*rs->remote_iomappings));
+		if (!rs->remote_iomappings)
+			return NULL;
+
+		for (i = 0; i < rs->remote_iomap.length; i++)
+			rs->remote_iomappings[i].index = i;
+	}
+
+	for (i = 0; i < rs->remote_iomap.length; i++) {
+		if (!rs->remote_iomappings[i].mr)
+			return &rs->remote_iomappings[i];
+	}
+	return NULL;
+}
+
+/*
+ * If an offset is given, we map to it.  If offset is -1, then we map the
+ * offset to the address of buf.  We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+	struct rsocket *rs;
+	struct rs_iomap_mr *iomr;
+	int access = IBV_ACCESS_LOCAL_WRITE;
+
+	rs = idm_at(&idm, socket);
+	if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+		return ERR(EINVAL);
+
+	fastlock_acquire(&rs->iomap_lock);
+	if (prot & PROT_WRITE) {
+		iomr = rs_get_iomap_mr(rs);
+		access |= IBV_ACCESS_REMOTE_WRITE;
+	} else {
+		iomr = calloc(1, sizeof *iomr);
+		iomr->index = -1;
+	}
+	if (!iomr) {
+		offset = ERR(ENOMEM);
+		goto out;
+	}
+
+	iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+	if (!iomr->mr) {
+		if (iomr->index < 0)
+			free(iomr);
+		offset = -1;
+		goto out;
+	}
+
+	if (offset == -1)
+		offset = (uintptr_t) buf;
+	iomr->offset = offset;
+	atomic_init(&iomr->refcnt);
+	atomic_set(&iomr->refcnt, 1);
+
+	if (iomr->index >= 0) {
+		dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+		rs->iomap_pending = 1;
+	} else {
+		dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+	}
+out:
+	fastlock_release(&rs->iomap_lock);
+	return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+	struct rsocket *rs;
+	struct rs_iomap_mr *iomr;
+	dlist_entry *entry;
+	int ret = 0;
+
+	rs = idm_at(&idm, socket);
+	fastlock_acquire(&rs->iomap_lock);
+
+	for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+	     entry = entry->next) {
+		iomr = container_of(entry, struct rs_iomap_mr, entry);
+		if (iomr->mr->addr == buf && iomr->mr->length == len) {
+			rs_release_iomap_mr(iomr);
+			goto out;
+		}
+	}
+
+	for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+	     entry = entry->next) {
+		iomr = container_of(entry, struct rs_iomap_mr, entry);
+		if (iomr->mr->addr == buf && iomr->mr->length == len) {
+			rs_release_iomap_mr(iomr);
+			goto out;
+		}
+	}
+	ret = ERR(EINVAL);
+out:
+	fastlock_release(&rs->iomap_lock);
+	return ret;
+}
+
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+	int i;
+
+	for (i = 0; i < rs->target_iomap_size; i++) {
+		if (offset >= rs->target_iomap[i].offset &&
+		    offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
+			return &rs->target_iomap[i];
+	}
+	return NULL;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+	struct rsocket *rs;
+	struct rs_iomap *iom = NULL;
+	struct ibv_sge sge;
+	size_t left = count;
+	uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+	int ret = 0;
+
+	rs = idm_at(&idm, socket);
+	fastlock_acquire(&rs->slock);
+	if (rs->iomap_pending) {
+		ret = rs_send_iomaps(rs, flags);
+		if (ret)
+			goto out;
+	}
+	for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+		if (!iom || offset > iom->offset + iom->sge.length) {
+			iom = rs_find_iomap(rs, offset);
+			if (!iom)
+				break;
+		}
+
+		if (!rs_can_send(rs)) {
+			ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+					  rs_conn_can_send);
+			if (ret)
+				break;
+			if (!(rs->state & rs_connect_wr)) {
+				ret = ERR(ECONNRESET);
+				break;
+			}
+		}
+
+		if (olen < left) {
+			xfer_size = olen;
+			if (olen < RS_MAX_TRANSFER)
+				olen <<= 1;
+		} else {
+			xfer_size = left;
+		}
+
+		if (xfer_size > rs->sbuf_bytes_avail)
+			xfer_size = rs->sbuf_bytes_avail;
+		if (xfer_size > iom->offset + iom->sge.length - offset)
+			xfer_size = iom->offset + iom->sge.length - offset;
+
+		if (xfer_size <= rs->sq_inline) {
+			sge.addr = (uintptr_t) buf;
+			sge.length = xfer_size;
+			sge.lkey = 0;
+			ret = rs_write_direct(rs, iom, offset, &sge, 1,
+					      xfer_size, IBV_SEND_INLINE);
+		} else if (xfer_size <= rs_sbuf_left(rs)) {
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+			rs->ssgl[0].length = xfer_size;
+			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+			if (xfer_size < rs_sbuf_left(rs))
+				rs->ssgl[0].addr += xfer_size;
+			else
+				rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+		} else {
+			rs->ssgl[0].length = rs_sbuf_left(rs);
+			memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+			       rs->ssgl[0].length);
+			rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+			memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+			ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+			rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+		}
+		if (ret)
+			break;
+	}
+out:
+	fastlock_release(&rs->slock);
+
+	return (ret && left == count) ? ret : count - left;
+}
-- 
2.41.0
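A final usage note (a sketch only, not part of the patch): the iomap
table is exchanged in the connection data, so RDMA_IOMAPSIZE must be
set before the connection is established.  Since rs_alloc() copies
target_iomap_size from the listening rsocket, setting the option there
covers accepted connections as well, and rsetsockopt() rounds the value
to a supported scale:

    #include <rdma/rsocket.h>

    /* Size the remote iomap table on a listening rsocket so that
     * accepted connections can receive up to nmaps remote mappings. */
    static int enable_iomaps(int listen_fd, int nmaps)
    {
        return rsetsockopt(listen_fd, SOL_RDMA, RDMA_IOMAPSIZE,
                           &nmaps, sizeof nmaps);
    }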