]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
dtest: modify rdma_write_with_msg to support uni-direction streaming
authorArlin Davis <arlin.r.davis@intel.com>
Tue, 14 Jul 2015 22:30:16 +0000 (15:30 -0700)
committerArlin Davis <arlin.r.davis@intel.com>
Tue, 14 Jul 2015 22:30:16 +0000 (15:30 -0700)
add proper client->server handshake at end of rdma data stream
to insure all data is delivered before disconnecting.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
test/dtest/dtest.c

index bd30207124158f2364d14aa4a2bb42a9c52394a5..cfb074b5ae15a23b9e4005fb9aa9243a6fed79d6 100755 (executable)
@@ -848,11 +848,12 @@ int main(int argc, char **argv)
 
        if (ret != DAT_SUCCESS) {
                fprintf(stderr, "%d Error do_rdma_write_%swith_msg: %s\n",
-                       getpid(), write_immed ? "imm_":"", DT_RetToStr(ret));
+                       getpid(), write_immed && write_only ? "imm_":"",
+                       DT_RetToStr(ret));
                goto cleanup;
        } else
                LOGPRINTF("%d do_rdma_write_%swith_msg complete\n",
-                         getpid(), write_immed ? "imm_":"");
+                         getpid(), write_immed && write_only ? "imm_":"");
 
        if (write_only || !rdma_read)
                goto complete;
@@ -878,7 +879,6 @@ int main(int argc, char **argv)
        }
 
 cleanup:
-       flush_evds();
        failed++;
 complete:
 
@@ -903,6 +903,8 @@ complete:
                        h_ep = DAT_HANDLE_NULL;
                }
        }
+       if (connected)
+               flush_evds();
 
        /* free EVDs */
        LOGPRINTF("%d destroy events\n", getpid());
@@ -1193,6 +1195,7 @@ DAT_RETURN connect_ep(char *hostname,
                printf("%d Server is waiting for client connection to send"
                        " server info\n",
                        getpid());
+               fflush(stdout);
                if (send_server_params(ser_sa)) {
                        printf("%d Failed to send server params\n", getpid());
                        return -1;
@@ -1408,6 +1411,7 @@ no_resolution:
 
        printf("\n%d CONNECTED!\n\n", getpid());
        connected = 1;
+       fflush(stdout);
 
 #if CONNECT_ONLY
        return 0;
@@ -1425,7 +1429,7 @@ no_resolution:
        ret = send_msg(p_rmr_snd,
                       sizeof(DAT_RMR_TRIPLET),
                       lmr_context_send_msg,
-                      cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+                      cookie, DAT_COMPLETION_DEFAULT_FLAG);
 
        if (ret != DAT_SUCCESS) {
                fprintf(stderr, "%d Error send_msg: %s\n",
@@ -1571,15 +1575,17 @@ void disconnect_ep(void)
 DAT_RETURN do_rdma_write_with_msg(void)
 {
        DAT_EVENT event;
+       DAT_DTO_COMPLETION_EVENT_DATA *dto_event;
        DAT_LMR_TRIPLET l_iov[MSG_IOV_COUNT];
        DAT_RMR_TRIPLET r_iov;
        DAT_DTO_COOKIE cookie;
        DAT_RETURN ret;
+       DAT_COMPLETION_FLAGS flags;
        int i;
 
        printf("\n %d RDMA WRITE DATA with SEND MSG\n\n", getpid());
 
-       cookie.as_64 = 0x5555;
+       dto_event = &event.event_data.dto_completion_event_data;
 
        if (recv_msg_index >= MSG_BUF_COUNT)
                return (DAT_ABORT);
@@ -1592,6 +1598,9 @@ DAT_RETURN do_rdma_write_with_msg(void)
        else
                strcpy((char *)sbuf, "client RDMA write data...");
 
+       if  (uni_direction && server)
+               goto rmsg;
+
        for (i = 0; i < MSG_IOV_COUNT; i++) {
                l_iov[i].lmr_context = lmr_context_send;
                l_iov[i].segment_length = buf_len / MSG_IOV_COUNT;
@@ -1605,26 +1614,50 @@ DAT_RETURN do_rdma_write_with_msg(void)
 
        start = get_time();
        for (i = 0; i < burst; i++) {
-               cookie.as_64 = 0x9999;
+               if (!((i+1) % signal_rate))
+                       flags =  DAT_COMPLETION_DEFAULT_FLAG;
+               else
+                       flags = DAT_COMPLETION_SUPPRESS_FLAG;
+
+               cookie.as_64 = i;
+               LOGPRINTF("%d rdma_write # %d %s\n",
+                         getpid(), i + 1, flags ? "SUPPRESS":"SIGNAL");
                ret = dat_ep_post_rdma_write(h_ep,      // ep_handle
                                             MSG_IOV_COUNT,     // num_segments
                                             l_iov,     // LMR
                                             cookie,    // user_cookie
                                             &r_iov,    // RMR
-                                            DAT_COMPLETION_SUPPRESS_FLAG);
+                                            flags);
                if (ret != DAT_SUCCESS) {
                        fprintf(stderr,
                                "%d: ERROR: dat_ep_post_rdma_write() %s\n",
                                getpid(), DT_RetToStr(ret));
                        return (DAT_ABORT);
                }
+               if (flags == DAT_COMPLETION_DEFAULT_FLAG) {
+                       if (collect_event(h_dto_req_evd,
+                                         &event,
+                                         DTO_TIMEOUT,
+                                         &rdma_wr_poll_count) != DAT_SUCCESS) {
+                               printf("%d %s RDMA write buffer contains: %s\n",
+                                      getpid(), server ? "SERVER:" : "CLIENT:", rbuf);
+                               return (DAT_ABORT);
+                       }
+                       if (dto_event->status ||
+                           dto_event->user_cookie.as_64 != i) {
+                               fprintf(stderr, "ERROR rdma_write: cookie="
+                                               " "F64x " exp 0x%x st 0x%x\n",
+                                       dto_event->user_cookie.as_64, i,
+                                       dto_event->status);
+                               return (DAT_ABORT);
+                       }
+               }
                LOGPRINTF("%d rdma_write # %d completed\n", getpid(), i + 1);
        }
 
-       /*
-        *  Send RMR information a 2nd time to indicate completion
-        *  NOTE: already swapped to network order in connect_ep
-        */
+       if (server)
+               goto rmsg;
+smsg:
        printf("%d Sending RDMA WRITE completion message\n", getpid());
 
        ret = send_msg(p_rmr_snd,
@@ -1639,14 +1672,18 @@ DAT_RETURN do_rdma_write_with_msg(void)
        } else {
                LOGPRINTF("%d send_msg completed\n", getpid());
        }
-
+       if (server)
+               goto acked;
+rmsg:
        /* inbound recv event, send completion's suppressed */
        if (collect_event(h_dto_rcv_evd,
                          &event,
                          DTO_TIMEOUT,
-                         &rdma_wr_poll_count) != DAT_SUCCESS)
+                         &rdma_wr_poll_count) != DAT_SUCCESS) {
+               printf("%d %s RDMA write buffer contains: %s\n",
+                       getpid(), server ? "SERVER:" : "CLIENT:", rbuf);
                return (DAT_ABORT);
-
+       }
        stop = get_time();
        ts.rdma_wr = ((stop - start) * 1.0e6);
 
@@ -1663,15 +1700,18 @@ DAT_RETURN do_rdma_write_with_msg(void)
            || (event.event_data.dto_completion_event_data.user_cookie.as_64 !=
                recv_msg_index)) {
                fprintf(stderr,
-                       "unexpected event data for receive: len=%d cookie=" F64x
-                       " exp %d/%d\n",
-                       (int)event.event_data.dto_completion_event_data.
-                       transfered_length,
+                       "unexpected event data for receive: st=%d len=%d"
+                       "cookie=" F64x " exp %d/%d\n",
+                       event.event_data.dto_completion_event_data.status,
+                       (int)event.event_data.dto_completion_event_data.transfered_length,
                        event.event_data.dto_completion_event_data.user_cookie.
                        as_64, (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
 
                return (DAT_ABORT);
        }
+       if (server)
+               goto smsg;
+acked:
 
        /* swap received RMR msg: network order to host order */
        r_iov = p_rmr_rcv[recv_msg_index];
@@ -1787,73 +1827,80 @@ DAT_RETURN do_rdma_write_imm_with_msg(void)
                }
        }
 
+       if (uni_direction && !server)
+               goto smsg;
 done:
-       if ((!uni_direction) || (uni_direction && server)) {
-               /* Wait to RECEIVE the LAST message, immediate data expected */
-               LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid());
-               if (collect_event(h_dto_rcv_evd,
-                                 &event,
-                                 DTO_TIMEOUT,
-                                 &rdma_wr_poll_count) != DAT_SUCCESS)
-                       return (DAT_ABORT);
-
-               if (event.event_number != (int)DAT_IB_DTO_EVENT ||
-                   ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA ||
-                   ext_event->val.immed.data != 0x7777) {
-                       printf("unexpected event 0x%x type 0x%x or idata 0x%x"
-                              ", waiting for RW-IMMED #0x%x\n",
-                              event.event_number, ext_event->type,
-                              ext_event->val.immed.data, DAT_IB_DTO_EVENT);
-                       return (DAT_ABORT);
-               }
-               recv_msg_index++;
+       /* Wait to RECEIVE the LAST message, immediate data expected */
+       LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid());
+       if (collect_event(h_dto_rcv_evd,
+                         &event,
+                         DTO_TIMEOUT,
+                         &rdma_wr_poll_count) != DAT_SUCCESS)
+               return (DAT_ABORT);
 
-               /* Send last message received ACK message back */
-               cookie.as_64 = 0x9999;
-               ret = send_msg(p_rmr_snd,
-                              sizeof(DAT_RMR_TRIPLET),
-                              lmr_context_send_msg,
-                              cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+       if (event.event_number != (int)DAT_IB_DTO_EVENT ||
+           ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA ||
+           ext_event->val.immed.data != 0x7777) {
+               printf("unexpected event 0x%x type 0x%x or idata 0x%x"
+                      ", waiting for RW-IMMED #0x%x\n",
+                      event.event_number, ext_event->type,
+                      ext_event->val.immed.data, DAT_IB_DTO_EVENT);
+               return (DAT_ABORT);
+       }
+       recv_msg_index++;
 
-               if (ret != DAT_SUCCESS) {
-                       fprintf(stderr, "%d Error send_msg: %s\n",
-                               getpid(), DT_RetToStr(ret));
-                       return (ret);
-               } else {
-                       LOGPRINTF("%d send_msg completed\n", getpid());
-               }
+       if (server)
+               goto rmsg;
+smsg:
+       printf("%d sending LAST msg ACK to remote\n", getpid());
+       /* Send last message received ACK message back */
+       cookie.as_64 = 0x9999;
+       ret = send_msg(p_rmr_snd,
+                      sizeof(DAT_RMR_TRIPLET),
+                      lmr_context_send_msg,
+                      cookie, DAT_COMPLETION_SUPPRESS_FLAG);
 
+       if (ret != DAT_SUCCESS) {
+               fprintf(stderr, "%d Error send_msg: %s\n",
+                       getpid(), DT_RetToStr(ret));
+               return (ret);
+       } else {
+               LOGPRINTF("%d send_msg completed\n", getpid());
        }
 
-       if (!uni_direction || !server) {
-               /* Wait for my LAST message ACK from remote side */
-               printf("%d waiting for LAST msg ACK from remote\n", getpid());
-               if (collect_event(h_dto_rcv_evd,
-                                 &event,
-                                 DTO_TIMEOUT,
-                                 &rdma_wr_poll_count) != DAT_SUCCESS)
-                       return (DAT_ABORT);
+       if (server)
+               goto acked;
+rmsg:
+       /* Wait for my LAST message ACK from remote side */
+       printf("%d waiting for LAST msg ACK from remote\n", getpid());
+       if (collect_event(h_dto_rcv_evd,
+                         &event,
+                         DTO_TIMEOUT,
+                         &rdma_wr_poll_count) != DAT_SUCCESS)
+               return (DAT_ABORT);
 
-               printf("%d LAST rdma write ACK message arrived!\n", getpid());
-               if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
-                       fprintf(stderr, "%d Error unexpected DTO event : %s\n",
-                               getpid(), DT_EventToStr(event.event_number));
-                       return (DAT_ABORT);
-               }
+       printf("%d LAST rdma write ACK message arrived!\n", getpid());
+       if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
+               fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+                       getpid(), DT_EventToStr(event.event_number));
+               return (DAT_ABORT);
+       }
 
-               if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET))
-                   || (dto_event->user_cookie.as_64 != recv_msg_index)) {
-                       fprintf(stderr,
-                               "unexpected event data for receive: len=%d "
-                               "cookie=" F64x" exp %d/%d\n",
-                               (int)dto_event->transfered_length,
-                               dto_event->user_cookie.as_64,
-                               (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
-                       return (DAT_ABORT);
-               }
-               printf("%d LAST RDMA_WRITE ACK from remote \n", getpid());
+       if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET))
+           || (dto_event->user_cookie.as_64 != recv_msg_index)) {
+               fprintf(stderr,
+                       "unexpected event data for receive: len=%d "
+                       "cookie=" F64x" exp %d/%d\n",
+                       (int)dto_event->transfered_length,
+                       dto_event->user_cookie.as_64,
+                       (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
+               return (DAT_ABORT);
        }
+       printf("%d LAST RDMA_WRITE ACK from remote \n", getpid());
 
+       if (server)
+               goto smsg;
+acked:
        stop = get_time();
        ts.rdma_wr = ((stop - start) * 1.0e6);