return ret;
}
+static void *do_shutdown(void *arg)
+{
+ int rs = (int) arg;
+
+ sleep(1);
+ printf("calling shutdown from separate thread\n");
+ rs_shutdown(rs, SHUT_RDWR);
+ printf("shutdown completed\n");
+ return NULL;
+}
+
static int run(void)
{
+ struct pollfd fds;
int i, ret = 0;
buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
if (fork_pid)
wait(NULL);
else
- rs_shutdown(rs, SHUT_RDWR);
+ {
+ if (dst_addr) {
+ pthread_t thread_id;
+ ret = pthread_create(&thread_id, NULL, do_shutdown, (void*) rs);
+
+ fds.fd = rs;
+ fds.events = POLLIN;
+ printf("calling poll\n");
+ ret = rs_poll(&fds, 1, 10000);
+ printf("poll ret %d (%s) revents 0x%x (POLLHUP 0x%x)\n",
+ ret, strerror(errno), fds.revents, POLLHUP);
+ } else {
+ printf("sleeping for 10 seconds\n");
+ sleep(10);
+ rs_shutdown(rs, SHUT_RDWR);
+ }
+ }
rs_close(rs);
free:
free(buf);
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
+ fastlock_acquire(&rs->cq_wait_lock);
if (rs->type == SOCK_STREAM)
rs_get_cq_event(rs);
else
ds_get_cq_event(rs);
+ fastlock_release(&rs->cq_wait_lock);
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
/*
* For graceful disconnect, notify the remote side that we're
- * disconnecting and wait until all outstanding sends complete.
+ * disconnecting and wait until all outstanding sends complete, provided
+ * that the remote side has not sent a disconnect message.
*/
int rshutdown(int socket, int how)
{
if (rs->state & rs_connected)
rs_process_cq(rs, 0, rs_conn_all_sends_done);
+ if (rs->state & rs_disconnected) {
+ /* Generate event by flushing receives to unblock rpoll */
+ ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ rdma_disconnect(rs->cm_id);
+ }
+
if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
rs_set_nonblocking(rs, rs->fd_flags);