From e70c3e2d2a47ecbb37fe14aa83184022edc5b97e Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Sat, 26 May 2012 00:43:00 -0700 Subject: [PATCH] Refresh of rs-test-nonblock --- src/rsocket.c | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/rsocket.c b/src/rsocket.c index c8332fee..8cffc667 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -942,6 +942,16 @@ static int rs_can_send(struct rsocket *rs) (rs->target_sgl[rs->target_sge].length != 0); } +static int rs_cq_can_send(struct rsocket *rs) +{ + if (rs_can_send(rs)) + return 0; + else if (rs_nonblocking(rs)) + return ERR(EWOULDBLOCK); + else + return 1; +} + static int rs_can_send_ctrl(struct rsocket *rs) { return !rs->ctrl_avail; @@ -952,9 +962,19 @@ static int rs_have_rdata(struct rsocket *rs) return (rs->rmsg_head != rs->rmsg_tail); } +static int rs_cq_have_rdata(struct rsocket *rs) +{ + if (rs_have_rdata(rs)) + return 0; + else if (rs_nonblocking(rs)) + return ERR(EWOULDBLOCK); + else + return 1; +} + static int rs_all_sends_done(struct rsocket *rs) { - return (rs->sqe_avail + rs->ctrl_avail) == rs->sq_size; + return (rs->sqe_avail + rs->ctrl_avail) != rs->sq_size; } static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) @@ -1012,7 +1032,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) } fastlock_acquire(&rs->rlock); if (!rs_have_rdata(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata); + ret = rs_process_cq(rs, rs_cq_have_rdata); if (ret && errno != ECONNRESET) goto out; } @@ -1116,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size, buf += xfer_size) { if (!rs_can_send(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); + ret = rs_process_cq(rs, rs_cq_can_send); if (ret) break; } @@ -1213,7 +1233,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size) { if (!rs_can_send(rs)) { - ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); + ret = rs_process_cq(rs, rs_cq_can_send); if (ret) break; } @@ -1554,8 +1574,8 @@ int rshutdown(int socket, int how) 0, 0, 0); } - if (!rs_all_sends_done(rs) && rs->state != rs_error) - rs_process_cq(rs, 0, rs_all_sends_done); + if (rs_all_sends_done(rs) && rs->state != rs_error) + rs_process_cq(rs, rs_all_sends_done); return 0; } -- 2.45.2