From: Sean Hefty Date: Wed, 16 May 2012 22:23:41 +0000 (-0700) Subject: rstream: Set rsocket nonblocking if set to async operation X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=992a8c5abcb0199fd6214fd31b6695573ccb5bf5;p=~shefty%2Flibrdmacm.git rstream: Set rsocket nonblocking if set to async operation If asynchronous use is specified (use of poll/select), set the rsocket to nonblocking. This matches the common usage case for asynchronous sockets. When asynchronous support is enabled, the nonblocking/blocking test option determines whether the poll/select call will block, or if rstream will spin on the calls. This provides more flexibility with how the rsocket is used. Specifically, MPI often uses nonblocking sockets, but spins on poll/select. However, many apps will use nonblocking sockets, but wait on poll/select. Signed-off-by: Sean Hefty --- diff --git a/examples/rstream.c b/examples/rstream.c index 8d5a22dc..2b25ef56 100644 --- a/examples/rstream.c +++ b/examples/rstream.c @@ -57,6 +57,7 @@ static int use_rs = 1; static int use_async; static int verify; static int flags = MSG_DONTWAIT; +static int poll_timeout = 0; static int custom; static int iterations = 1; static int transfer_size = 1000; @@ -190,6 +191,17 @@ static int verify_buf(void *buf, int size) return 0; } +static int do_poll(struct pollfd *fds) +{ + int ret; + + do { + ret = rs_poll(fds, 1, poll_timeout); + } while (!ret); + + return ret == 1 ? 0 : ret; +} + static int send_xfer(int rs, int size) { struct pollfd fds; @@ -205,8 +217,8 @@ static int send_xfer(int rs, int size) for (offset = 0; offset < size; ) { if (use_async) { - ret = rs_poll(&fds, 1, -1); - if (ret != 1) + ret = do_poll(&fds); + if (ret) return ret; } @@ -234,8 +246,8 @@ static int recv_xfer(int rs, int size) for (offset = 0; offset < size; ) { if (use_async) { - ret = rs_poll(&fds, 1, -1); - if (ret != 1) + ret = do_poll(&fds); + if (ret) return ret; } @@ -323,8 +335,8 @@ static void set_options(int rs) break; } - rs_setsockopt(rs, IPPROTO_TCP, TCP_NODELAY, - (void *) &no_delay, sizeof(no_delay)); + val = 1; + rs_setsockopt(rs, IPPROTO_TCP, TCP_NODELAY, (void *) &val, sizeof(val)); if (flags & MSG_DONTWAIT) { rs_fcntl(rs, F_SETFL, O_NONBLOCK); @@ -377,8 +389,8 @@ static int server_connect(void) fds.fd = lrs; fds.events = POLLIN; - ret = rs_poll(&fds, 1, -1); - if (ret != 1) { + ret = do_poll(&fds); + if (ret) { perror("rpoll"); goto close; } @@ -421,21 +433,24 @@ static int client_connect(void) ret = rs_connect(rs, res->ai_addr, res->ai_addrlen); if (ret && (errno != EINPROGRESS)) { perror("rconnect"); - rs_close(rs); - rs = ret; + goto err; } if (errno == EINPROGRESS) { fds.fd = rs; fds.events = POLLOUT; - do { - ret = rs_poll(&fds, 1, -1); - } while (!ret); + ret = do_poll(&fds); + if (ret) + goto err; } free: freeaddrinfo(res); return rs; +err: + freeaddrinfo(res); + rs_close(rs); + return ret; } static int run(void) @@ -562,6 +577,9 @@ int main(int argc, char **argv) } } + if (!(flags & MSG_DONTWAIT)) + poll_timeout = -1; + ret = run(); return ret; }