if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error do_rdma_write_%swith_msg: %s\n",
- getpid(), write_immed ? "imm_":"", DT_RetToStr(ret));
+ getpid(), write_immed && write_only ? "imm_":"",
+ DT_RetToStr(ret));
goto cleanup;
} else
LOGPRINTF("%d do_rdma_write_%swith_msg complete\n",
- getpid(), write_immed ? "imm_":"");
+ getpid(), write_immed && write_only ? "imm_":"");
if (write_only || !rdma_read)
goto complete;
}
cleanup:
- flush_evds();
failed++;
complete:
h_ep = DAT_HANDLE_NULL;
}
}
+ if (connected)
+ flush_evds();
/* free EVDs */
LOGPRINTF("%d destroy events\n", getpid());
printf("%d Server is waiting for client connection to send"
" server info\n",
getpid());
+ fflush(stdout);
if (send_server_params(ser_sa)) {
printf("%d Failed to send server params\n", getpid());
return -1;
printf("\n%d CONNECTED!\n\n", getpid());
connected = 1;
+ fflush(stdout);
#if CONNECT_ONLY
return 0;
ret = send_msg(p_rmr_snd,
sizeof(DAT_RMR_TRIPLET),
lmr_context_send_msg,
- cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+ cookie, DAT_COMPLETION_DEFAULT_FLAG);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error send_msg: %s\n",
DAT_RETURN do_rdma_write_with_msg(void)
{
DAT_EVENT event;
+ DAT_DTO_COMPLETION_EVENT_DATA *dto_event;
DAT_LMR_TRIPLET l_iov[MSG_IOV_COUNT];
DAT_RMR_TRIPLET r_iov;
DAT_DTO_COOKIE cookie;
DAT_RETURN ret;
+ DAT_COMPLETION_FLAGS flags;
int i;
printf("\n %d RDMA WRITE DATA with SEND MSG\n\n", getpid());
- cookie.as_64 = 0x5555;
+ dto_event = &event.event_data.dto_completion_event_data;
if (recv_msg_index >= MSG_BUF_COUNT)
return (DAT_ABORT);
else
strcpy((char *)sbuf, "client RDMA write data...");
+ if (uni_direction && server)
+ goto rmsg;
+
for (i = 0; i < MSG_IOV_COUNT; i++) {
l_iov[i].lmr_context = lmr_context_send;
l_iov[i].segment_length = buf_len / MSG_IOV_COUNT;
start = get_time();
for (i = 0; i < burst; i++) {
- cookie.as_64 = 0x9999;
+ if (!((i+1) % signal_rate))
+ flags = DAT_COMPLETION_DEFAULT_FLAG;
+ else
+ flags = DAT_COMPLETION_SUPPRESS_FLAG;
+
+ cookie.as_64 = i;
+ LOGPRINTF("%d rdma_write # %d %s\n",
+ getpid(), i + 1, flags ? "SUPPRESS":"SIGNAL");
ret = dat_ep_post_rdma_write(h_ep, // ep_handle
MSG_IOV_COUNT, // num_segments
l_iov, // LMR
cookie, // user_cookie
&r_iov, // RMR
- DAT_COMPLETION_SUPPRESS_FLAG);
+ flags);
if (ret != DAT_SUCCESS) {
fprintf(stderr,
"%d: ERROR: dat_ep_post_rdma_write() %s\n",
getpid(), DT_RetToStr(ret));
return (DAT_ABORT);
}
+ if (flags == DAT_COMPLETION_DEFAULT_FLAG) {
+ if (collect_event(h_dto_req_evd,
+ &event,
+ DTO_TIMEOUT,
+ &rdma_wr_poll_count) != DAT_SUCCESS) {
+ printf("%d %s RDMA write buffer contains: %s\n",
+ getpid(), server ? "SERVER:" : "CLIENT:", rbuf);
+ return (DAT_ABORT);
+ }
+ if (dto_event->status ||
+ dto_event->user_cookie.as_64 != i) {
+ fprintf(stderr, "ERROR rdma_write: cookie="
+ " "F64x " exp 0x%x st 0x%x\n",
+ dto_event->user_cookie.as_64, i,
+ dto_event->status);
+ return (DAT_ABORT);
+ }
+ }
LOGPRINTF("%d rdma_write # %d completed\n", getpid(), i + 1);
}
- /*
- * Send RMR information a 2nd time to indicate completion
- * NOTE: already swapped to network order in connect_ep
- */
+ if (server)
+ goto rmsg;
+smsg:
printf("%d Sending RDMA WRITE completion message\n", getpid());
ret = send_msg(p_rmr_snd,
} else {
LOGPRINTF("%d send_msg completed\n", getpid());
}
-
+ if (server)
+ goto acked;
+rmsg:
/* inbound recv event, send completion's suppressed */
if (collect_event(h_dto_rcv_evd,
&event,
DTO_TIMEOUT,
- &rdma_wr_poll_count) != DAT_SUCCESS)
+ &rdma_wr_poll_count) != DAT_SUCCESS) {
+ printf("%d %s RDMA write buffer contains: %s\n",
+ getpid(), server ? "SERVER:" : "CLIENT:", rbuf);
return (DAT_ABORT);
-
+ }
stop = get_time();
ts.rdma_wr = ((stop - start) * 1.0e6);
|| (event.event_data.dto_completion_event_data.user_cookie.as_64 !=
recv_msg_index)) {
fprintf(stderr,
- "unexpected event data for receive: len=%d cookie=" F64x
- " exp %d/%d\n",
- (int)event.event_data.dto_completion_event_data.
- transfered_length,
+ "unexpected event data for receive: st=%d len=%d"
+ "cookie=" F64x " exp %d/%d\n",
+ event.event_data.dto_completion_event_data.status,
+ (int)event.event_data.dto_completion_event_data.transfered_length,
event.event_data.dto_completion_event_data.user_cookie.
as_64, (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
return (DAT_ABORT);
}
+ if (server)
+ goto smsg;
+acked:
/* swap received RMR msg: network order to host order */
r_iov = p_rmr_rcv[recv_msg_index];
}
}
+ if (uni_direction && !server)
+ goto smsg;
done:
- if ((!uni_direction) || (uni_direction && server)) {
- /* Wait to RECEIVE the LAST message, immediate data expected */
- LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid());
- if (collect_event(h_dto_rcv_evd,
- &event,
- DTO_TIMEOUT,
- &rdma_wr_poll_count) != DAT_SUCCESS)
- return (DAT_ABORT);
-
- if (event.event_number != (int)DAT_IB_DTO_EVENT ||
- ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA ||
- ext_event->val.immed.data != 0x7777) {
- printf("unexpected event 0x%x type 0x%x or idata 0x%x"
- ", waiting for RW-IMMED #0x%x\n",
- event.event_number, ext_event->type,
- ext_event->val.immed.data, DAT_IB_DTO_EVENT);
- return (DAT_ABORT);
- }
- recv_msg_index++;
+ /* Wait to RECEIVE the LAST message, immediate data expected */
+ LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid());
+ if (collect_event(h_dto_rcv_evd,
+ &event,
+ DTO_TIMEOUT,
+ &rdma_wr_poll_count) != DAT_SUCCESS)
+ return (DAT_ABORT);
- /* Send last message received ACK message back */
- cookie.as_64 = 0x9999;
- ret = send_msg(p_rmr_snd,
- sizeof(DAT_RMR_TRIPLET),
- lmr_context_send_msg,
- cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+ if (event.event_number != (int)DAT_IB_DTO_EVENT ||
+ ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA ||
+ ext_event->val.immed.data != 0x7777) {
+ printf("unexpected event 0x%x type 0x%x or idata 0x%x"
+ ", waiting for RW-IMMED #0x%x\n",
+ event.event_number, ext_event->type,
+ ext_event->val.immed.data, DAT_IB_DTO_EVENT);
+ return (DAT_ABORT);
+ }
+ recv_msg_index++;
- if (ret != DAT_SUCCESS) {
- fprintf(stderr, "%d Error send_msg: %s\n",
- getpid(), DT_RetToStr(ret));
- return (ret);
- } else {
- LOGPRINTF("%d send_msg completed\n", getpid());
- }
+ if (server)
+ goto rmsg;
+smsg:
+ printf("%d sending LAST msg ACK to remote\n", getpid());
+ /* Send last message received ACK message back */
+ cookie.as_64 = 0x9999;
+ ret = send_msg(p_rmr_snd,
+ sizeof(DAT_RMR_TRIPLET),
+ lmr_context_send_msg,
+ cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+ if (ret != DAT_SUCCESS) {
+ fprintf(stderr, "%d Error send_msg: %s\n",
+ getpid(), DT_RetToStr(ret));
+ return (ret);
+ } else {
+ LOGPRINTF("%d send_msg completed\n", getpid());
}
- if (!uni_direction || !server) {
- /* Wait for my LAST message ACK from remote side */
- printf("%d waiting for LAST msg ACK from remote\n", getpid());
- if (collect_event(h_dto_rcv_evd,
- &event,
- DTO_TIMEOUT,
- &rdma_wr_poll_count) != DAT_SUCCESS)
- return (DAT_ABORT);
+ if (server)
+ goto acked;
+rmsg:
+ /* Wait for my LAST message ACK from remote side */
+ printf("%d waiting for LAST msg ACK from remote\n", getpid());
+ if (collect_event(h_dto_rcv_evd,
+ &event,
+ DTO_TIMEOUT,
+ &rdma_wr_poll_count) != DAT_SUCCESS)
+ return (DAT_ABORT);
- printf("%d LAST rdma write ACK message arrived!\n", getpid());
- if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
- fprintf(stderr, "%d Error unexpected DTO event : %s\n",
- getpid(), DT_EventToStr(event.event_number));
- return (DAT_ABORT);
- }
+ printf("%d LAST rdma write ACK message arrived!\n", getpid());
+ if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
+ fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+ getpid(), DT_EventToStr(event.event_number));
+ return (DAT_ABORT);
+ }
- if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET))
- || (dto_event->user_cookie.as_64 != recv_msg_index)) {
- fprintf(stderr,
- "unexpected event data for receive: len=%d "
- "cookie=" F64x" exp %d/%d\n",
- (int)dto_event->transfered_length,
- dto_event->user_cookie.as_64,
- (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
- return (DAT_ABORT);
- }
- printf("%d LAST RDMA_WRITE ACK from remote \n", getpid());
+ if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET))
+ || (dto_event->user_cookie.as_64 != recv_msg_index)) {
+ fprintf(stderr,
+ "unexpected event data for receive: len=%d "
+ "cookie=" F64x" exp %d/%d\n",
+ (int)dto_event->transfered_length,
+ dto_event->user_cookie.as_64,
+ (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
+ return (DAT_ABORT);
}
+ printf("%d LAST RDMA_WRITE ACK from remote \n", getpid());
+ if (server)
+ goto smsg;
+acked:
stop = get_time();
ts.rdma_wr = ((stop - start) * 1.0e6);