+++ /dev/null
-Bottom: 2ddb5fbcef7548a1d1c87870ef237d187fba4f30
-Top: 628bde79ad910c1eec4036abc1f102ef7b304bc8
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-06-26 16:39:39 -0700
-
-Refresh of shut_wr
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 5dfec0b..8e8a9d9 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -96,7 +96,8 @@ enum {
- #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
-
- enum {
-- RS_CTRL_DISCONNECT
-+ RS_CTRL_DISCONNECT,
-+ RS_CTRL_SHUTDOWN
- };
-
- struct rs_msg {
-@@ -131,6 +132,9 @@ union rs_wr_id {
- };
- };
-
-+/*
-+ * rsocket states are ordered as passive, connecting, connected, disconnected.
-+ */
- enum rs_state {
- rs_init,
- rs_bound = 0x0001,
-@@ -143,9 +147,9 @@ enum rs_state {
- rs_connected = 0x0100,
- rs_connect_wr = rs_connected | 0x0200,
- rs_connect_rd = rs_connected | 0x0400,
-- rs_disconnected = 0x0800,
-- rs_error = 0x1000,
-- rs_connect_error = rs_error | 0x2000
-+ rs_connect_error = 0x0800,
-+ rs_disconnected = 0x1000,
-+ rs_error = 0x2000,
- };
-
- #define RS_OPT_SWAP_SGL 1
-@@ -701,9 +705,9 @@ do_connect:
- if (!ret)
- goto connected;
- if (errno == EAGAIN || errno == EWOULDBLOCK)
-- rs->state = rs_active_connect;
-+ rs->state = rs_connecting;
- break;
-- case rs_active_connect:
-+ case rs_connecting:
- ret = ucma_complete(rs->cm_id);
- if (ret)
- break;
-@@ -852,7 +856,7 @@ static int rs_give_credits(struct rsocket *rs)
- {
- return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
- ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
-- rs->ctrl_avail && (rs->state == rs_connected);
-+ rs->ctrl_avail && (rs->state & rs_connected);
- }
-
- static void rs_update_credits(struct rsocket *rs)
-@@ -900,14 +904,14 @@ static int rs_poll_cq(struct rsocket *rs)
- } else {
- rs->ctrl_avail++;
- }
-- if (wc.status != IBV_WC_SUCCESS && rs->state == rs_connected) {
-+ if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
- rs->state = rs_error;
- rs->err = EIO;
- }
- }
- }
-
-- if (rs->state != rs_error) {
-+ if (rs->state & rs_connected) {
- while (!ret && rcnt--)
- ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0);
-
-@@ -1043,7 +1047,7 @@ static int rs_can_send(struct rsocket *rs)
-
- static int rs_conn_can_send(struct rsocket *rs)
- {
-- return rs_can_send(rs) || (rs->state != rs_connected);
-+ return rs_can_send(rs) || !(rs->state & rs_connect_wr);
- }
-
- static int rs_can_send_ctrl(struct rsocket *rs)
-@@ -1058,7 +1062,7 @@ static int rs_have_rdata(struct rsocket *rs)
-
- static int rs_conn_have_rdata(struct rsocket *rs)
- {
-- return rs_have_rdata(rs) || (rs->state != rs_connected);
-+ return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
- }
-
- static int rs_all_sends_done(struct rsocket *rs)
-@@ -1111,7 +1115,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
- int ret;
-
- rs = idm_at(&idm, socket);
-- if (rs->state < rs_connected) {
-+ if (rs->state & rs_opening) {
- ret = rs_do_connect(rs);
- if (ret) {
- if (errno == EINPROGRESS)
-@@ -1213,7 +1217,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- int ret = 0;
-
- rs = idm_at(&idm, socket);
-- if (rs->state < rs_connected) {
-+ if (rs->state & rs_opening) {
- ret = rs_do_connect(rs);
- if (ret) {
- if (errno == EINPROGRESS)
-@@ -1229,7 +1233,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
- rs_conn_can_send);
- if (ret)
- break;
-- if (rs->state != rs_connected) {
-+ if (!(rs->state & rs_connect_wr)) {
- ret = ERR(ECONNRESET);
- break;
- }
-@@ -1322,7 +1326,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- int i, ret = 0;
-
- rs = idm_at(&idm, socket);
-- if (rs->state < rs_connected) {
-+ if (rs->state & rs_opening) {
- ret = rs_do_connect(rs);
- if (ret) {
- if (errno == EINPROGRESS)
-@@ -1343,7 +1347,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
- rs_conn_can_send);
- if (ret)
- break;
-- if (rs->state != rs_connected) {
-+ if (!(rs->state & rs_connect_wr)) {
- ret = ERR(ECONNRESET);
- break;
- }
-@@ -1435,17 +1439,35 @@ static int rs_poll_rs(struct rsocket *rs, int events,
- short revents;
- int ret;
-
-- switch (rs->state) {
-- case rs_listening:
-+check_cq:
-+ if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
-+ (rs->state & rs_error)) {
-+ rs_process_cq(rs, nonblock, test);
-+
-+ revents = 0;
-+ if ((events & POLLIN) && rs_have_rdata(rs))
-+ revents |= POLLIN;
-+ if ((events & POLLOUT) && rs_can_send(rs))
-+ revents |= POLLOUT;
-+ if (!(rs->state & rs_connected)) {
-+ if (rs->state == rs_disconnected)
-+ revents |= POLLHUP;
-+ else
-+ revents |= POLLERR;
-+ }
-+
-+ return revents;
-+ }
-+
-+ if (rs->state == rs_listening) {
- fds.fd = rs->cm_id->channel->fd;
- fds.events = events;
- fds.revents = 0;
- poll(&fds, 1, 0);
- return fds.revents;
-- case rs_resolving_addr:
-- case rs_resolving_route:
-- case rs_active_connect:
-- case rs_accepting:
-+ }
-+
-+ if (rs->state & rs_opening) {
- ret = rs_do_connect(rs);
- if (ret) {
- if (errno == EINPROGRESS) {
-@@ -1455,30 +1477,13 @@ static int rs_poll_rs(struct rsocket *rs, int events,
- return POLLOUT;
- }
- }
-- /* fall through */
-- case rs_connected:
-- case rs_disconnected:
-- case rs_error:
-- rs_process_cq(rs, nonblock, test);
--
-- revents = 0;
-- if ((events & POLLIN) && rs_have_rdata(rs))
-- revents |= POLLIN;
-- if ((events & POLLOUT) && rs_can_send(rs))
-- revents |= POLLOUT;
-- if (rs->state > rs_connected) {
-- if (rs->state == rs_error)
-- revents |= POLLERR;
-- else
-- revents |= POLLHUP;
-- }
-+ goto check_cq;
-+ }
-
-- return revents;
-- case rs_connect_error:
-+ if (rs->state == rs_connect_error)
- return (rs->err && events & POLLOUT) ? POLLOUT : 0;
-- default:
-- return 0;
-- }
-+
-+ return 0;
- }
-
- static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
-@@ -1690,18 +1695,27 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds,
- int rshutdown(int socket, int how)
- {
- struct rsocket *rs;
-- int ret = 0;
-+ int ctrl, ret = 0;
-
-- if (how == SHUT_RD)
-+ rs = idm_at(&idm, socket);
-+ if (how == SHUT_RD) {
-+ rs->state &= ~rs_connect_rd;
-+ if (rs->state == rs_connected)
-+ rs->state = rs_disconnected;
- return 0;
-+ }
-
-- rs = idm_at(&idm, socket);
- if (rs->fd_flags & O_NONBLOCK)
- rs_set_nonblocking(rs, 0);
-
-- if (rs->state == rs_connected) {
-- if (how == SHUT_RDWR)
-+ if (rs->state & rs_connected) {
-+ if (how == SHUT_RDWR) {
-+ ctrl = RS_CTRL_DISCONNECT;
- rs->state = rs_disconnected;
-+ } else {
-+ ctrl = RS_CTRL_SHUTDOWN;
-+ rs->state &= ~rs_connect_wr;
-+ }
- if (!rs_can_send_ctrl(rs)) {
- ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
- if (ret)
-@@ -1710,7 +1724,7 @@ int rshutdown(int socket, int how)
-
- rs->ctrl_avail--;
- ret = rs_post_write(rs, 0, NULL, 0,
-- rs_msg_set(RS_OP_CTRL, RS_CTRL_DISCONNECT),
-+ rs_msg_set(RS_OP_CTRL, ctrl),
- 0, 0, 0);
- }
-
-@@ -1728,7 +1742,7 @@ int rclose(int socket)
- struct rsocket *rs;
-
- rs = idm_at(&idm, socket);
-- if (rs->state == rs_connected)
-+ if (rs->state & rs_connected)
- rshutdown(socket, SHUT_RDWR);
-
- rs_free(rs);
-@@ -1839,8 +1853,9 @@ int rsetsockopt(int socket, int level, int optname,
- default:
- break;
- }
-+ break;
- case SOL_RDMA:
-- if (rs->state > rs_listening) {
-+ if (rs->state >= rs_opening) {
- ret = ERR(EINVAL);
- break;
- }
Bottom: 938168c7141b54196d5a134aca1fe7c88dcfb3b5
-Top: 2ddb5fbcef7548a1d1c87870ef237d187fba4f30
+Top: 628bde79ad910c1eec4036abc1f102ef7b304bc8
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-06-25 14:19:54 -0700
---
diff --git a/src/rsocket.c b/src/rsocket.c
-index ed994fe..5dfec0b 100644
+index ed994fe..8e8a9d9 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -131,21 +131,21 @@ union rs_wr_id {
- };
+@@ -96,7 +96,8 @@ enum {
+ #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF)
+
+ enum {
+- RS_CTRL_DISCONNECT
++ RS_CTRL_DISCONNECT,
++ RS_CTRL_SHUTDOWN
};
--/*
-- * rsocket states are ordered as passive, connecting, connected, disconnected.
-- */
+ struct rs_msg {
+@@ -136,16 +137,19 @@ union rs_wr_id {
+ */
enum rs_state {
rs_init,
- rs_bound,
+ rs_connected = 0x0100,
+ rs_connect_wr = rs_connected | 0x0200,
+ rs_connect_rd = rs_connected | 0x0400,
-+ rs_disconnected = 0x0800,
-+ rs_error = 0x1000,
-+ rs_connect_error = rs_error | 0x2000
++ rs_connect_error = 0x0800,
++ rs_disconnected = 0x1000,
++ rs_error = 0x2000,
};
#define RS_OPT_SWAP_SGL 1
-@@ -321,7 +321,7 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
+@@ -321,7 +325,7 @@ static int rs_set_nonblocking(struct rsocket *rs, long arg)
if (rs->cm_id->recv_cq_channel)
ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);
ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
return ret;
-@@ -701,9 +701,9 @@ do_connect:
- if (!ret)
- goto connected;
- if (errno == EAGAIN || errno == EWOULDBLOCK)
-- rs->state = rs_connecting;
-+ rs->state = rs_active_connect;
- break;
-- case rs_connecting:
-+ case rs_active_connect:
- ret = ucma_complete(rs->cm_id);
- if (ret)
- break;
-@@ -907,7 +907,7 @@ static int rs_poll_cq(struct rsocket *rs)
+@@ -852,7 +856,7 @@ static int rs_give_credits(struct rsocket *rs)
+ {
+ return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) ||
+ ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) &&
+- rs->ctrl_avail && (rs->state == rs_connected);
++ rs->ctrl_avail && (rs->state & rs_connected);
+ }
+
+ static void rs_update_credits(struct rsocket *rs)
+@@ -900,14 +904,14 @@ static int rs_poll_cq(struct rsocket *rs)
+ } else {
+ rs->ctrl_avail++;
+ }
+- if (wc.status != IBV_WC_SUCCESS && rs->state == rs_connected) {
++ if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) {
+ rs->state = rs_error;
+ rs->err = EIO;
+ }
}
}
- if (rs->state == rs_connected) {
-+ if (rs->state != rs_error) {
++ if (rs->state & rs_connected) {
while (!ret && rcnt--)
ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0);
-@@ -932,7 +932,7 @@ static int rs_get_cq_event(struct rsocket *rs)
+@@ -932,7 +936,7 @@ static int rs_get_cq_event(struct rsocket *rs)
if (!ret) {
ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
rs->cq_armed = 0;
rs->state = rs_error;
}
-@@ -1444,7 +1444,7 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+@@ -1043,7 +1047,7 @@ static int rs_can_send(struct rsocket *rs)
+
+ static int rs_conn_can_send(struct rsocket *rs)
+ {
+- return rs_can_send(rs) || (rs->state != rs_connected);
++ return rs_can_send(rs) || !(rs->state & rs_connect_wr);
+ }
+
+ static int rs_can_send_ctrl(struct rsocket *rs)
+@@ -1058,7 +1062,7 @@ static int rs_have_rdata(struct rsocket *rs)
+
+ static int rs_conn_have_rdata(struct rsocket *rs)
+ {
+- return rs_have_rdata(rs) || (rs->state != rs_connected);
++ return rs_have_rdata(rs) || !(rs->state & rs_connect_rd);
+ }
+
+ static int rs_all_sends_done(struct rsocket *rs)
+@@ -1111,7 +1115,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+ int ret;
+
+ rs = idm_at(&idm, socket);
+- if (rs->state < rs_connected) {
++ if (rs->state & rs_opening) {
+ ret = rs_do_connect(rs);
+ if (ret) {
+ if (errno == EINPROGRESS)
+@@ -1213,7 +1217,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+- if (rs->state < rs_connected) {
++ if (rs->state & rs_opening) {
+ ret = rs_do_connect(rs);
+ if (ret) {
+ if (errno == EINPROGRESS)
+@@ -1229,7 +1233,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (rs->state != rs_connected) {
++ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+@@ -1322,7 +1326,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ int i, ret = 0;
+
+ rs = idm_at(&idm, socket);
+- if (rs->state < rs_connected) {
++ if (rs->state & rs_opening) {
+ ret = rs_do_connect(rs);
+ if (ret) {
+ if (errno == EINPROGRESS)
+@@ -1343,7 +1347,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+ rs_conn_can_send);
+ if (ret)
+ break;
+- if (rs->state != rs_connected) {
++ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+@@ -1435,17 +1439,35 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+ short revents;
+ int ret;
+
+- switch (rs->state) {
+- case rs_listening:
++check_cq:
++ if ((rs->state & rs_connected) || (rs->state == rs_disconnected) ||
++ (rs->state & rs_error)) {
++ rs_process_cq(rs, nonblock, test);
++
++ revents = 0;
++ if ((events & POLLIN) && rs_have_rdata(rs))
++ revents |= POLLIN;
++ if ((events & POLLOUT) && rs_can_send(rs))
++ revents |= POLLOUT;
++ if (!(rs->state & rs_connected)) {
++ if (rs->state == rs_disconnected)
++ revents |= POLLHUP;
++ else
++ revents |= POLLERR;
++ }
++
++ return revents;
++ }
++
++ if (rs->state == rs_listening) {
+ fds.fd = rs->cm_id->channel->fd;
+ fds.events = events;
+ fds.revents = 0;
+ poll(&fds, 1, 0);
return fds.revents;
- case rs_resolving_addr:
- case rs_resolving_route:
+- case rs_resolving_addr:
+- case rs_resolving_route:
- case rs_connecting:
-+ case rs_active_connect:
- case rs_accepting:
+- case rs_accepting:
++ }
++
++ if (rs->state & rs_opening) {
ret = rs_do_connect(rs);
if (ret) {
-@@ -1466,10 +1466,12 @@ static int rs_poll_rs(struct rsocket *rs, int events,
- revents |= POLLIN;
- if ((events & POLLOUT) && rs_can_send(rs))
- revents |= POLLOUT;
+ if (errno == EINPROGRESS) {
+@@ -1455,28 +1477,13 @@ static int rs_poll_rs(struct rsocket *rs, int events,
+ return POLLOUT;
+ }
+ }
+- /* fall through */
+- case rs_connected:
+- case rs_disconnected:
+- case rs_error:
+- rs_process_cq(rs, nonblock, test);
+-
+- revents = 0;
+- if ((events & POLLIN) && rs_have_rdata(rs))
+- revents |= POLLIN;
+- if ((events & POLLOUT) && rs_can_send(rs))
+- revents |= POLLOUT;
- if (rs->state == rs_disconnected)
- revents |= POLLHUP;
- else if (rs->state == rs_error)
- revents |= POLLERR;
-+ if (rs->state > rs_connected) {
-+ if (rs->state == rs_error)
-+ revents |= POLLERR;
-+ else
-+ revents |= POLLHUP;
-+ }
++ goto check_cq;
++ }
+
+- return revents;
+- case rs_connect_error:
++ if (rs->state == rs_connect_error)
+ return (rs->err && events & POLLOUT) ? POLLOUT : 0;
+- default:
+- return 0;
+- }
++
++ return 0;
+ }
- return revents;
- case rs_connect_error:
-@@ -1690,12 +1692,16 @@ int rshutdown(int socket, int how)
+ static int rs_poll_check(struct pollfd *fds, nfds_t nfds)
+@@ -1688,14 +1695,27 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds,
+ int rshutdown(int socket, int how)
+ {
struct rsocket *rs;
- int ret = 0;
+- int ret = 0;
++ int ctrl, ret = 0;
-+ if (how == SHUT_RD)
+ rs = idm_at(&idm, socket);
++ if (how == SHUT_RD) {
++ rs->state &= ~rs_connect_rd;
++ if (rs->state == rs_connected)
++ rs->state = rs_disconnected;
+ return 0;
++ }
+
- rs = idm_at(&idm, socket);
if (rs->fd_flags & O_NONBLOCK)
rs_set_nonblocking(rs, 0);
- if (rs->state == rs_connected) {
+- if (rs->state == rs_connected) {
- rs->state = rs_disconnected;
-+ if (how == SHUT_RDWR)
++ if (rs->state & rs_connected) {
++ if (how == SHUT_RDWR) {
++ ctrl = RS_CTRL_DISCONNECT;
+ rs->state = rs_disconnected;
++ } else {
++ ctrl = RS_CTRL_SHUTDOWN;
++ rs->state &= ~rs_connect_wr;
++ }
if (!rs_can_send_ctrl(rs)) {
ret = rs_process_cq(rs, 0, rs_can_send_ctrl);
if (ret)
-@@ -1711,6 +1717,9 @@ int rshutdown(int socket, int how)
+@@ -1704,13 +1724,16 @@ int rshutdown(int socket, int how)
+
+ rs->ctrl_avail--;
+ ret = rs_post_write(rs, 0, NULL, 0,
+- rs_msg_set(RS_OP_CTRL, RS_CTRL_DISCONNECT),
++ rs_msg_set(RS_OP_CTRL, ctrl),
+ 0, 0, 0);
+ }
+
if (!rs_all_sends_done(rs) && rs->state != rs_error)
rs_process_cq(rs, 0, rs_all_sends_done);
+
return 0;
}
+
+@@ -1719,7 +1742,7 @@ int rclose(int socket)
+ struct rsocket *rs;
+
+ rs = idm_at(&idm, socket);
+- if (rs->state == rs_connected)
++ if (rs->state & rs_connected)
+ rshutdown(socket, SHUT_RDWR);
+
+ rs_free(rs);
+@@ -1830,8 +1853,9 @@ int rsetsockopt(int socket, int level, int optname,
+ default:
+ break;
+ }
++ break;
+ case SOL_RDMA:
+- if (rs->state > rs_listening) {
++ if (rs->state >= rs_opening) {
+ ret = ERR(EINVAL);
+ break;
+ }