From 4ceaf1011857b66124996948d8f3b93c47619752 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Fri, 16 Aug 2013 22:50:54 -0700 Subject: [PATCH] Refresh of shutdown --- examples/rstream.c | 30 +++++++++++++++++++++++++++++- src/rsocket.c | 26 +++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/examples/rstream.c b/examples/rstream.c index 278437f6..0135927b 100644 --- a/examples/rstream.c +++ b/examples/rstream.c @@ -445,8 +445,20 @@ free: return ret; } +static void *do_shutdown(void *arg) +{ + int rs = (int) arg; + + sleep(1); + printf("calling shutdown from separate thread\n"); + rs_shutdown(rs, SHUT_RDWR); + printf("shutdown completed\n"); + return NULL; +} + static int run(void) { + struct pollfd fds; int i, ret = 0; buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size); @@ -506,7 +518,23 @@ static int run(void) if (fork_pid) wait(NULL); else - rs_shutdown(rs, SHUT_RDWR); + { + if (dst_addr) { + pthread_t thread_id; + ret = pthread_create(&thread_id, NULL, do_shutdown, (void*) rs); + + fds.fd = rs; + fds.events = POLLIN; + printf("calling poll\n"); + ret = rs_poll(&fds, 1, 10000); + printf("poll ret %d (%s) revents 0x%x (POLLHUP 0x%x)\n", + ret, strerror(errno), fds.revents, POLLHUP); + } else { + printf("sleeping for 10 seconds\n"); + sleep(10); + rs_shutdown(rs, SHUT_RDWR); + } + } rs_close(rs); free: free(buf); diff --git a/src/rsocket.c b/src/rsocket.c index d544dd09..6ff8128f 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -133,7 +133,8 @@ enum { enum { RS_CTRL_DISCONNECT, - RS_CTRL_SHUTDOWN + RS_CTRL_SHUTDOWN, + RS_CTRL_SYNC }; struct rs_msg { @@ -1843,7 +1844,10 @@ static int rs_poll_cq(struct rsocket *rs) case RS_OP_CTRL: rs->ctrl_avail++; if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT) + { rs->state = rs_disconnected; + printf("rsocket disconnected - send complete\n"); + } break; case RS_OP_IOMAP_SGL: rs->sqe_avail++; @@ -1879,13 +1883,16 @@ static int rs_get_cq_event(struct rsocket *rs) void *context; int ret; + printf("rs_get_cq_event\n"); if (!rs->cq_armed) return 0; + printf("waiting on cq event\n"); ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context); if (!ret) { ibv_ack_cq_events(rs->cm_id->recv_cq, 1); rs->cq_armed = 0; + printf("retrieved event\n"); } else if (errno != EAGAIN) { rs->state = rs_error; } @@ -2948,10 +2955,12 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds) rs = idm_lookup(&idm, fds[i].fd); if (rs) { + fastlock_acquire(&rs->cq_wait_lock); if (rs->type == SOCK_STREAM) rs_get_cq_event(rs); else ds_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); } else { fds[i].revents = rfds[i].revents; @@ -2997,7 +3006,9 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout) if (ret) break; + printf("real poll start\n"); ret = poll(rfds, nfds, timeout); + printf("real poll exit\n"); if (ret <= 0) break; @@ -3096,6 +3107,18 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds, return ret; } +static void rs_unblock_rpoll(struct rsocket *rs) +{ + if (!rs->ctrl_avail) { + if (rs_process_cq(rs, 0, rs_conn_can_send_ctrl)) + return; + } + + ibv_req_notify_cq(rs->cm_id->recv_cq, 0); + rs->ctrl_avail--; + rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, RS_CTRL_SYNC)); +} + /* * For graceful disconnect, notify the remote side that we're * disconnecting and wait until all outstanding sends complete. @@ -3138,6 +3161,7 @@ int rshutdown(int socket, int how) if (rs->state & rs_connected) rs_process_cq(rs, 0, rs_conn_all_sends_done); + rs_unblock_poll(rs); if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) rs_set_nonblocking(rs, rs->fd_flags); -- 2.46.0