From 992a8c5abcb0199fd6214fd31b6695573ccb5bf5 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 16 May 2012 15:23:41 -0700 Subject: [PATCH] rstream: Set rsocket nonblocking if set to async operation If asynchronous use is specified (use of poll/select), set the rsocket to nonblocking. This matches the common usage case for asynchronous sockets. When asynchronous support is enabled, the nonblocking/blocking test option determines whether the poll/select call will block, or if rstream will spin on the calls. This provides more flexibility with how the rsocket is used. Specifically, MPI often uses nonblocking sockets, but spins on poll/select. However, many apps will use nonblocking sockets, but wait on poll/select. Signed-off-by: Sean Hefty --- examples/rstream.c | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/examples/rstream.c b/examples/rstream.c index 8d5a22dc..2b25ef56 100644 --- a/examples/rstream.c +++ b/examples/rstream.c @@ -57,6 +57,7 @@ static int use_rs = 1; static int use_async; static int verify; static int flags = MSG_DONTWAIT; +static int poll_timeout = 0; static int custom; static int iterations = 1; static int transfer_size = 1000; @@ -190,6 +191,17 @@ static int verify_buf(void *buf, int size) return 0; } +static int do_poll(struct pollfd *fds) +{ + int ret; + + do { + ret = rs_poll(fds, 1, poll_timeout); + } while (!ret); + + return ret == 1 ? 0 : ret; +} + static int send_xfer(int rs, int size) { struct pollfd fds; @@ -205,8 +217,8 @@ static int send_xfer(int rs, int size) for (offset = 0; offset < size; ) { if (use_async) { - ret = rs_poll(&fds, 1, -1); - if (ret != 1) + ret = do_poll(&fds); + if (ret) return ret; } @@ -234,8 +246,8 @@ static int recv_xfer(int rs, int size) for (offset = 0; offset < size; ) { if (use_async) { - ret = rs_poll(&fds, 1, -1); - if (ret != 1) + ret = do_poll(&fds); + if (ret) return ret; } @@ -323,8 +335,8 @@ static void set_options(int rs) break; } - rs_setsockopt(rs, IPPROTO_TCP, TCP_NODELAY, - (void *) &no_delay, sizeof(no_delay)); + val = 1; + rs_setsockopt(rs, IPPROTO_TCP, TCP_NODELAY, (void *) &val, sizeof(val)); if (flags & MSG_DONTWAIT) { rs_fcntl(rs, F_SETFL, O_NONBLOCK); @@ -377,8 +389,8 @@ static int server_connect(void) fds.fd = lrs; fds.events = POLLIN; - ret = rs_poll(&fds, 1, -1); - if (ret != 1) { + ret = do_poll(&fds); + if (ret) { perror("rpoll"); goto close; } @@ -421,21 +433,24 @@ static int client_connect(void) ret = rs_connect(rs, res->ai_addr, res->ai_addrlen); if (ret && (errno != EINPROGRESS)) { perror("rconnect"); - rs_close(rs); - rs = ret; + goto err; } if (errno == EINPROGRESS) { fds.fd = rs; fds.events = POLLOUT; - do { - ret = rs_poll(&fds, 1, -1); - } while (!ret); + ret = do_poll(&fds); + if (ret) + goto err; } free: freeaddrinfo(res); return rs; +err: + freeaddrinfo(res); + rs_close(rs); + return ret; } static int run(void) @@ -562,6 +577,9 @@ int main(int argc, char **argv) } } + if (!(flags & MSG_DONTWAIT)) + poll_timeout = -1; + ret = run(); return ret; } -- 2.46.0