]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
Refresh of shutdown
authorSean Hefty <sean.hefty@intel.com>
Sat, 17 Aug 2013 05:50:54 +0000 (22:50 -0700)
committerSean Hefty <sean.hefty@intel.com>
Sat, 17 Aug 2013 05:50:54 +0000 (22:50 -0700)
examples/rstream.c
src/rsocket.c

index 278437f6e05a48164197907f71d054e2084a89f5..0135927b5aff104b795f77a68834afc05a07a13c 100644 (file)
@@ -445,8 +445,20 @@ free:
        return ret;
 }
 
+static void *do_shutdown(void *arg)
+{
+       int rs = (int) arg;
+
+       sleep(1);
+       printf("calling shutdown from separate thread\n");
+       rs_shutdown(rs, SHUT_RDWR);
+       printf("shutdown completed\n");
+       return NULL;
+}
+
 static int run(void)
 {
+       struct pollfd fds;
        int i, ret = 0;
 
        buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
@@ -506,7 +518,23 @@ static int run(void)
        if (fork_pid)
                wait(NULL);
        else
-               rs_shutdown(rs, SHUT_RDWR);
+       {
+               if (dst_addr) {
+                       pthread_t thread_id;
+                       ret = pthread_create(&thread_id, NULL, do_shutdown, (void*) rs);
+
+                       fds.fd = rs;
+                       fds.events = POLLIN;
+                       printf("calling poll\n");
+                       ret = rs_poll(&fds, 1, 10000);
+                       printf("poll ret %d (%s) revents 0x%x (POLLHUP 0x%x)\n",
+                               ret, strerror(errno), fds.revents, POLLHUP);
+               } else {
+                       printf("sleeping for 10 seconds\n");
+                       sleep(10);
+                       rs_shutdown(rs, SHUT_RDWR);
+               }
+       }
        rs_close(rs);
 free:
        free(buf);
index d544dd097cda228de114173c8fe569dc1881f057..6ff8128f01b6a8180d1f60b552d3e267e7ebb44c 100644 (file)
@@ -133,7 +133,8 @@ enum {
 
 enum {
        RS_CTRL_DISCONNECT,
-       RS_CTRL_SHUTDOWN
+       RS_CTRL_SHUTDOWN,
+       RS_CTRL_SYNC
 };
 
 struct rs_msg {
@@ -1843,7 +1844,10 @@ static int rs_poll_cq(struct rsocket *rs)
                        case RS_OP_CTRL:
                                rs->ctrl_avail++;
                                if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
+                               {
                                        rs->state = rs_disconnected;
+                                       printf("rsocket disconnected - send complete\n");
+                               }
                                break;
                        case RS_OP_IOMAP_SGL:
                                rs->sqe_avail++;
@@ -1879,13 +1883,16 @@ static int rs_get_cq_event(struct rsocket *rs)
        void *context;
        int ret;
 
+       printf("rs_get_cq_event\n");
        if (!rs->cq_armed)
                return 0;
 
+       printf("waiting on cq event\n");
        ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
        if (!ret) {
                ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
                rs->cq_armed = 0;
+               printf("retrieved event\n");
        } else if (errno != EAGAIN) {
                rs->state = rs_error;
        }
@@ -2948,10 +2955,12 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
 
                rs = idm_lookup(&idm, fds[i].fd);
                if (rs) {
+                       fastlock_acquire(&rs->cq_wait_lock);
                        if (rs->type == SOCK_STREAM)
                                rs_get_cq_event(rs);
                        else
                                ds_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
                        fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
                } else {
                        fds[i].revents = rfds[i].revents;
@@ -2997,7 +3006,9 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
                if (ret)
                        break;
 
+               printf("real poll start\n");
                ret = poll(rfds, nfds, timeout);
+               printf("real poll exit\n");
                if (ret <= 0)
                        break;
 
@@ -3096,6 +3107,18 @@ int rselect(int nfds, fd_set *readfds, fd_set *writefds,
        return ret;
 }
 
+static void rs_unblock_rpoll(struct rsocket *rs)
+{
+       if (!rs->ctrl_avail) {
+               if (rs_process_cq(rs, 0, rs_conn_can_send_ctrl))
+                       return;
+       }
+
+       ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+       rs->ctrl_avail--;
+       rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, RS_CTRL_SYNC));
+}
+
 /*
  * For graceful disconnect, notify the remote side that we're
  * disconnecting and wait until all outstanding sends complete.
@@ -3138,6 +3161,7 @@ int rshutdown(int socket, int how)
        if (rs->state & rs_connected)
                rs_process_cq(rs, 0, rs_conn_all_sends_done);
 
+       rs_unblock_poll(rs);
        if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
                rs_set_nonblocking(rs, rs->fd_flags);