return ret;
}
+static void *do_shutdown(void *arg)
+{
+ int rs = (int) arg;
+
+ sleep(1);
+ printf("calling shutdown from separate thread\n");
+ rs_shutdown(rs, SHUT_RDWR);
+ printf("shutdown completed\n");
+ return NULL;
+}
+
static int run(void)
{
+ struct pollfd fds;
int i, ret = 0;
buf = malloc(!custom ? test_size[TEST_CNT - 1].size : transfer_size);
if (fork_pid)
wait(NULL);
else
- rs_shutdown(rs, SHUT_RDWR);
+ {
+ if (dst_addr) {
+ pthread_t thread_id;
+ ret = pthread_create(&thread_id, NULL, do_shutdown, (void*) rs);
+
+ fds.fd = rs;
+ fds.events = POLLIN;
+ printf("calling poll\n");
+ ret = rs_poll(&fds, 1, 10000);
+ printf("poll ret %d (%s) revents 0x%x (POLLHUP 0x%x)\n",
+ ret, strerror(errno), fds.revents, POLLHUP);
+ } else {
+ printf("sleeping for 10 seconds\n");
+ sleep(10);
+ rs_shutdown(rs, SHUT_RDWR);
+ }
+ }
rs_close(rs);
free:
free(buf);
enum {
RS_CTRL_DISCONNECT,
- RS_CTRL_SHUTDOWN
+ RS_CTRL_SHUTDOWN,
+ RS_CTRL_SYNC
};
struct rs_msg {
case RS_OP_CTRL:
rs->ctrl_avail++;
if (rs_msg_data(rs_wr_data(wc.wr_id)) == RS_CTRL_DISCONNECT)
+ {
rs->state = rs_disconnected;
+ printf("rsocket disconnected - send complete\n");
+ }
break;
case RS_OP_IOMAP_SGL:
rs->sqe_avail++;
void *context;
int ret;
+ printf("rs_get_cq_event\n");
if (!rs->cq_armed)
return 0;
+ printf("waiting on cq event\n");
ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context);
if (!ret) {
ibv_ack_cq_events(rs->cm_id->recv_cq, 1);
rs->cq_armed = 0;
+ printf("retrieved event\n");
} else if (errno != EAGAIN) {
rs->state = rs_error;
}
rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
+ fastlock_acquire(&rs->cq_wait_lock);
if (rs->type == SOCK_STREAM)
rs_get_cq_event(rs);
else
ds_get_cq_event(rs);
+ fastlock_release(&rs->cq_wait_lock);
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
if (ret)
break;
+ printf("real poll start\n");
ret = poll(rfds, nfds, timeout);
+ printf("real poll exit\n");
if (ret <= 0)
break;
return ret;
}
+static void rs_unblock_rpoll(struct rsocket *rs)
+{
+ if (!rs->ctrl_avail) {
+ if (rs_process_cq(rs, 0, rs_conn_can_send_ctrl))
+ return;
+ }
+
+ ibv_req_notify_cq(rs->cm_id->recv_cq, 0);
+ rs->ctrl_avail--;
+ rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, RS_CTRL_SYNC));
+}
+
/*
* For graceful disconnect, notify the remote side that we're
* disconnecting and wait until all outstanding sends complete.
if (rs->state & rs_connected)
rs_process_cq(rs, 0, rs_conn_all_sends_done);
+ rs_unblock_poll(rs);
if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected))
rs_set_nonblocking(rs, rs->fd_flags);