Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: 085131bec099e61aa57e12da9977ca49a5ea1a1e
+Top: f8258dbc93f14acdb67621e51c1696fca92c841a
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..a5a01ba 100644
+index a060f66..4631b1d 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
- void *target_buffer_list;
- volatile struct rs_sge *target_sgl;
- struct rs_iomap *target_iomap;
-+#define DS_UDP_TAG 0x55555555
-
+-
- uint32_t rbuf_size;
- struct ibv_mr *rmr;
- uint8_t *rbuf;
--
++#define DS_UDP_TAG 0x55555555
+
- uint32_t sbuf_size;
- struct ibv_mr *smr;
- struct ibv_sge ssgl[2];
+ ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP);
+ if (ret)
+ goto err;
-
-- ret = rs_insert(rs);
++
+ rs->cm_id->route.addr.src_addr.sa_family = domain;
+ index = rs->cm_id->channel->fd;
+ } else {
+ ret = ds_init(rs, domain);
+ if (ret)
+ goto err;
-+
+
+- ret = rs_insert(rs);
+ index = rs->udp_sock;
+ }
+
}
break;
case RS_OP_WRITE:
-@@ -1137,42 +1844,215 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
-
- fastlock_acquire(&rs->cq_lock);
- do {
-- rs_update_credits(rs);
-- ret = rs_poll_cq(rs);
+@@ -1133,46 +1840,230 @@ static int rs_get_cq_event(struct rsocket *rs)
+ */
+ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+- int ret;
++ int ret;
++
++ fastlock_acquire(&rs->cq_lock);
++ do {
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+ if (test(rs)) {
+ return ret;
+}
+
++static int rs_have_rdata(struct rsocket *rs);
++static int ds_can_send(struct rsocket *rs);
++
+static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+{
+ int ret = 0;
+
-+ fastlock_acquire(&rs->cq_lock);
-+ do {
++ if (test == rs_have_rdata)
++ printf("%s test rs_have_rdata\n", __func__);
++ else if (test == ds_can_send)
++ printf("%s test ds_can_send\n", __func__);
++ else
++ printf("%s test ?\n", __func__);
+
+ fastlock_acquire(&rs->cq_lock);
+ do {
+- rs_update_credits(rs);
+- ret = rs_poll_cq(rs);
+ ds_poll_cqs(rs);
if (test(rs)) {
-+// printf("%s test succeeded\n", __func__);
++ printf("%s test succeeded\n", __func__);
ret = 0;
break;
- } else if (ret) {
- break;
} else if (nonblock) {
ret = ERR(EWOULDBLOCK);
-+// printf("%s nonblocking \n", __func__);
++ printf("%s nonblocking \n", __func__);
} else if (!rs->cq_armed) {
- ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
-+// printf("%s req notify \n", __func__);
++ printf("%s req notify \n", __func__);
+ ds_req_notify_cqs(rs);
rs->cq_armed = 1;
} else {
fastlock_release(&rs->cq_lock);
- ret = rs_get_cq_event(rs);
++ printf("%s wait for event \n", __func__);
+ ret = ds_get_cq_event(rs);
-+// printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
++ printf("%s get event ret %d %s\n", __func__, ret, strerror(errno));
fastlock_release(&rs->cq_wait_lock);
fastlock_acquire(&rs->cq_lock);
}
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2064,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2075,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2099,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2110,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2126,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2137,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2135,73 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2146,74 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ ds_post_recv(rs, rmsg->qp, rmsg->offset);
+ if (++rs->rmsg_head == rs->rq_size + 1)
+ rs->rmsg_head = 0;
++ rs->rqe_avail++;
+ }
+
+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2247,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2259,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2303,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2315,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2312,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2324,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2364,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2376,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2420,100 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2432,102 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ return ds_send_udp(rs, buf, len, flags, RS_OP_DATA);
+
+ if (!ds_can_send(rs)) {
++ printf("can't send\n");
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send);
++ printf("ds_get_comp %d\n", ret);
+ if (ret)
+ return ret;
+ }
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2527,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2541,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2555,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2569,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2608,51 @@ out:
+@@ -1538,10 +2622,51 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2711,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2725,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2764,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2778,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2801,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2815,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2818,16 @@ check_cq:
+@@ -1707,6 +2832,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2887,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2901,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2917,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2931,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3076,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3090,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3086,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3100,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3114,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3128,29 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2018,8 +3161,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3175,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3174,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3188,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,22 +3191,31 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,22 +3205,31 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
rs->rbuf_size = (*(uint32_t *) optval) << 1;
ret = 0;
break;
-@@ -2101,9 +3261,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3275,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3477,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3491,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3511,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3525,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3523,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3537,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3544,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3558,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3588,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3602,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3638,278 @@ out:
+@@ -2476,3 +3652,278 @@ out:
return (ret && left == count) ? ret : count - left;
}