From: Sean Hefty Date: Sun, 21 Oct 2012 21:16:03 +0000 (-0700) Subject: rsocket: Add direct data placement X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=e5a8d2c61f31c60bf37aa3ad41219bdf62e08e57;p=~shefty%2Flibrdmacm.git rsocket: Add direct data placement Signed-off-by: Sean Hefty --- diff --git a/docs/rsocket b/docs/rsocket index 5399f6cb..1484f65b 100644 --- a/docs/rsocket +++ b/docs/rsocket @@ -110,11 +110,11 @@ Bits Message Meaning of 31:29 Type Bits 28:0 000 Data Transfer bytes transfered 001 reserved -010 reserved +010 reserved - used internally, available for future use 011 reserved 100 Credit Update received credits granted 101 reserved -110 reserved +110 Iomap Updated index of updated entry 111 Control control message type Data Transfer @@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in use. This is done by tracking when a receive buffer referenced by a remote target SGL has been filled. +Iomap Updated +Used to indicate that a remote iomap entry was updated. The updated entry +contains the offset value associated with an address, length, and rkey. Once +an iomap has been updated, the local application can issue directed IO +transfers against the corresponding remote buffer. + Control Message - DISCONNECT Indicates that the rsocket connection has been fully disconnected and will no longer send or receive data. Data received before the disconnect message was @@ -142,3 +148,44 @@ Control Message - SHUTDOWN Indicates that the remote rsocket has shutdown the send side of its connection. The recipient of a shutdown message will no longer accept incoming data, but may still transfer outbound data. + + +Iomapped Buffers +---------------- +Rsockets allows for zero-copy transfers using what it refers to as iomapped +buffers. Iomapping and direct data placement (zero-copy) transfers are done +using rsocket specific extensions. The general operation is similar to +that used for normal data transfers described above. + + host A host B + remote iomap + target iomap <----------- [ ] + [ ] ------ + [ ] -- ------ iomapped buffer(s) + -- -----> +--+ + -- | | + -- | | + -- | | + -- +--+ + -- + ---> +--+ + | | + | | + +--+ + +The remote iomap contains the address, size, and rkey of the target iomap. As +the applicaton maps buffers host B to a given rsocket, rsockets will issue an RDMA +write against one of the entries in the target iomap on host A. The +updated entry will reference an available iomapped buffer. Immediate data +included with the RDMA write will indicate to host A that a target iomap +has been updated. + +When host A wishes to transfer directly into an iomapped buffer, it will check +its target iomap for an offset corresponding to a remotely mapped buffer. A +matching iomap entry will contain the address, size, and rkey of the target +buffer on host B. Host A will then issue an RDMA operation against the +registered remote data buffer. + +From host A's perspective, the transfer appears as a normal send/write +operation, with the data stream redirected directly into the receiving +application's buffer. diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h index 65feda96..f220c134 100644 --- a/include/rdma/rsocket.h +++ b/include/rdma/rsocket.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011 Intel Corporation. All rights reserved. + * Copyright (c) 2011-2012 Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. 
You may choose to be licensed under the terms of the GNU @@ -39,6 +39,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen); enum { RDMA_SQSIZE, RDMA_RQSIZE, - RDMA_INLINE + RDMA_INLINE, + RDMA_IOMAPSIZE }; int rsetsockopt(int socket, int level, int optname, @@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname, void *optval, socklen_t *optlen); int rfcntl(int socket, int cmd, ... /* arg */ ); +off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset); +int riounmap(int socket, void *buf, size_t len); +size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags); + #ifdef __cplusplus } #endif diff --git a/man/rsocket.7 b/man/rsocket.7 index 2ed5ca43..5ecebc2e 100644 --- a/man/rsocket.7 +++ b/man/rsocket.7 @@ -6,7 +6,7 @@ rsocket \- RDMA socket API .SH "DESCRIPTION" RDMA socket API and protocol .SH "NOTES" -rsockets is a protocol over RDMA that supports a socket-level API +Rsockets is a protocol over RDMA that supports a socket-level API for applications. rsocket APIs are intended to match the behavior of corresponding socket calls, except where noted. rsocket functions match the name and function signature of socket calls, @@ -30,7 +30,7 @@ rgetpeername, rgetsockname .P rsetsockopt, rgetsockopt, rfcntl .P -Functions take the same parameters as that use for sockets. The +Functions take the same parameters as that used for sockets. The follow capabilities and flags are supported at this time: .P PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG @@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF .P O_NONBLOCK .P +Rsockets provides extensions beyond normal socket routines that +allow for direct placement of data into an application's buffer. +This is also known as zero-copy support, since data is sent and +received directly, bypassing copies into network controlled buffers. +The following calls and options support direct data placement. +.P +riomap, riounmap, riowrite +.TP +off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset) +.TP +Riomap registers an application buffer with the RDMA hardware +associated with an rsocket. The buffer is registered either for +local only access (PROT_NONE) or for remote write access (PROT_WRITE). +When registered for remote access, the buffer is mapped to a given +offset. The offset is either provided by the user, or if the user +selects -1 for the offset, rsockets selects one. The remote peer may +access an iomapped buffer directly by specifying the correct offset. +The mapping is not guaranteed to be available until after the remote +peer receives a data transfer initiated after riomap has completed. +.P +riounmap +.TP +int riounmap(int socket, void *buf, size_t len) +.TP +Riounmap removes the mapping between a buffer and an rsocket. +.P +riowrite +.TP +size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags) +.TP +Riowrite allows an application to transfer data over an rsocket +directly into a remotely iomapped buffer. The remote buffer is specified +through an offset parameter, which corresponds to a remote iomapped buffer. +From the sender's perspective, riowrite behaves similar to rwrite. From +a receiver's view, riowrite transfers are silently redirected into a pre- +determined data buffer. Data is received automatically, and the receiver +is not informed of the transfer. 
However, iowrite data is still considered +part of the data stream, such that iowrite data will be written before a +subsequent transfer is received. A message sent immediately after initiating +an iowrite may be used to notify the receiver of the iowrite. +.P In addition to standard socket options, rsockets supports options specific to RDMA devices and protocols. These options are accessible through rsetsockopt using SOL_RDMA option level. @@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue. RDMA_RQSIZE - Integer size of the underlying receive queue. .TP RDMA_INLINE - Integer size of inline data. +.TP +RDMA_IOMAPSIZE - Integer number of remote IO mappings supported .P Note that rsockets fd's cannot be passed into non-rsocket calls. For applications which must mix rsocket fd's with standard socket fd's or @@ -84,6 +127,8 @@ rqsize_default - default size of receive queue .P inline_default - default size of inline data .P +iomap_size - default size of remote iomapping table +.P If configuration files are not available, rsockets uses internal defaults. .SH "SEE ALSO" rdma_cm(7) diff --git a/src/indexer.h b/src/indexer.h index 26e7f986..0c5f3882 100644 --- a/src/indexer.h +++ b/src/indexer.h @@ -31,6 +31,9 @@ * */ +#if !defined(INDEXER_H) +#define INDEXER_H + #if HAVE_CONFIG_H # include #endif /* HAVE_CONFIG_H */ @@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index) return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ? idm_at(idm, index) : NULL; } + +typedef struct _dlist_entry { + struct _dlist_entry *next; + struct _dlist_entry *prev; +} dlist_entry; + +static inline void dlist_init(dlist_entry *head) +{ + head->next = head; + head->prev = head; +} + +static inline int dlist_empty(dlist_entry *head) +{ + return head->next == head; +} + +static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head) +{ + item->next = head->next; + item->prev = head; + head->next->prev = item; + head->next = item; +} + +static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head) +{ + dlist_insert_after(item, head->prev); +} + +#define dlist_insert_head dlist_insert_after +#define dlist_insert_tail dlist_insert_before + +static inline void dlist_remove(dlist_entry *item) +{ + item->prev->next = item->next; + item->next->prev = item->prev; +} + +#endif /* INDEXER_H */ diff --git a/src/librdmacm.map b/src/librdmacm.map index 5c317a3e..d5ef7363 100644 --- a/src/librdmacm.map +++ b/src/librdmacm.map @@ -63,5 +63,8 @@ RDMACM_1.0 { rselect; rdma_get_src_port; rdma_get_dst_port; + riomap; + riounmap; + riowrite; local: *; }; diff --git a/src/rsocket.c b/src/rsocket.c index cc5effe1..29b8c018 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -55,6 +55,7 @@ #define RS_OLAP_START_SIZE 2048 #define RS_MAX_TRANSFER 65536 +#define RS_SNDLOWAT 64 #define RS_QP_MAX_SIZE 0xFFFE #define RS_QP_CTRL_SIZE 4 #define RS_CONN_RETRIES 6 @@ -62,6 +63,7 @@ static struct index_map idm; static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +static uint16_t def_iomap_size = 0; static uint16_t def_inline = 64; static uint16_t def_sqsize = 384; static uint16_t def_rqsize = 384; @@ -76,19 +78,22 @@ static uint32_t polling_time = 10; * bit 29: more data, 0 - end of transfer, 1 - more data available * * for data transfers: - * bits [28:0]: bytes transfered, 0 = 1 GB + * bits [28:0]: bytes transfered * for control messages: + * SGL, CTRL * bits [28-0]: receive credits granted + * IOMAP_SGL + * bits [28-16]: reserved, bits [15-0]: index */ 
enum { RS_OP_DATA, RS_OP_RSVD_DATA_MORE, - RS_OP_RSVD_DRA, + RS_OP_WRITE, /* opcode is not transmitted over the network */ RS_OP_RSVD_DRA_MORE, RS_OP_SGL, RS_OP_RSVD, - RS_OP_RSVD_DRA_SGL, + RS_OP_IOMAP_SGL, RS_OP_CTRL }; #define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data)) @@ -111,15 +116,30 @@ struct rs_sge { uint32_t length; }; -#define RS_MIN_INLINE (sizeof(struct rs_sge)) -#define rs_host_is_net() (1 == htonl(1)) -#define RS_CONN_FLAG_NET 1 +struct rs_iomap { + uint64_t offset; + struct rs_sge sge; +}; + +struct rs_iomap_mr { + uint64_t offset; + struct ibv_mr *mr; + dlist_entry entry; + atomic_t refcnt; + int index; /* -1 if mapping is local and not in iomap_list */ +}; + +#define RS_MIN_INLINE (sizeof(struct rs_sge)) +#define rs_host_is_net() (1 == htonl(1)) +#define RS_CONN_FLAG_NET (1 << 0) +#define RS_CONN_FLAG_IOMAP (1 << 1) struct rs_conn_data { uint8_t version; uint8_t flags; uint16_t credits; - uint32_t reserved2; + uint8_t reserved[3]; + uint8_t target_iomap_size; struct rs_sge target_sgl; struct rs_sge data_buf; }; @@ -155,6 +175,7 @@ struct rsocket { fastlock_t rlock; fastlock_t cq_lock; fastlock_t cq_wait_lock; + fastlock_t iomap_lock; int opts; long fd_flags; @@ -186,10 +207,19 @@ struct rsocket { int remote_sge; struct rs_sge remote_sgl; + struct rs_sge remote_iomap; + + struct rs_iomap_mr *remote_iomappings; + dlist_entry iomap_list; + dlist_entry iomap_queue; + int iomap_pending; struct ibv_mr *target_mr; int target_sge; - volatile struct rs_sge target_sgl[RS_SGL_SIZE]; + int target_iomap_size; + void *target_buffer_list; + volatile struct rs_sge *target_sgl; + struct rs_iomap *target_iomap; uint32_t rbuf_size; struct ibv_mr *rmr; @@ -201,6 +231,18 @@ struct rsocket { uint8_t *sbuf; }; +static int rs_value_to_scale(int value, int bits) +{ + return value <= (1 << (bits - 1)) ? + value : (1 << (bits - 1)) | (value >> bits); +} + +static int rs_scale_to_value(int value, int bits) +{ + return value <= (1 << (bits - 1)) ? 
+ value : (value & ~(1 << (bits - 1))) << bits; +} + void rs_configure(void) { FILE *f; @@ -247,9 +289,17 @@ void rs_configure(void) if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) { fscanf(f, "%u", &def_wmem); fclose(f); + if (def_wmem < RS_SNDLOWAT) + def_wmem = RS_SNDLOWAT << 1; + } + + if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) { + fscanf(f, "%hu", &def_iomap_size); + fclose(f); - if (def_wmem < 1) - def_wmem = 1; + /* round to supported values */ + def_iomap_size = (uint8_t) rs_value_to_scale( + (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8); } init = 1; out: @@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) rs->sq_size = inherited_rs->sq_size; rs->rq_size = inherited_rs->rq_size; rs->ctrl_avail = inherited_rs->ctrl_avail; + rs->target_iomap_size = inherited_rs->target_iomap_size; } else { rs->sbuf_size = def_wmem; rs->rbuf_size = def_mem; @@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) rs->sq_size = def_sqsize; rs->rq_size = def_rqsize; rs->ctrl_avail = RS_QP_CTRL_SIZE; + rs->target_iomap_size = def_iomap_size; } fastlock_init(&rs->slock); fastlock_init(&rs->rlock); fastlock_init(&rs->cq_lock); fastlock_init(&rs->cq_wait_lock); + fastlock_init(&rs->iomap_lock); + dlist_init(&rs->iomap_list); + dlist_init(&rs->iomap_queue); return rs; } @@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs) static int rs_init_bufs(struct rsocket *rs) { + size_t len; + rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg)); if (!rs->rmsg) return -1; @@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs) if (!rs->smr) return -1; - rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl, - sizeof(rs->target_sgl)); + len = sizeof(*rs->target_sgl) * RS_SGL_SIZE + + sizeof(*rs->target_iomap) * rs->target_iomap_size; + rs->target_buffer_list = malloc(len); + if (!rs->target_buffer_list) + return -1; + + rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len); if (!rs->target_mr) return -1; + memset(rs->target_buffer_list, 0, len); + rs->target_sgl = rs->target_buffer_list; + if (rs->target_iomap_size) + rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE); + rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf)); if (!rs->rbuf) return -1; @@ -452,6 +519,42 @@ static int rs_create_ep(struct rsocket *rs) return 0; } +/* +static xxx rs_acquire_iomap_mr(struct rsocket *rs, ...) 
+{ + TODO: write me +} +*/ + +static void rs_release_iomap_mr(struct rs_iomap_mr *iomr) +{ + if (atomic_dec(&iomr->refcnt)) + return; + + dlist_remove(&iomr->entry); + ibv_dereg_mr(iomr->mr); + if (iomr->index >= 0) + iomr->mr = NULL; + else + free(iomr); +} + +static void rs_free_iomappings(struct rsocket *rs) +{ + struct rs_iomap_mr *iomr; + + while (!dlist_empty(&rs->iomap_list)) { + iomr = container_of(rs->iomap_list.next, + struct rs_iomap_mr, entry); + riounmap(rs->index, iomr->mr->addr, iomr->mr->length); + } + while (!dlist_empty(&rs->iomap_queue)) { + iomr = container_of(rs->iomap_queue.next, + struct rs_iomap_mr, entry); + riounmap(rs->index, iomr->mr->addr, iomr->mr->length); + } +} + static void rs_free(struct rsocket *rs) { if (rs->index >= 0) @@ -472,15 +575,20 @@ static void rs_free(struct rsocket *rs) free(rs->rbuf); } - if (rs->target_mr) - rdma_dereg_mr(rs->target_mr); + if (rs->target_buffer_list) { + if (rs->target_mr) + rdma_dereg_mr(rs->target_mr); + free(rs->target_buffer_list); + } if (rs->cm_id) { + rs_free_iomappings(rs); if (rs->cm_id->qp) rdma_destroy_qp(rs->cm_id); rdma_destroy_id(rs->cm_id); } + fastlock_destroy(&rs->iomap_lock); fastlock_destroy(&rs->cq_wait_lock); fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); @@ -492,9 +600,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param, struct rs_conn_data *conn) { conn->version = 1; - conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0; + conn->flags = RS_CONN_FLAG_IOMAP | + (rs_host_is_net() ? RS_CONN_FLAG_NET : 0); conn->credits = htons(rs->rq_size); - conn->reserved2 = 0; + memset(conn->reserved, 0, sizeof conn->reserved); + conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8); conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl); conn->target_sgl.length = htonl(RS_SGL_SIZE); @@ -518,6 +628,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) (!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET))) rs->opts = RS_OPT_SWAP_SGL; + if (conn->flags & RS_CONN_FLAG_IOMAP) { + rs->remote_iomap.addr = rs->remote_sgl.addr + + sizeof(rs->remote_sgl) * rs->remote_sgl.length; + rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8); + rs->remote_iomap.key = rs->remote_sgl.key; + } + rs->target_sgl[0].addr = ntohll(conn->data_buf.addr); rs->target_sgl[0].length = ntohl(conn->data_buf.length); rs->target_sgl[0].key = ntohl(conn->data_buf.key); @@ -753,7 +870,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen) return rs_do_connect(rs); } -static int rs_post_write(struct rsocket *rs, +static int rs_post_write_msg(struct rsocket *rs, struct ibv_sge *sgl, int nsge, uint32_t imm_data, int flags, uint64_t addr, uint32_t rkey) @@ -773,6 +890,25 @@ static int rs_post_write(struct rsocket *rs, return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); } +static int rs_post_write(struct rsocket *rs, + struct ibv_sge *sgl, int nsge, + uint64_t wr_id, int flags, + uint64_t addr, uint32_t rkey) +{ + struct ibv_send_wr wr, *bad; + + wr.wr_id = wr_id; + wr.next = NULL; + wr.sg_list = sgl; + wr.num_sge = nsge; + wr.opcode = IBV_WR_RDMA_WRITE; + wr.send_flags = flags; + wr.wr.rdma.remote_addr = addr; + wr.wr.rdma.rkey = rkey; + + return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); +} + /* * Update target SGE before sending data. Otherwise the remote side may * update the entry before we do. 
@@ -799,8 +935,35 @@ static int rs_write_data(struct rsocket *rs, rs->target_sge = 0; } - return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), - flags, addr, rkey); + return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), + flags, addr, rkey); +} + +static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, + struct ibv_sge *sgl, int nsge, uint32_t length, int flags) +{ + uint64_t addr; + + rs->sqe_avail--; + rs->sbuf_bytes_avail -= length; + + addr = iom->sge.addr + offset - iom->offset; + return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length), + flags, addr, iom->sge.key); +} + +static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr, + struct ibv_sge *sgl, int nsge, int flags) +{ + uint64_t addr; + + rs->sseq_no++; + rs->sqe_avail--; + rs->sbuf_bytes_avail -= sizeof(struct rs_iomap); + + addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); + return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), + flags, addr, rs->remote_iomap.key); } static uint32_t rs_sbuf_left(struct rsocket *rs) @@ -831,12 +994,12 @@ static void rs_send_credits(struct rsocket *rs) ibsge.lkey = 0; ibsge.length = sizeof(sge); - rs_post_write(rs, &ibsge, 1, - rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), - IBV_SEND_INLINE, - rs->remote_sgl.addr + - rs->remote_sge * sizeof(struct rs_sge), - rs->remote_sgl.key); + rs_post_write_msg(rs, &ibsge, 1, + rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), + IBV_SEND_INLINE, + rs->remote_sgl.addr + + rs->remote_sge * sizeof(struct rs_sge), + rs->remote_sgl.key); rs->rbuf_bytes_avail -= rs->rbuf_size >> 1; rs->rbuf_free_offset += rs->rbuf_size >> 1; @@ -845,8 +1008,9 @@ static void rs_send_credits(struct rsocket *rs) if (++rs->remote_sge == rs->remote_sgl.length) rs->remote_sge = 0; } else { - rs_post_write(rs, NULL, 0, - rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0); + rs_post_write_msg(rs, NULL, 0, + rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), + 0, 0, 0); } } @@ -880,6 +1044,9 @@ static int rs_poll_cq(struct rsocket *rs) case RS_OP_SGL: rs->sseq_comp = (uint16_t) rs_msg_data(imm_data); break; + case RS_OP_IOMAP_SGL: + /* The iomap was updated, that's nice to know. */ + break; case RS_OP_CTRL: if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) { rs->state = rs_disconnected; @@ -888,6 +1055,9 @@ static int rs_poll_cq(struct rsocket *rs) rs->state &= ~rs_connect_rd; } break; + case RS_OP_WRITE: + /* We really shouldn't be here. 
*/ + break; default: rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data); rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data); @@ -905,6 +1075,10 @@ static int rs_poll_cq(struct rsocket *rs) if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT) rs->state = rs_disconnected; break; + case RS_OP_IOMAP_SGL: + rs->sqe_avail++; + rs->sbuf_bytes_avail += sizeof(struct rs_iomap); + break; default: rs->sqe_avail++; rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id); @@ -1046,7 +1220,7 @@ static int rs_poll_all(struct rsocket *rs) */ static int rs_can_send(struct rsocket *rs) { - return rs->sqe_avail && rs->sbuf_bytes_avail && + return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && (rs->sseq_no != rs->sseq_comp) && (rs->target_sgl[rs->target_sge].length != 0); } @@ -1216,6 +1390,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt) return rrecvv(socket, iov, iovcnt, 0); } +static int rs_send_iomaps(struct rsocket *rs, int flags) +{ + struct rs_iomap_mr *iomr; + struct ibv_sge sge; + struct rs_iomap iom; + int ret; + + fastlock_acquire(&rs->iomap_lock); + while (!dlist_empty(&rs->iomap_queue)) { + if (!rs_can_send(rs)) { + ret = rs_get_comp(rs, rs_nonblocking(rs, flags), + rs_conn_can_send); + if (ret) + break; + if (!(rs->state & rs_connect_wr)) { + ret = ERR(ECONNRESET); + break; + } + } + + iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry); + if (!(rs->opts & RS_OPT_SWAP_SGL)) { + iom.offset = iomr->offset; + iom.sge.addr = (uintptr_t) iomr->mr->addr; + iom.sge.length = iomr->mr->length; + iom.sge.key = iomr->mr->rkey; + } else { + iom.offset = bswap_64(iomr->offset); + iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr); + iom.sge.length = bswap_32(iomr->mr->length); + iom.sge.key = bswap_32(iomr->mr->rkey); + } + + if (rs->sq_inline >= sizeof iom) { + sge.addr = (uintptr_t) &iom; + sge.length = sizeof iom; + sge.lkey = 0; + ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE); + } else if (rs_sbuf_left(rs) >= sizeof iom) { + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom); + rs->ssgl[0].length = sizeof iom; + ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0); + if (rs_sbuf_left(rs) > sizeof iom) + rs->ssgl[0].addr += sizeof iom; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, + rs->ssgl[0].length); + rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length; + memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length, + rs->ssgl[1].length); + ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + dlist_remove(&iomr->entry); + dlist_insert_tail(&iomr->entry, &rs->iomap_list); + if (ret) + break; + } + + rs->iomap_pending = !dlist_empty(&rs->iomap_queue); + fastlock_release(&rs->iomap_lock); + return ret; +} + /* * We overlap sending the data, by posting a small work request immediately, * then increasing the size of the send on each iteration. 
@@ -1224,7 +1465,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) { struct rsocket *rs; struct ibv_sge sge; - size_t left; + size_t left = len; uint32_t xfer_size, olen = RS_OLAP_START_SIZE; int ret = 0; @@ -1239,7 +1480,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) } fastlock_acquire(&rs->slock); - for (left = len; left; left -= xfer_size, buf += xfer_size) { + if (rs->iomap_pending) { + ret = rs_send_iomaps(rs, flags); + if (ret) + goto out; + } + for (; left; left -= xfer_size, buf += xfer_size) { if (!rs_can_send(rs)) { ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_can_send); @@ -1289,6 +1535,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) if (ret) break; } +out: fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; @@ -1345,9 +1592,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags len = iov[0].iov_len; for (i = 1; i < iovcnt; i++) len += iov[i].iov_len; + left = len; fastlock_acquire(&rs->slock); - for (left = len; left; left -= xfer_size) { + if (rs->iomap_pending) { + ret = rs_send_iomaps(rs, flags); + if (ret) + goto out; + } + for (; left; left -= xfer_size) { if (!rs_can_send(rs)) { ret = rs_get_comp(rs, rs_nonblocking(rs, flags), rs_conn_can_send); @@ -1395,6 +1648,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags if (ret) break; } +out: fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; @@ -1725,8 +1979,8 @@ int rshutdown(int socket, int how) if ((rs->state & rs_connected) && rs->ctrl_avail) { rs->ctrl_avail--; - ret = rs_post_write(rs, NULL, 0, - rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); + ret = rs_post_write_msg(rs, NULL, 0, + rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); } } @@ -1814,6 +2068,8 @@ int rsetsockopt(int socket, int level, int optname, case SO_SNDBUF: if (!rs->sbuf) rs->sbuf_size = (*(uint32_t *) optval) << 1; + if (rs->sbuf_size < RS_SNDLOWAT) + rs->sbuf_size = RS_SNDLOWAT << 1; ret = 0; break; case SO_LINGER: @@ -1878,6 +2134,10 @@ int rsetsockopt(int socket, int level, int optname, if (rs->sq_inline < RS_MIN_INLINE) rs->sq_inline = RS_MIN_INLINE; break; + case RDMA_IOMAPSIZE: + rs->target_iomap_size = (uint16_t) rs_scale_to_value( + (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8); + break; default: break; } @@ -1979,6 +2239,10 @@ int rgetsockopt(int socket, int level, int optname, *((int *) optval) = rs->sq_inline; *optlen = sizeof(int); break; + case RDMA_IOMAPSIZE: + *((int *) optval) = rs->target_iomap_size; + *optlen = sizeof(int); + break; default: ret = ENOTSUP; break; @@ -2020,3 +2284,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ ) va_end(args); return ret; } + +static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs) +{ + int i; + + if (!rs->remote_iomappings) { + rs->remote_iomappings = calloc(rs->remote_iomap.length, + sizeof(*rs->remote_iomappings)); + if (!rs->remote_iomappings) + return NULL; + + for (i = 0; i < rs->remote_iomap.length; i++) + rs->remote_iomappings[i].index = i; + } + + for (i = 0; i < rs->remote_iomap.length; i++) { + if (!rs->remote_iomappings[i].mr) + return &rs->remote_iomappings[i]; + } + return NULL; +} + +/* + * If an offset is given, we map to it. If offset is -1, then we map the + * offset to the address of buf. We do not check for conflicts, which must + * be fixed at some point. 
+ */ +off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset) +{ + struct rsocket *rs; + struct rs_iomap_mr *iomr; + int access = IBV_ACCESS_LOCAL_WRITE; + + rs = idm_at(&idm, socket); + if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) + return ERR(EINVAL); + + fastlock_acquire(&rs->iomap_lock); + if (prot & PROT_WRITE) { + iomr = rs_get_iomap_mr(rs); + access |= IBV_ACCESS_REMOTE_WRITE; + } else { + iomr = calloc(1, sizeof *iomr); + iomr->index = -1; + } + if (!iomr) { + offset = ERR(ENOMEM); + goto out; + } + + iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access); + if (!iomr->mr) { + if (iomr->index < 0) + free(iomr); + offset = -1; + goto out; + } + + if (offset == -1) + offset = (uintptr_t) buf; + iomr->offset = offset; + atomic_init(&iomr->refcnt); + atomic_set(&iomr->refcnt, 1); + + if (iomr->index >= 0) { + dlist_insert_tail(&iomr->entry, &rs->iomap_queue); + rs->iomap_pending = 1; + } else { + dlist_insert_tail(&iomr->entry, &rs->iomap_list); + } +out: + fastlock_release(&rs->iomap_lock); + return offset; +} + +int riounmap(int socket, void *buf, size_t len) +{ + struct rsocket *rs; + struct rs_iomap_mr *iomr; + dlist_entry *entry; + int ret = 0; + + rs = idm_at(&idm, socket); + fastlock_acquire(&rs->iomap_lock); + + for (entry = rs->iomap_list.next; entry != &rs->iomap_list; + entry = entry->next) { + iomr = container_of(entry, struct rs_iomap_mr, entry); + if (iomr->mr->addr == buf && iomr->mr->length == len) { + rs_release_iomap_mr(iomr); + goto out; + } + } + + for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue; + entry = entry->next) { + iomr = container_of(entry, struct rs_iomap_mr, entry); + if (iomr->mr->addr == buf && iomr->mr->length == len) { + rs_release_iomap_mr(iomr); + goto out; + } + } + ret = ERR(EINVAL); +out: + fastlock_release(&rs->iomap_lock); + return ret; +} + +static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset) +{ + int i; + + for (i = 0; i < rs->target_iomap_size; i++) { + if (offset >= rs->target_iomap[i].offset && + offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length) + return &rs->target_iomap[i]; + } + return NULL; +} + +size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags) +{ + struct rsocket *rs; + struct rs_iomap *iom = NULL; + struct ibv_sge sge; + size_t left = count; + uint32_t xfer_size, olen = RS_OLAP_START_SIZE; + int ret = 0; + + rs = idm_at(&idm, socket); + fastlock_acquire(&rs->slock); + if (rs->iomap_pending) { + ret = rs_send_iomaps(rs, flags); + if (ret) + goto out; + } + for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) { + if (!iom || offset > iom->offset + iom->sge.length) { + iom = rs_find_iomap(rs, offset); + if (!iom) + break; + } + + if (!rs_can_send(rs)) { + ret = rs_get_comp(rs, rs_nonblocking(rs, flags), + rs_conn_can_send); + if (ret) + break; + if (!(rs->state & rs_connect_wr)) { + ret = ERR(ECONNRESET); + break; + } + } + + if (olen < left) { + xfer_size = olen; + if (olen < RS_MAX_TRANSFER) + olen <<= 1; + } else { + xfer_size = left; + } + + if (xfer_size > rs->sbuf_bytes_avail) + xfer_size = rs->sbuf_bytes_avail; + if (xfer_size > iom->offset + iom->sge.length - offset) + xfer_size = iom->offset + iom->sge.length - offset; + + if (xfer_size <= rs->sq_inline) { + sge.addr = (uintptr_t) buf; + sge.length = xfer_size; + sge.lkey = 0; + ret = rs_write_direct(rs, iom, offset, &sge, 1, + xfer_size, IBV_SEND_INLINE); + } else if (xfer_size <= rs_sbuf_left(rs)) { + memcpy((void *) (uintptr_t) 
rs->ssgl[0].addr, buf, xfer_size); + rs->ssgl[0].length = xfer_size; + ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0); + if (xfer_size < rs_sbuf_left(rs)) + rs->ssgl[0].addr += xfer_size; + else + rs->ssgl[0].addr = (uintptr_t) rs->sbuf; + } else { + rs->ssgl[0].length = rs_sbuf_left(rs); + memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, + rs->ssgl[0].length); + rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; + memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); + ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0); + rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; + } + if (ret) + break; + } +out: + fastlock_release(&rs->slock); + + return (ret && left == count) ? ret : count - left; +}
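
Immediate Data Encoding
-----------------------
Data and control messages are sent as RDMA writes with immediate data; the
32-bit immediate value encodes the message type in bits 31:29, per the table
in docs/rsocket above, and type-specific data in the low 29 bits (bytes
transferred, credits granted, or, for Iomap Updated messages, the index of
the updated entry).  The following is a small, self-contained sketch of that
encoding; the opcode values mirror the enum added to rsocket.c, msg_set()
mirrors rs_msg_set(), and msg_op()/msg_data() are local stand-ins for the
library's decoding macros.

#include <stdint.h>
#include <stdio.h>

/* Message types (bits 31:29 of the immediate data). */
enum { OP_DATA = 0, OP_SGL = 4, OP_IOMAP_SGL = 6, OP_CTRL = 7 };

static uint32_t msg_set(uint32_t op, uint32_t data) { return (op << 29) | data; }
static uint32_t msg_op(uint32_t imm)   { return imm >> 29; }
static uint32_t msg_data(uint32_t imm) { return imm & 0x1FFFFFFF; }

int main(void)
{
	/* "Iomap entry 3 was updated" as seen by the receiving side. */
	uint32_t imm = msg_set(OP_IOMAP_SGL, 3);

	printf("imm=0x%08x type=%u index=%u\n", imm, msg_op(imm), msg_data(imm));
	/* prints: imm=0xc0000003 type=6 index=3 */
	return 0;
}

Note that RS_OP_WRITE (type 010) is used only for local completion accounting;
riowrite transfers are posted as plain RDMA writes without immediate data,
which is why the receiver is not informed of them.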
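
Offset Translation for Direct Transfers
---------------------------------------
An iomap entry pairs an application-visible offset with the address, length,
and rkey of the remotely registered buffer, and rs_write_direct() above turns
a riowrite offset into a remote address by adding the difference between the
requested offset and the entry's base offset.  The sketch below illustrates
that arithmetic with simplified stand-ins for the rs_sge/rs_iomap structures
(field layout abbreviated, host byte order) and made-up address and rkey
values.

#include <stdint.h>
#include <stdio.h>

struct sge   { uint64_t addr; uint32_t key; uint32_t length; };
struct iomap { uint64_t offset; struct sge sge; };

/* Same translation as rs_write_direct(): base address plus offset delta. */
static uint64_t translate(const struct iomap *iom, uint64_t offset)
{
	return iom->sge.addr + (offset - iom->offset);
}

int main(void)
{
	/* Entry announcing an 8 KB remote buffer mapped at offset 0x1000. */
	struct iomap iom = { 0x1000, { 0x7f0000400000ULL, 0x1234, 8192 } };

	/* riowrite(fd, buf, len, 0x1800, 0) lands 0x800 bytes into that buffer. */
	printf("remote addr 0x%llx, rkey 0x%x\n",
	       (unsigned long long) translate(&iom, 0x1800), iom.sge.key);
	return 0;
}

rs_find_iomap() performs the matching step: it scans the local target iomap
for an entry whose [offset, offset + length) range contains the requested
offset before the translation above is applied.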
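
Encoding the Iomap Table Size
-----------------------------
The size of the target iomap table is exchanged in a single byte of the
connection data (target_iomap_size) using the rs_value_to_scale() and
rs_scale_to_value() helpers added above: values up to 128 are carried
exactly, larger values are rounded down to a coarser representable size.
The stand-alone sketch below copies the two helpers to show the rounding
that RDMA_IOMAPSIZE values undergo.

#include <stdio.h>

/* Copied from the rsocket.c hunk above. */
static int rs_value_to_scale(int value, int bits)
{
	return value <= (1 << (bits - 1)) ?
	       value : (1 << (bits - 1)) | (value >> bits);
}

static int rs_scale_to_value(int value, int bits)
{
	return value <= (1 << (bits - 1)) ?
	       value : (value & ~(1 << (bits - 1))) << bits;
}

int main(void)
{
	int sizes[] = { 16, 128, 200, 500, 4096 };
	int i;

	for (i = 0; i < 5; i++) {
		int scale = rs_value_to_scale(sizes[i], 8);
		printf("%4d -> scale 0x%02x -> %d\n",
		       sizes[i], scale, rs_scale_to_value(scale, 8));
	}
	/*
	 *   16 -> scale 0x10 -> 16
	 *  128 -> scale 0x80 -> 128
	 *  200 -> scale 0x80 -> 128
	 *  500 -> scale 0x81 -> 256
	 * 4096 -> scale 0x90 -> 4096
	 */
	return 0;
}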
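
Using the Direct Placement Calls
--------------------------------
The following is a minimal sketch of how an application might use the riomap,
riounmap, and riowrite calls documented in the rsocket.7 hunk above.  It
assumes fd is an already connected rsocket on each side and omits error
handling; exchanging the chosen offset and notifying the receiver over the
normal data stream are application conventions used here for illustration,
not part of the API.

#include <sys/mman.h>		/* PROT_WRITE */
#include <rdma/rsocket.h>

#define XFER_SIZE 4096

/* Receiving side: expose buf for direct placement by the peer. */
static void expose_buffer(int fd, void *buf)
{
	off_t offset;
	char done;

	/* Register buf for remote write access; -1 lets rsockets pick the offset. */
	offset = riomap(fd, buf, XFER_SIZE, PROT_WRITE, 0, -1);

	/* Tell the peer which offset to target.  Once the peer receives this
	 * message, the mapping is guaranteed to be usable for riowrite. */
	rsend(fd, &offset, sizeof offset, 0);

	/* riowrite places data silently, so wait for an explicit notification. */
	rrecv(fd, &done, sizeof done, 0);
	/* buf now holds the directly placed data. */

	riounmap(fd, buf, XFER_SIZE);
}

/* Writing side: transfer data directly into the peer's mapped buffer. */
static void write_direct(int fd, const void *data)
{
	off_t offset;
	char done = 1;

	rrecv(fd, &offset, sizeof offset, 0);	/* offset chosen by the peer */
	riowrite(fd, data, XFER_SIZE, offset, 0);
	rsend(fd, &done, sizeof done, 0);	/* riowrite itself gives no notice */
}

Since def_iomap_size is zero unless the iomap_size configuration file is
present, the writing side would also need to enable a target iomap before
connecting, e.g. via rsetsockopt(fd, SOL_RDMA, RDMA_IOMAPSIZE, &val, sizeof val),
so that the peer's riomap call can find a free remote entry.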