]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Handle race between rshutdown and rpoll
authorSean Hefty <sean.hefty@intel.com>
Fri, 16 Aug 2013 22:15:12 +0000 (15:15 -0700)
committerSean Hefty <sean.hefty@intel.com>
Fri, 16 Aug 2013 22:15:12 +0000 (15:15 -0700)
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
examples/rstream.c
src/rsocket.c

index 278437f6e05a48164197907f71d054e2084a89f5..0135927b5aff104b795f77a68834afc05a07a13c 100644 (file)
@@ -445,8 +445,20 @@ free:
        return ret;
 }
 
+static void *do_shutdown(void *arg)
+{
+       int rs = (int) arg;
+
+       sleep(1);
+       printf("calling shutdown from separate thread\n");
+       rs_shutdown(rs, SHUT_RDWR);
+       printf("shutdown completed\n");
+       return NULL;
+}
+
 static int run(void)
 {
+       struct pollfd fds;
        int i, ret = 0;
 
        buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
@@ -506,7 +518,23 @@ static int run(void)
        if (fork_pid)
                wait(NULL);
        else
-               rs_shutdown(rs, SHUT_RDWR);
+       {
+               if (dst_addr) {
+                       pthread_t thread_id;
+                       ret = pthread_create(&thread_id, NULL, do_shutdown, (void*) rs);
+
+                       fds.fd = rs;
+                       fds.events = POLLIN;
+                       printf("calling poll\n");
+                       ret = rs_poll(&fds, 1, 10000);
+                       printf("poll ret %d (%s) revents 0x%x (POLLHUP 0x%x)\n",
+                               ret, strerror(errno), fds.revents, POLLHUP);
+               } else {
+                       printf("sleeping for 10 seconds\n");
+                       sleep(10);
+                       rs_shutdown(rs, SHUT_RDWR);
+               }
+       }
        rs_close(rs);
 free:
        free(buf);
index d544dd097cda228de114173c8fe569dc1881f057..e45b26dcd6809c763afe789d400586bb46afbbe6 100644 (file)
@@ -2948,10 +2948,12 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
 
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
+                       fastlock_acquire(&rs->cq_wait_lock);
                        if (rs->type == SOCK_STREAM)
                                rs_get_cq_event(rs);
                        else
                                ds_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
@@ -3098,7 +3100,8 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds,
 
 /*
  * For graceful disconnect, notify the remote side that we're
- * disconnecting and wait until all outstanding sends complete.
+ * disconnecting and wait until all outstanding sends complete, provided
+ * that the remote side has not sent a disconnect message.
  */
 int rshutdown(int socket, int how)
 {
@@ -3138,6 +3141,12 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected)
                rs_process_cq(rs, 0, rs_conn_all_sends_done);
 
+       if (rs->state & rs_disconnected) {
+               /* Generate event by flushing receives to unblock rpoll */
+               ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+               rdma_disconnect(rs->cm_id);
+       }
+
        if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
                rs_set_nonblocking(rs, rs->fd_flags);