Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: 0a9005e21ff77f266c98c49bb8473c3d295a6c35
+Top: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..7dc5409 100644
+index a060f66..c61d689 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
+
+ pthread_mutex_lock(&mut);
+ if (!svc_cnt) {
-+ ret = socketpair(AF_INET, SOCK_STREAM, 0, svc_sock);
++ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
+ if (ret)
+ goto err1;
+
+ msg.rs = rs;
+ write(svc_sock[0], &msg, sizeof msg);
+ read(svc_sock[0], &msg, sizeof msg);
-+ ret = ERR(msg.status);
++ ret = rdma_seterrno(msg.status);
+ if (ret && !svc_cnt)
+ goto err3;
+
-static int rs_create_cq(struct rsocket *rs)
+static int ds_init_bufs(struct ds_qp *qp)
-+{
+ {
+- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
+- if (!rs->cm_id->recv_cq_channel)
+ qp->rbuf = calloc(qp->rs->rbuf_size, sizeof(*qp->rbuf));
+ if (!qp->rbuf)
+ return ERR(ENOMEM);
+
+ qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size);
+ if (!qp->smr)
-+ return -1;
-+
+ return -1;
+
+- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
+- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
+- if (!rs->cm_id->recv_cq)
+ qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size);
+ if (!qp->rmr)
+ return -1;
+}
+
+static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id)
- {
-- rs->cm_id->recv_cq_channel = ibv_create_comp_channel(rs->cm_id->verbs);
-- if (!rs->cm_id->recv_cq_channel)
++{
+ cm_id->recv_cq_channel = ibv_create_comp_channel(cm_id->verbs);
+ if (!cm_id->recv_cq_channel)
- return -1;
-
-- rs->cm_id->recv_cq = ibv_create_cq(rs->cm_id->verbs, rs->sq_size + rs->rq_size,
-- rs->cm_id, rs->cm_id->recv_cq_channel, 0);
-- if (!rs->cm_id->recv_cq)
++ return -1;
++
+ cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size,
+ cm_id, cm_id->recv_cq_channel, 0);
+ if (!cm_id->recv_cq)
}
break;
case RS_OP_WRITE:
-@@ -1133,46 +1807,208 @@ static int rs_get_cq_event(struct rsocket *rs)
- */
- static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
-- int ret;
-+ int ret;
-+
-+ fastlock_acquire(&rs->cq_lock);
-+ do {
+@@ -1137,42 +1811,213 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+
+ fastlock_acquire(&rs->cq_lock);
+ do {
+- rs_update_credits(rs);
+- ret = rs_poll_cq(rs);
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ if (test(rs)) {
+ struct ibv_wc wc;
+ int ret, cnt;
+
-+ qp = rs->qp_list;
-+ if (!qp)
++ if (!(qp = rs->qp_list))
+ return;
+
+ do {
+{
+ struct ds_qp *qp;
+
-+ qp = rs->qp_list;
++ if (!(qp = rs->qp_list))
++ return;
++
+ do {
+ if (!qp->cq_armed) {
+ ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
+ void *context;
+ int ret;
+
++ printf("%s \n", __func__);
+ if (!rs->cq_armed)
+ return 0;
+
+ ret = epoll_wait(rs->epfd, &event, 1, -1);
++ printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret <= 0)
+ return ret;
+
+ qp = event.data.ptr;
-+ ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
++ ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
++ printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (!ret) {
-+ ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
++ ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
+ qp->cq_armed = 0;
+ rs->cq_armed = 0;
+ }
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ int ret = 0;
-
- fastlock_acquire(&rs->cq_lock);
- do {
-- rs_update_credits(rs);
-- ret = rs_poll_cq(rs);
++
++ fastlock_acquire(&rs->cq_lock);
++ do {
+ ds_poll_cqs(rs);
if (test(rs)) {
++ printf("%s test succeeded\n", __func__);
ret = 0;
break;
- } else if (ret) {
- break;
} else if (nonblock) {
ret = ERR(EWOULDBLOCK);
++ printf("%s nonblocking \n", __func__);
} else if (!rs->cq_armed) {
- ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
++ printf("%s req notify \n", __func__);
+ ds_req_notify_cqs(rs);
rs->cq_armed = 1;
} else {
- rs_update_credits(rs);
+- rs_update_credits(rs);
fastlock_acquire(&rs->cq_wait_lock);
fastlock_release(&rs->cq_lock);
- ret = rs_get_cq_event(rs);
+ ret = ds_get_cq_event(rs);
++ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->cq_wait_lock);
fastlock_acquire(&rs->cq_lock);
}
- rs_update_credits(rs);
fastlock_release(&rs->cq_lock);
++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
return ret;
}
do {
- ret = rs_process_cq(rs, 1, test);
+ ret = ds_process_cqs(rs, 1, test);
++ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2020,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2029,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2055,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2064,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2082,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2091,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2091,66 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2100,70 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ struct ds_header *hdr;
+ int ret;
+
++ret = 0;
++ printf("%s \n", __func__);
+ if (!(rs->state & rs_readable))
+ return ERR(EINVAL);
+
+ if (!rs_have_rdata(rs)) {
++ printf("%s need rdata \n", __func__);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_have_rdata);
++ printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ return ret;
+ }
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2196,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2209,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2252,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2265,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2261,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2274,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2313,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2326,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2369,90 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2382,90 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2466,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2479,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2494,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2507,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2547,27 @@ out:
+@@ -1538,10 +2560,27 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2626,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2639,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2679,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2692,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2716,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2729,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2733,16 @@ check_cq:
+@@ -1707,6 +2746,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2802,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2815,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2832,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2845,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +2991,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3004,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3001,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3014,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3029,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3042,29 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2018,8 +3076,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3089,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3089,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3102,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,18 +3106,26 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,18 +3119,26 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
opt_on = *(int *) optval;
break;
case SO_RCVBUF:
-@@ -2101,9 +3175,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3188,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3391,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3404,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3425,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3438,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3437,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3450,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3458,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3471,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3502,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3515,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3552,269 @@ out:
+@@ -2476,3 +3565,269 @@ out:
return (ret && left == count) ? ret : count - left;
}
+++ /dev/null
-Bottom: 0a9005e21ff77f266c98c49bb8473c3d295a6c35
-Top: f1822f3bbe2c9b92b5e2ca8b4e5c3cece427c5ff
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-11 14:00:12 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 7dc5409..c61d689 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -386,7 +386,7 @@ static int rs_add_to_svc(struct rsocket *rs)
-
- pthread_mutex_lock(&mut);
- if (!svc_cnt) {
-- ret = socketpair(AF_INET, SOCK_STREAM, 0, svc_sock);
-+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, svc_sock);
- if (ret)
- goto err1;
-
-@@ -402,7 +402,7 @@ static int rs_add_to_svc(struct rsocket *rs)
- msg.rs = rs;
- write(svc_sock[0], &msg, sizeof msg);
- read(svc_sock[0], &msg, sizeof msg);
-- ret = ERR(msg.status);
-+ ret = rdma_seterrno(msg.status);
- if (ret && !svc_cnt)
- goto err3;
-
-@@ -1885,8 +1885,7 @@ static void ds_poll_cqs(struct rsocket *rs)
- struct ibv_wc wc;
- int ret, cnt;
-
-- qp = rs->qp_list;
-- if (!qp)
-+ if (!(qp = rs->qp_list))
- return;
-
- do {
-@@ -1935,7 +1934,9 @@ static void ds_req_notify_cqs(struct rsocket *rs)
- {
- struct ds_qp *qp;
-
-- qp = rs->qp_list;
-+ if (!(qp = rs->qp_list))
-+ return;
-+
- do {
- if (!qp->cq_armed) {
- ibv_req_notify_cq(qp->cm_id->recv_cq, 0);
-@@ -1953,17 +1954,20 @@ static int ds_get_cq_event(struct rsocket *rs)
- void *context;
- int ret;
-
-+ printf("%s \n", __func__);
- if (!rs->cq_armed)
- return 0;
-
- ret = epoll_wait(rs->epfd, &event, 1, -1);
-+ printf("%s epoll wait ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret <= 0)
- return ret;
-
- qp = event.data.ptr;
-- ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
-+ ret = ibv_get_cq_event(qp->cm_id->recv_cq_channel, &cq, &context);
-+ printf("%s get cq event ret %d errno %s\n", __func__, ret, strerror(errno));
- if (!ret) {
-- ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
-+ ibv_ack_cq_events(qp->cm_id->recv_cq, 1);
- qp->cq_armed = 0;
- rs->cq_armed = 0;
- }
-@@ -1979,25 +1983,29 @@ static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct r
- do {
- ds_poll_cqs(rs);
- if (test(rs)) {
-+ printf("%s test succeeded\n", __func__);
- ret = 0;
- break;
- } else if (nonblock) {
- ret = ERR(EWOULDBLOCK);
-+ printf("%s nonblocking \n", __func__);
- } else if (!rs->cq_armed) {
-+ printf("%s req notify \n", __func__);
- ds_req_notify_cqs(rs);
- rs->cq_armed = 1;
- } else {
-- rs_update_credits(rs);
- fastlock_acquire(&rs->cq_wait_lock);
- fastlock_release(&rs->cq_lock);
-
- ret = ds_get_cq_event(rs);
-+ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
- fastlock_release(&rs->cq_wait_lock);
- fastlock_acquire(&rs->cq_lock);
- }
- } while (!ret);
-
- fastlock_release(&rs->cq_lock);
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return ret;
- }
-
-@@ -2009,6 +2017,7 @@ static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
-
- do {
- ret = ds_process_cqs(rs, 1, test);
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (!ret || nonblock || errno != EWOULDBLOCK)
- return ret;
-
-@@ -2123,12 +2132,16 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- struct ds_header *hdr;
- int ret;
-
-+ret = 0;
-+ printf("%s \n", __func__);
- if (!(rs->state & rs_readable))
- return ERR(EINVAL);
-
- if (!rs_have_rdata(rs)) {
-+ printf("%s need rdata \n", __func__);
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
- rs_have_rdata);
-+ printf("%s ds_get_comp ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret)
- return ret;
- }