From: Sean Hefty Date: Tue, 26 Jun 2012 23:39:39 +0000 (-0700) Subject: Refresh of shut_wr X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=ee6157aea4612de8e15bf5cb91e48d97eb884f63;p=~shefty%2Flibrdmacm.git Refresh of shut_wr --- diff --git a/src/rsocket.c b/src/rsocket.c index 5dfec0ba..8e8a9d9e 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -96,7 +96,8 @@ enum { #define rs_msg_data(imm_data) (imm_data & 0x1FFFFFFF) enum { - RS_CTRL_DISCONNECT + RS_CTRL_DISCONNECT, + RS_CTRL_SHUTDOWN }; struct rs_msg { @@ -131,6 +132,9 @@ union rs_wr_id { }; }; +/* + * rsocket states are ordered as passive, connecting, connected, disconnected. + */ enum rs_state { rs_init, rs_bound = 0x0001, @@ -143,9 +147,9 @@ enum rs_state { rs_connected = 0x0100, rs_connect_wr = rs_connected | 0x0200, rs_connect_rd = rs_connected | 0x0400, - rs_disconnected = 0x0800, - rs_error = 0x1000, - rs_connect_error = rs_error | 0x2000 + rs_connect_error = 0x0800, + rs_disconnected = 0x1000, + rs_error = 0x2000, }; #define RS_OPT_SWAP_SGL 1 @@ -701,9 +705,9 @@ do_connect: if (!ret) goto connected; if (errno == EAGAIN || errno == EWOULDBLOCK) - rs->state = rs_active_connect; + rs->state = rs_connecting; break; - case rs_active_connect: + case rs_connecting: ret = ucma_complete(rs->cm_id); if (ret) break; @@ -852,7 +856,7 @@ static int rs_give_credits(struct rsocket *rs) { return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && - rs->ctrl_avail && (rs->state == rs_connected); + rs->ctrl_avail && (rs->state & rs_connected); } static void rs_update_credits(struct rsocket *rs) @@ -900,14 +904,14 @@ static int rs_poll_cq(struct rsocket *rs) } else { rs->ctrl_avail++; } - if (wc.status != IBV_WC_SUCCESS && rs->state == rs_connected) { + if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) { rs->state = rs_error; rs->err = EIO; } } } - if (rs->state != rs_error) { + if (rs->state & rs_connected) { while (!ret && rcnt--) ret = rdma_post_recvv(rs->cm_id, NULL, NULL, 0); @@ -1043,7 +1047,7 @@ static int rs_can_send(struct rsocket *rs) static int rs_conn_can_send(struct rsocket *rs) { - return rs_can_send(rs) || (rs->state != rs_connected); + return rs_can_send(rs) || !(rs->state & rs_connect_wr); } static int rs_can_send_ctrl(struct rsocket *rs) @@ -1058,7 +1062,7 @@ static int rs_have_rdata(struct rsocket *rs) static int rs_conn_have_rdata(struct rsocket *rs) { - return rs_have_rdata(rs) || (rs->state != rs_connected); + return rs_have_rdata(rs) || !(rs->state & rs_connect_rd); } static int rs_all_sends_done(struct rsocket *rs) @@ -1111,7 +1115,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) int ret; rs = idm_at(&idm, socket); - if (rs->state < rs_connected) { + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { if (errno == EINPROGRESS) @@ -1213,7 +1217,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) int ret = 0; rs = idm_at(&idm, socket); - if (rs->state < rs_connected) { + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { if (errno == EINPROGRESS) @@ -1229,7 +1233,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) rs_conn_can_send); if (ret) break; - if (rs->state != rs_connected) { + if (!(rs->state & rs_connect_wr)) { ret = ERR(ECONNRESET); break; } @@ -1322,7 +1326,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags int i, ret = 0; rs = idm_at(&idm, socket); - if (rs->state < rs_connected) { + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { if (errno == EINPROGRESS) @@ -1343,7 +1347,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags rs_conn_can_send); if (ret) break; - if (rs->state != rs_connected) { + if (!(rs->state & rs_connect_wr)) { ret = ERR(ECONNRESET); break; } @@ -1435,17 +1439,35 @@ static int rs_poll_rs(struct rsocket *rs, int events, short revents; int ret; - switch (rs->state) { - case rs_listening: +check_cq: + if ((rs->state & rs_connected) || (rs->state == rs_disconnected) || + (rs->state & rs_error)) { + rs_process_cq(rs, nonblock, test); + + revents = 0; + if ((events & POLLIN) && rs_have_rdata(rs)) + revents |= POLLIN; + if ((events & POLLOUT) && rs_can_send(rs)) + revents |= POLLOUT; + if (!(rs->state & rs_connected)) { + if (rs->state == rs_disconnected) + revents |= POLLHUP; + else + revents |= POLLERR; + } + + return revents; + } + + if (rs->state == rs_listening) { fds.fd = rs->cm_id->channel->fd; fds.events = events; fds.revents = 0; poll(&fds, 1, 0); return fds.revents; - case rs_resolving_addr: - case rs_resolving_route: - case rs_active_connect: - case rs_accepting: + } + + if (rs->state & rs_opening) { ret = rs_do_connect(rs); if (ret) { if (errno == EINPROGRESS) { @@ -1455,30 +1477,13 @@ static int rs_poll_rs(struct rsocket *rs, int events, return POLLOUT; } } - /* fall through */ - case rs_connected: - case rs_disconnected: - case rs_error: - rs_process_cq(rs, nonblock, test); - - revents = 0; - if ((events & POLLIN) && rs_have_rdata(rs)) - revents |= POLLIN; - if ((events & POLLOUT) && rs_can_send(rs)) - revents |= POLLOUT; - if (rs->state > rs_connected) { - if (rs->state == rs_error) - revents |= POLLERR; - else - revents |= POLLHUP; - } + goto check_cq; + } - return revents; - case rs_connect_error: + if (rs->state == rs_connect_error) return (rs->err && events & POLLOUT) ? POLLOUT : 0; - default: - return 0; - } + + return 0; } static int rs_poll_check(struct pollfd *fds, nfds_t nfds) @@ -1690,18 +1695,27 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds, int rshutdown(int socket, int how) { struct rsocket *rs; - int ret = 0; + int ctrl, ret = 0; - if (how == SHUT_RD) + rs = idm_at(&idm, socket); + if (how == SHUT_RD) { + rs->state &= ~rs_connect_rd; + if (rs->state == rs_connected) + rs->state = rs_disconnected; return 0; + } - rs = idm_at(&idm, socket); if (rs->fd_flags & O_NONBLOCK) rs_set_nonblocking(rs, 0); - if (rs->state == rs_connected) { - if (how == SHUT_RDWR) + if (rs->state & rs_connected) { + if (how == SHUT_RDWR) { + ctrl = RS_CTRL_DISCONNECT; rs->state = rs_disconnected; + } else { + ctrl = RS_CTRL_SHUTDOWN; + rs->state &= ~rs_connect_wr; + } if (!rs_can_send_ctrl(rs)) { ret = rs_process_cq(rs, 0, rs_can_send_ctrl); if (ret) @@ -1710,7 +1724,7 @@ int rshutdown(int socket, int how) rs->ctrl_avail--; ret = rs_post_write(rs, 0, NULL, 0, - rs_msg_set(RS_OP_CTRL, RS_CTRL_DISCONNECT), + rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0); } @@ -1728,7 +1742,7 @@ int rclose(int socket) struct rsocket *rs; rs = idm_at(&idm, socket); - if (rs->state == rs_connected) + if (rs->state & rs_connected) rshutdown(socket, SHUT_RDWR); rs_free(rs); @@ -1839,8 +1853,9 @@ int rsetsockopt(int socket, int level, int optname, default: break; } + break; case SOL_RDMA: - if (rs->state > rs_listening) { + if (rs->state >= rs_opening) { ret = ERR(EINVAL); break; }