-Bottom: 92d2aab8615c3d1003fee963587c4078b732e465
-Top: bd15dfc9adaf449efbd6fcccaac92ea3ed7ad81b
+Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
+Top: 6800649e940a6a3669cefc3c017533f9a40e63d0
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
+rsocket QP.
\ No newline at end of file
diff --git a/src/cma.c b/src/cma.c
-index 91bf108..2c6b032 100755
+index 388be61..49b88a0 100755
--- a/src/cma.c
+++ b/src/cma.c
-@@ -2237,9 +2237,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
+@@ -2232,9 +2232,18 @@ void rdma_destroy_ep(struct rdma_cm_id *id)
int ucma_max_qpsize(struct rdma_cm_id *id)
{
struct cma_id_private *id_priv;
uint16_t ucma_get_port(struct sockaddr *addr)
diff --git a/src/rsocket.c b/src/rsocket.c
-index 58fcb8e..07cf31d 100644
+index a060f66..f62fe90 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -46,6 +46,7 @@
+@@ -47,6 +47,7 @@
#include <string.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
-@@ -55,7 +56,7 @@
+@@ -56,7 +57,7 @@
#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
-@@ -63,6 +64,26 @@
+@@ -64,6 +65,26 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
-@@ -99,6 +120,14 @@ enum {
+@@ -100,6 +121,14 @@ enum {
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
#define rs_msg_op(imm_data) (imm_data >> 29)
#define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
enum {
RS_CTRL_DISCONNECT,
-@@ -110,6 +139,18 @@ struct rs_msg {
+@@ -111,6 +140,18 @@ struct rs_msg {
uint32_t data;
};
struct rs_sge {
uint64_t addr;
uint32_t key;
-@@ -144,8 +185,6 @@ struct rs_conn_data {
+@@ -145,8 +186,6 @@ struct rs_conn_data {
struct rs_sge data_buf;
};
/*
* rsocket states are ordered as passive, connecting, connected, disconnected.
*/
-@@ -159,9 +198,9 @@ enum rs_state {
+@@ -160,9 +199,9 @@ enum rs_state {
rs_connecting = rs_opening | 0x0040,
rs_accepting = rs_opening | 0x0080,
rs_connected = 0x0100,
rs_connect_error = 0x0800,
rs_disconnected = 0x1000,
rs_error = 0x2000,
-@@ -169,68 +208,349 @@ enum rs_state {
+@@ -170,68 +209,349 @@ enum rs_state {
#define RS_OPT_SWAP_SGL 1
static int rs_value_to_scale(int value, int bits)
{
return value <= (1 << (bits - 1)) ?
-@@ -306,10 +626,10 @@ out:
+@@ -307,10 +627,10 @@ out:
pthread_mutex_unlock(&mut);
}
pthread_mutex_unlock(&mut);
return rs->index;
}
-@@ -321,7 +641,7 @@ static void rs_remove(struct rsocket *rs)
+@@ -322,7 +642,7 @@ static void rs_remove(struct rsocket *rs)
pthread_mutex_unlock(&mut);
}
{
struct rsocket *rs;
-@@ -329,7 +649,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -330,7 +650,11 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
if (!rs)
return NULL;
if (inherited_rs) {
rs->sbuf_size = inherited_rs->sbuf_size;
rs->rbuf_size = inherited_rs->rbuf_size;
-@@ -351,7 +675,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -352,7 +676,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
dlist_init(&rs->iomap_list);
dlist_init(&rs->iomap_queue);
return rs;
-@@ -359,13 +683,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
+@@ -360,13 +684,27 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
static int rs_set_nonblocking(struct rsocket *rs, long arg)
{
return ret;
}
-@@ -389,17 +727,39 @@ static void rs_set_qp_size(struct rsocket *rs)
+@@ -390,17 +728,39 @@ static void rs_set_qp_size(struct rsocket *rs)
rs->rq_size = 2;
}
rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
if (!rs->smr)
-@@ -409,7 +769,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -410,7 +770,7 @@ static int rs_init_bufs(struct rsocket *rs)
sizeof(*rs->target_iomap) * rs->target_iomap_size;
rs->target_buffer_list = malloc(len);
if (!rs->target_buffer_list)
rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
-@@ -422,7 +782,7 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -423,7 +783,7 @@ static int rs_init_bufs(struct rsocket *rs)
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rs->rbuf_size);
if (!rs->rmr)
-@@ -439,15 +799,32 @@ static int rs_init_bufs(struct rsocket *rs)
+@@ -440,15 +800,32 @@ static int rs_init_bufs(struct rsocket *rs)
return 0;
}
goto err1;
if (rs->fd_flags & O_NONBLOCK) {
-@@ -455,21 +832,20 @@ static int rs_create_cq(struct rsocket *rs)
+@@ -456,21 +833,20 @@ static int rs_create_cq(struct rsocket *rs)
goto err2;
}
{
struct ibv_recv_wr wr, *bad;
-@@ -481,6 +857,23 @@ rs_post_recv(struct rsocket *rs)
+@@ -482,6 +858,23 @@ rs_post_recv(struct rsocket *rs)
return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad));
}
static int rs_create_ep(struct rsocket *rs)
{
struct ibv_qp_init_attr qp_attr;
-@@ -491,7 +884,7 @@ static int rs_create_ep(struct rsocket *rs)
+@@ -492,7 +885,7 @@ static int rs_create_ep(struct rsocket *rs)
if (ret)
return ret;
if (ret)
return ret;
-@@ -548,8 +941,76 @@ static void rs_free_iomappings(struct rsocket *rs)
+@@ -549,8 +942,76 @@ static void rs_free_iomappings(struct rsocket *rs)
}
}
if (rs->index >= 0)
rs_remove(rs);
-@@ -581,7 +1042,7 @@ static void rs_free(struct rsocket *rs)
+@@ -582,7 +1043,7 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
-@@ -635,29 +1096,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
+@@ -636,29 +1097,54 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
rs->sseq_comp = ntohs(conn->credits);
}
return rs->index;
err:
-@@ -671,9 +1157,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
+@@ -672,9 +1158,18 @@ int rbind(int socket, const struct sockaddr *addr, socklen_t addrlen)
int ret;
rs = idm_at(&idm, socket);
return ret;
}
-@@ -709,7 +1204,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -710,7 +1205,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
int ret;
rs = idm_at(&idm, socket);
if (!new_rs)
return ERR(ENOMEM);
-@@ -717,7 +1212,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -718,7 +1213,7 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
if (ret)
goto err;
if (ret < 0)
goto err;
-@@ -825,42 +1320,309 @@ connected:
+@@ -826,42 +1321,309 @@ connected:
break;
}
}
static int rs_post_write_msg(struct rsocket *rs,
-@@ -902,6 +1664,24 @@ static int rs_post_write(struct rsocket *rs,
+@@ -903,6 +1665,24 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
-@@ -1045,7 +1825,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -1046,7 +1826,7 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state = rs_disconnected;
return 0;
} else if (rs_msg_data(imm_data) == RS_CTRL_SHUTDOWN) {
}
break;
case RS_OP_WRITE:
-@@ -1187,6 +1967,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1188,6 +1968,165 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
return ret;
}
static int rs_nonblocking(struct rsocket *rs, int flags)
{
return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT);
-@@ -1218,9 +2157,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2158,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1235,7 +2184,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2185,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1244,6 +2193,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2194,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1289,6 +2298,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2299,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1338,7 +2354,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2355,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1349,6 +2365,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
+@@ -1350,6 +2366,14 @@ ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
{
int ret;
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1390,14 +2414,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2415,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1446,10 +2470,81 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2471,81 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1463,6 +2558,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2559,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1484,7 +2586,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2587,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1537,10 +2639,26 @@ out:
+@@ -1538,10 +2640,26 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1599,7 +2717,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2718,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1652,7 +2770,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2771,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1689,8 +2807,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2808,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1706,6 +2824,16 @@ check_cq:
+@@ -1707,6 +2825,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1765,11 +2893,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2894,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1792,7 +2923,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2924,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1948,7 +3082,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3083,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1958,10 +3092,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3093,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1986,13 +3120,31 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3121,31 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2017,8 +3169,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3170,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2026,8 +3182,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3183,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2039,18 +3199,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,18 +3200,26 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
-@@ -2100,9 +3268,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3269,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2314,7 +3484,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3485,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2348,7 +3518,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3519,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2360,7 +3530,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3531,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2381,7 +3551,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3552,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2425,7 +3595,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3596,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;