From: Arlin Davis Date: Tue, 14 Jul 2015 22:30:16 +0000 (-0700) Subject: dtest: modify rdma_write_with_msg to support uni-direction streaming X-Git-Tag: dapl-2.1.6-1~21 X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=27fcdc8da49f6af40d84090dfc38b16ddb6c9f61;p=~ardavis%2Fdapl.git dtest: modify rdma_write_with_msg to support uni-direction streaming add proper client->server handshake at end of rdma data stream to insure all data is delivered before disconnecting. Signed-off-by: Arlin Davis --- diff --git a/test/dtest/dtest.c b/test/dtest/dtest.c index bd30207..cfb074b 100755 --- a/test/dtest/dtest.c +++ b/test/dtest/dtest.c @@ -848,11 +848,12 @@ int main(int argc, char **argv) if (ret != DAT_SUCCESS) { fprintf(stderr, "%d Error do_rdma_write_%swith_msg: %s\n", - getpid(), write_immed ? "imm_":"", DT_RetToStr(ret)); + getpid(), write_immed && write_only ? "imm_":"", + DT_RetToStr(ret)); goto cleanup; } else LOGPRINTF("%d do_rdma_write_%swith_msg complete\n", - getpid(), write_immed ? "imm_":""); + getpid(), write_immed && write_only ? "imm_":""); if (write_only || !rdma_read) goto complete; @@ -878,7 +879,6 @@ int main(int argc, char **argv) } cleanup: - flush_evds(); failed++; complete: @@ -903,6 +903,8 @@ complete: h_ep = DAT_HANDLE_NULL; } } + if (connected) + flush_evds(); /* free EVDs */ LOGPRINTF("%d destroy events\n", getpid()); @@ -1193,6 +1195,7 @@ DAT_RETURN connect_ep(char *hostname, printf("%d Server is waiting for client connection to send" " server info\n", getpid()); + fflush(stdout); if (send_server_params(ser_sa)) { printf("%d Failed to send server params\n", getpid()); return -1; @@ -1408,6 +1411,7 @@ no_resolution: printf("\n%d CONNECTED!\n\n", getpid()); connected = 1; + fflush(stdout); #if CONNECT_ONLY return 0; @@ -1425,7 +1429,7 @@ no_resolution: ret = send_msg(p_rmr_snd, sizeof(DAT_RMR_TRIPLET), lmr_context_send_msg, - cookie, DAT_COMPLETION_SUPPRESS_FLAG); + cookie, DAT_COMPLETION_DEFAULT_FLAG); if (ret != DAT_SUCCESS) { fprintf(stderr, "%d Error send_msg: %s\n", @@ -1571,15 +1575,17 @@ void disconnect_ep(void) DAT_RETURN do_rdma_write_with_msg(void) { DAT_EVENT event; + DAT_DTO_COMPLETION_EVENT_DATA *dto_event; DAT_LMR_TRIPLET l_iov[MSG_IOV_COUNT]; DAT_RMR_TRIPLET r_iov; DAT_DTO_COOKIE cookie; DAT_RETURN ret; + DAT_COMPLETION_FLAGS flags; int i; printf("\n %d RDMA WRITE DATA with SEND MSG\n\n", getpid()); - cookie.as_64 = 0x5555; + dto_event = &event.event_data.dto_completion_event_data; if (recv_msg_index >= MSG_BUF_COUNT) return (DAT_ABORT); @@ -1592,6 +1598,9 @@ DAT_RETURN do_rdma_write_with_msg(void) else strcpy((char *)sbuf, "client RDMA write data..."); + if (uni_direction && server) + goto rmsg; + for (i = 0; i < MSG_IOV_COUNT; i++) { l_iov[i].lmr_context = lmr_context_send; l_iov[i].segment_length = buf_len / MSG_IOV_COUNT; @@ -1605,26 +1614,50 @@ DAT_RETURN do_rdma_write_with_msg(void) start = get_time(); for (i = 0; i < burst; i++) { - cookie.as_64 = 0x9999; + if (!((i+1) % signal_rate)) + flags = DAT_COMPLETION_DEFAULT_FLAG; + else + flags = DAT_COMPLETION_SUPPRESS_FLAG; + + cookie.as_64 = i; + LOGPRINTF("%d rdma_write # %d %s\n", + getpid(), i + 1, flags ? "SUPPRESS":"SIGNAL"); ret = dat_ep_post_rdma_write(h_ep, // ep_handle MSG_IOV_COUNT, // num_segments l_iov, // LMR cookie, // user_cookie &r_iov, // RMR - DAT_COMPLETION_SUPPRESS_FLAG); + flags); if (ret != DAT_SUCCESS) { fprintf(stderr, "%d: ERROR: dat_ep_post_rdma_write() %s\n", getpid(), DT_RetToStr(ret)); return (DAT_ABORT); } + if (flags == DAT_COMPLETION_DEFAULT_FLAG) { + if (collect_event(h_dto_req_evd, + &event, + DTO_TIMEOUT, + &rdma_wr_poll_count) != DAT_SUCCESS) { + printf("%d %s RDMA write buffer contains: %s\n", + getpid(), server ? "SERVER:" : "CLIENT:", rbuf); + return (DAT_ABORT); + } + if (dto_event->status || + dto_event->user_cookie.as_64 != i) { + fprintf(stderr, "ERROR rdma_write: cookie=" + " "F64x " exp 0x%x st 0x%x\n", + dto_event->user_cookie.as_64, i, + dto_event->status); + return (DAT_ABORT); + } + } LOGPRINTF("%d rdma_write # %d completed\n", getpid(), i + 1); } - /* - * Send RMR information a 2nd time to indicate completion - * NOTE: already swapped to network order in connect_ep - */ + if (server) + goto rmsg; +smsg: printf("%d Sending RDMA WRITE completion message\n", getpid()); ret = send_msg(p_rmr_snd, @@ -1639,14 +1672,18 @@ DAT_RETURN do_rdma_write_with_msg(void) } else { LOGPRINTF("%d send_msg completed\n", getpid()); } - + if (server) + goto acked; +rmsg: /* inbound recv event, send completion's suppressed */ if (collect_event(h_dto_rcv_evd, &event, DTO_TIMEOUT, - &rdma_wr_poll_count) != DAT_SUCCESS) + &rdma_wr_poll_count) != DAT_SUCCESS) { + printf("%d %s RDMA write buffer contains: %s\n", + getpid(), server ? "SERVER:" : "CLIENT:", rbuf); return (DAT_ABORT); - + } stop = get_time(); ts.rdma_wr = ((stop - start) * 1.0e6); @@ -1663,15 +1700,18 @@ DAT_RETURN do_rdma_write_with_msg(void) || (event.event_data.dto_completion_event_data.user_cookie.as_64 != recv_msg_index)) { fprintf(stderr, - "unexpected event data for receive: len=%d cookie=" F64x - " exp %d/%d\n", - (int)event.event_data.dto_completion_event_data. - transfered_length, + "unexpected event data for receive: st=%d len=%d" + "cookie=" F64x " exp %d/%d\n", + event.event_data.dto_completion_event_data.status, + (int)event.event_data.dto_completion_event_data.transfered_length, event.event_data.dto_completion_event_data.user_cookie. as_64, (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index); return (DAT_ABORT); } + if (server) + goto smsg; +acked: /* swap received RMR msg: network order to host order */ r_iov = p_rmr_rcv[recv_msg_index]; @@ -1787,73 +1827,80 @@ DAT_RETURN do_rdma_write_imm_with_msg(void) } } + if (uni_direction && !server) + goto smsg; done: - if ((!uni_direction) || (uni_direction && server)) { - /* Wait to RECEIVE the LAST message, immediate data expected */ - LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid()); - if (collect_event(h_dto_rcv_evd, - &event, - DTO_TIMEOUT, - &rdma_wr_poll_count) != DAT_SUCCESS) - return (DAT_ABORT); - - if (event.event_number != (int)DAT_IB_DTO_EVENT || - ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA || - ext_event->val.immed.data != 0x7777) { - printf("unexpected event 0x%x type 0x%x or idata 0x%x" - ", waiting for RW-IMMED #0x%x\n", - event.event_number, ext_event->type, - ext_event->val.immed.data, DAT_IB_DTO_EVENT); - return (DAT_ABORT); - } - recv_msg_index++; + /* Wait to RECEIVE the LAST message, immediate data expected */ + LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid()); + if (collect_event(h_dto_rcv_evd, + &event, + DTO_TIMEOUT, + &rdma_wr_poll_count) != DAT_SUCCESS) + return (DAT_ABORT); - /* Send last message received ACK message back */ - cookie.as_64 = 0x9999; - ret = send_msg(p_rmr_snd, - sizeof(DAT_RMR_TRIPLET), - lmr_context_send_msg, - cookie, DAT_COMPLETION_SUPPRESS_FLAG); + if (event.event_number != (int)DAT_IB_DTO_EVENT || + ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA || + ext_event->val.immed.data != 0x7777) { + printf("unexpected event 0x%x type 0x%x or idata 0x%x" + ", waiting for RW-IMMED #0x%x\n", + event.event_number, ext_event->type, + ext_event->val.immed.data, DAT_IB_DTO_EVENT); + return (DAT_ABORT); + } + recv_msg_index++; - if (ret != DAT_SUCCESS) { - fprintf(stderr, "%d Error send_msg: %s\n", - getpid(), DT_RetToStr(ret)); - return (ret); - } else { - LOGPRINTF("%d send_msg completed\n", getpid()); - } + if (server) + goto rmsg; +smsg: + printf("%d sending LAST msg ACK to remote\n", getpid()); + /* Send last message received ACK message back */ + cookie.as_64 = 0x9999; + ret = send_msg(p_rmr_snd, + sizeof(DAT_RMR_TRIPLET), + lmr_context_send_msg, + cookie, DAT_COMPLETION_SUPPRESS_FLAG); + if (ret != DAT_SUCCESS) { + fprintf(stderr, "%d Error send_msg: %s\n", + getpid(), DT_RetToStr(ret)); + return (ret); + } else { + LOGPRINTF("%d send_msg completed\n", getpid()); } - if (!uni_direction || !server) { - /* Wait for my LAST message ACK from remote side */ - printf("%d waiting for LAST msg ACK from remote\n", getpid()); - if (collect_event(h_dto_rcv_evd, - &event, - DTO_TIMEOUT, - &rdma_wr_poll_count) != DAT_SUCCESS) - return (DAT_ABORT); + if (server) + goto acked; +rmsg: + /* Wait for my LAST message ACK from remote side */ + printf("%d waiting for LAST msg ACK from remote\n", getpid()); + if (collect_event(h_dto_rcv_evd, + &event, + DTO_TIMEOUT, + &rdma_wr_poll_count) != DAT_SUCCESS) + return (DAT_ABORT); - printf("%d LAST rdma write ACK message arrived!\n", getpid()); - if (event.event_number != DAT_DTO_COMPLETION_EVENT) { - fprintf(stderr, "%d Error unexpected DTO event : %s\n", - getpid(), DT_EventToStr(event.event_number)); - return (DAT_ABORT); - } + printf("%d LAST rdma write ACK message arrived!\n", getpid()); + if (event.event_number != DAT_DTO_COMPLETION_EVENT) { + fprintf(stderr, "%d Error unexpected DTO event : %s\n", + getpid(), DT_EventToStr(event.event_number)); + return (DAT_ABORT); + } - if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET)) - || (dto_event->user_cookie.as_64 != recv_msg_index)) { - fprintf(stderr, - "unexpected event data for receive: len=%d " - "cookie=" F64x" exp %d/%d\n", - (int)dto_event->transfered_length, - dto_event->user_cookie.as_64, - (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index); - return (DAT_ABORT); - } - printf("%d LAST RDMA_WRITE ACK from remote \n", getpid()); + if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET)) + || (dto_event->user_cookie.as_64 != recv_msg_index)) { + fprintf(stderr, + "unexpected event data for receive: len=%d " + "cookie=" F64x" exp %d/%d\n", + (int)dto_event->transfered_length, + dto_event->user_cookie.as_64, + (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index); + return (DAT_ABORT); } + printf("%d LAST RDMA_WRITE ACK from remote \n", getpid()); + if (server) + goto smsg; +acked: stop = get_time(); ts.rdma_wr = ((stop - start) * 1.0e6);