Bottom: 1fa07c62817ac4b6cb8d9c5e327ea2cdc75dbd21
-Top: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640
+Top: 118b72bdf8de5351caaf3d746984f9e4401d4253
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-11-09 10:26:38 -0800
{
errno = err;
diff --git a/src/rsocket.c b/src/rsocket.c
-index a060f66..aca705b 100644
+index a060f66..8fd901f 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -47,6 +47,8 @@
+static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
+{
+ if (addr->sa.sa_family == AF_INET) {
-+ PRINTADDR(addr);
++// PRINTADDR(addr);
+ hdr->version = 4;
+ hdr->length = DS_IPV4_HDR_LEN;
+ hdr->port = addr->sin.sin_port;
+ struct epoll_event event;
+ int i, ret;
+
-+ PRINTADDR(src_addr);
++// PRINTADDR(src_addr);
+ qp = calloc(1, sizeof(*qp));
+ if (!qp)
+ return ERR(ENOMEM);
+ struct ds_dest **tdest, *new_dest;
+ int ret = 0;
+
-+ PRINTADDR(addr);
++// PRINTADDR(addr);
+ fastlock_acquire(&rs->map_lock);
+ tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
+ if (tdest)
+ }
+
+ fastlock_acquire(&rs->slock);
-+ PRINTADDR(addr);
++// PRINTADDR(addr);
+ ret = connect(rs->udp_sock, addr, addrlen);
+ if (!ret)
+ ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
}
break;
case RS_OP_WRITE:
-@@ -1137,42 +1843,214 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+@@ -1137,42 +1843,215 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
fastlock_acquire(&rs->cq_lock);
do {
+ struct ds_header *hdr;
+
+ hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id));
-+ return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) &&
++ return ((wc->byte_len >= sizeof(struct ibv_grh) + DS_IPV4_HDR_LEN) &&
+ ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
+ (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
+}
+ if (ds_wr_is_recv(wc.wr_id)) {
+ if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
+ ds_valid_recv(qp, &wc)) {
++ printf("%s recv over QP\n", __func__);
+ rs->rqe_avail--;
+ rmsg = &rs->dmsg[rs->rmsg_tail];
+ rmsg->qp = qp;
+ } else {
+ smsg = (struct ds_smsg *)
+ (rs->sbuf + ds_wr_offset(wc.wr_id));
-+ printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
++// printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
+ smsg->next = rs->smsg_free;
+ rs->smsg_free = smsg;
+ rs->sqe_avail++;
if (!ret || nonblock || errno != EWOULDBLOCK)
return ret;
-@@ -1184,7 +2062,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
+@@ -1184,7 +2063,7 @@ static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsoc
(e.tv_usec - s.tv_usec) + 1;
} while (poll_time <= polling_time);
return ret;
}
-@@ -1219,9 +2097,19 @@ static int rs_can_send(struct rsocket *rs)
+@@ -1219,9 +2098,19 @@ static int rs_can_send(struct rsocket *rs)
(rs->target_sgl[rs->target_sge].length != 0);
}
}
static int rs_conn_can_send_ctrl(struct rsocket *rs)
-@@ -1236,7 +2124,7 @@ static int rs_have_rdata(struct rsocket *rs)
+@@ -1236,7 +2125,7 @@ static int rs_have_rdata(struct rsocket *rs)
static int rs_conn_have_rdata(struct rsocket *rs)
{
}
static int rs_conn_all_sends_done(struct rsocket *rs)
-@@ -1245,6 +2133,73 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
+@@ -1245,6 +2134,73 @@ static int rs_conn_all_sends_done(struct rsocket *rs)
!(rs->state & rs_connected);
}
+ struct ds_header *hdr;
+ int ret;
+
-+ printf("%s \n", __func__);
++// printf("%s \n", __func__);
+ if (!(rs->state & rs_readable))
+ return ERR(EINVAL);
+
+ if (!rs_have_rdata(rs)) {
-+ printf("%s need rdata \n", __func__);
++// printf("%s need rdata \n", __func__);
+ ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_have_rdata);
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ if (ret)
+ return ret;
+ }
+ if (addrlen)
+{
+ ds_set_src(src_addr, addrlen, hdr);
-+PRINTADDR(src_addr);
++//PRINTADDR(src_addr);
+}
+
+ if (!(flags & MSG_PEEK)) {
+ rs->rmsg_head = 0;
+ }
+
-+ printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
++// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
+ return len;
+}
+
static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len)
{
size_t left = len;
-@@ -1290,6 +2245,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1290,6 +2246,13 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
int ret;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1339,7 +2301,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1339,7 +2302,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
rs->rbuf_bytes_avail += rsize;
}
fastlock_release(&rs->rlock);
return ret ? ret : len - left;
-@@ -1348,8 +2310,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1348,8 +2311,17 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
ssize_t rrecvfrom(int socket, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ret = rrecv(socket, buf, len, flags);
if (ret > 0 && src_addr)
rgetpeername(socket, src_addr, addrlen);
-@@ -1391,14 +2362,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1391,14 +2363,14 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
struct rs_iomap iom;
int ret;
ret = ERR(ECONNRESET);
break;
}
-@@ -1447,10 +2418,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
+@@ -1447,10 +2419,99 @@ static int rs_send_iomaps(struct rsocket *rs, int flags)
}
rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ msg.msg_iovlen = iovcnt + 1;
+// printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
+ ret = sendmsg(rs->udp_sock, &msg, flags);
-+ return ret > 0 ? ret - sizeof hdr : ret;
++ return ret > 0 ? ret - hdr.length : ret;
+}
+
+static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
-@@ -1464,6 +2524,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1464,6 +2525,13 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
int ret = 0;
rs = idm_at(&idm, socket);
if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1485,7 +2552,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1485,7 +2553,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1538,10 +2605,36 @@ out:
+@@ -1538,10 +2606,36 @@ out:
ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
+ int ret;
- return rsend(socket, buf, len, flags);
-+ PRINTADDR(dest_addr);
-+ printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
++// PRINTADDR(dest_addr);
++// printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
+ rs = idm_at(&idm, socket);
+ if (rs->type == SOCK_STREAM) {
+ if (dest_addr || addrlen)
}
static void rs_copy_iov(void *dst, const struct iovec **iov, size_t *offset, size_t len)
-@@ -1600,7 +2693,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1600,7 +2694,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -1653,7 +2746,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
+@@ -1653,7 +2747,7 @@ ssize_t rsendmsg(int socket, const struct msghdr *msg, int flags)
if (msg->msg_control && msg->msg_controllen)
return ERR(ENOTSUP);
}
ssize_t rwrite(int socket, const void *buf, size_t count)
-@@ -1690,8 +2783,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1690,8 +2784,8 @@ static int rs_poll_rs(struct rsocket *rs, int events,
int ret;
check_cq:
rs_process_cq(rs, nonblock, test);
revents = 0;
-@@ -1707,6 +2800,16 @@ check_cq:
+@@ -1707,6 +2801,16 @@ check_cq:
}
return revents;
}
if (rs->state == rs_listening) {
-@@ -1766,11 +2869,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1766,11 +2870,14 @@ static int rs_poll_arm(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
if (fds[i].revents)
return 1;
rfds[i].events = POLLIN;
} else {
rfds[i].fd = fds[i].fd;
-@@ -1793,7 +2899,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
+@@ -1793,7 +2900,10 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
-@@ -1949,7 +3058,7 @@ int rshutdown(int socket, int how)
+@@ -1949,7 +3059,7 @@ int rshutdown(int socket, int how)
rs = idm_at(&idm, socket);
if (how == SHUT_RD) {
return 0;
}
-@@ -1959,10 +3068,10 @@ int rshutdown(int socket, int how)
+@@ -1959,10 +3069,10 @@ int rshutdown(int socket, int how)
if (rs->state & rs_connected) {
if (how == SHUT_RDWR) {
ctrl = RS_CTRL_DISCONNECT;
RS_CTRL_SHUTDOWN : RS_CTRL_DISCONNECT;
}
if (!rs->ctrl_avail) {
-@@ -1987,13 +3096,29 @@ int rshutdown(int socket, int how)
+@@ -1987,13 +3097,29 @@ int rshutdown(int socket, int how)
return 0;
}
rs_free(rs);
return 0;
-@@ -2018,8 +3143,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2018,8 +3144,12 @@ int rgetpeername(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
-@@ -2027,8 +3156,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
+@@ -2027,8 +3157,12 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen)
struct rsocket *rs;
rs = idm_at(&idm, socket);
}
int rsetsockopt(int socket, int level, int optname,
-@@ -2040,22 +3173,31 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2040,22 +3174,31 @@ int rsetsockopt(int socket, int level, int optname,
ret = ERR(ENOTSUP);
rs = idm_at(&idm, socket);
rs->rbuf_size = (*(uint32_t *) optval) << 1;
ret = 0;
break;
-@@ -2101,9 +3243,11 @@ int rsetsockopt(int socket, int level, int optname,
+@@ -2101,9 +3244,11 @@ int rsetsockopt(int socket, int level, int optname,
opts = &rs->ipv6_opts;
switch (optname) {
case IPV6_V6ONLY:
opt_on = *(int *) optval;
break;
default:
-@@ -2315,7 +3459,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2315,7 +3460,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
return ERR(EINVAL);
if (prot & PROT_WRITE) {
iomr = rs_get_iomap_mr(rs);
access |= IBV_ACCESS_REMOTE_WRITE;
-@@ -2349,7 +3493,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
+@@ -2349,7 +3494,7 @@ off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offse
dlist_insert_tail(&iomr->entry, &rs->iomap_list);
}
out:
return offset;
}
-@@ -2361,7 +3505,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2361,7 +3506,7 @@ int riounmap(int socket, void *buf, size_t len)
int ret = 0;
rs = idm_at(&idm, socket);
for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
entry = entry->next) {
-@@ -2382,7 +3526,7 @@ int riounmap(int socket, void *buf, size_t len)
+@@ -2382,7 +3527,7 @@ int riounmap(int socket, void *buf, size_t len)
}
ret = ERR(EINVAL);
out:
return ret;
}
-@@ -2426,7 +3570,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
+@@ -2426,7 +3571,7 @@ size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int fla
rs_conn_can_send);
if (ret)
break;
ret = ERR(ECONNRESET);
break;
}
-@@ -2476,3 +3620,278 @@ out:
+@@ -2476,3 +3621,278 @@ out:
return (ret && left == count) ? ret : count - left;
}
+++ /dev/null
-Bottom: 232d6a57cc2f2d81d4457edeeef4cb9e418b9640
-Top: 118b72bdf8de5351caaf3d746984f9e4401d4253
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-12-15 00:40:02 -0800
-
-Refresh of dsocket
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index aca705b..8fd901f 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -1349,7 +1349,7 @@ out:
- static void ds_format_hdr(struct ds_header *hdr, union socket_addr *addr)
- {
- if (addr->sa.sa_family == AF_INET) {
-- PRINTADDR(addr);
-+// PRINTADDR(addr);
- hdr->version = 4;
- hdr->length = DS_IPV4_HDR_LEN;
- hdr->port = addr->sin.sin_port;
-@@ -1400,7 +1400,7 @@ static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
- struct epoll_event event;
- int i, ret;
-
-- PRINTADDR(src_addr);
-+// PRINTADDR(src_addr);
- qp = calloc(1, sizeof(*qp));
- if (!qp)
- return ERR(ENOMEM);
-@@ -1489,7 +1489,7 @@ static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr,
- struct ds_dest **tdest, *new_dest;
- int ret = 0;
-
-- PRINTADDR(addr);
-+// PRINTADDR(addr);
- fastlock_acquire(&rs->map_lock);
- tdest = tfind(addr, &rs->dest_map, ds_compare_addr);
- if (tdest)
-@@ -1541,7 +1541,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
- }
-
- fastlock_acquire(&rs->slock);
-- PRINTADDR(addr);
-+// PRINTADDR(addr);
- ret = connect(rs->udp_sock, addr, addrlen);
- if (!ret)
- ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest);
-@@ -1899,7 +1899,7 @@ static int ds_valid_recv(struct ds_qp *qp, struct ibv_wc *wc)
- struct ds_header *hdr;
-
- hdr = (struct ds_header *) (qp->rbuf + ds_wr_offset(wc->wr_id));
-- return ((wc->byte_len >= sizeof(struct ibv_grh) + sizeof(*hdr)) &&
-+ return ((wc->byte_len >= sizeof(struct ibv_grh) + DS_IPV4_HDR_LEN) &&
- ((hdr->version == 4 && hdr->length == DS_IPV4_HDR_LEN) ||
- (hdr->version == 6 && hdr->length == DS_IPV6_HDR_LEN)));
- }
-@@ -1934,6 +1934,7 @@ static void ds_poll_cqs(struct rsocket *rs)
- if (ds_wr_is_recv(wc.wr_id)) {
- if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS &&
- ds_valid_recv(qp, &wc)) {
-+ printf("%s recv over QP\n", __func__);
- rs->rqe_avail--;
- rmsg = &rs->dmsg[rs->rmsg_tail];
- rmsg->qp = qp;
-@@ -1948,7 +1949,7 @@ static void ds_poll_cqs(struct rsocket *rs)
- } else {
- smsg = (struct ds_smsg *)
- (rs->sbuf + ds_wr_offset(wc.wr_id));
-- printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
-+// printf("%s send smsg %p free %p\n", __func__, smsg, rs->smsg_free);
- smsg->next = rs->smsg_free;
- rs->smsg_free = smsg;
- rs->sqe_avail++;
-@@ -2165,15 +2166,15 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- struct ds_header *hdr;
- int ret;
-
-- printf("%s \n", __func__);
-+// printf("%s \n", __func__);
- if (!(rs->state & rs_readable))
- return ERR(EINVAL);
-
- if (!rs_have_rdata(rs)) {
-- printf("%s need rdata \n", __func__);
-+// printf("%s need rdata \n", __func__);
- ret = ds_get_comp(rs, rs_nonblocking(rs, flags),
- rs_have_rdata);
-- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- if (ret)
- return ret;
- }
-@@ -2187,7 +2188,7 @@ static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags,
- if (addrlen)
- {
- ds_set_src(src_addr, addrlen, hdr);
--PRINTADDR(src_addr);
-+//PRINTADDR(src_addr);
- }
-
- if (!(flags & MSG_PEEK)) {
-@@ -2196,7 +2197,7 @@ PRINTADDR(src_addr);
- rs->rmsg_head = 0;
- }
-
-- printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
-+// printf("%s ret %d errno %s\n", __func__, ret, strerror(errno));
- return len;
- }
-
-@@ -2459,7 +2460,7 @@ static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov,
- msg.msg_iovlen = iovcnt + 1;
- // printf("%s iov cnt %d\n", __func__, msg.msg_iovlen);
- ret = sendmsg(rs->udp_sock, &msg, flags);
-- return ret > 0 ? ret - sizeof hdr : ret;
-+ return ret > 0 ? ret - hdr.length : ret;
- }
-
- static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len,
-@@ -2608,8 +2609,8 @@ ssize_t rsendto(int socket, const void *buf, size_t len, int flags,
- struct rsocket *rs;
- int ret;
-
-- PRINTADDR(dest_addr);
-- printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
-+// PRINTADDR(dest_addr);
-+// printf("%s sendto data 0x%x\n", __func__, *((uint32_t*)buf));
- rs = idm_at(&idm, socket);
- if (rs->type == SOCK_STREAM) {
- if (dest_addr || addrlen)