]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
dapl: add rdma_write_imm and write only option to dtest
authorArlin Davis <arlin.r.davis@intel.com>
Mon, 15 Dec 2014 20:05:33 +0000 (12:05 -0800)
committerArlin Davis <arlin.r.davis@intel.com>
Mon, 15 Dec 2014 20:05:33 +0000 (12:05 -0800)
New write_only (-w) option with rdma_write_imm can
be used with providers that support IB extensions.
Allows more options for write bandwith profiling
with immediate data and signaling rate options
to increase write data rates, especially on MIC
clients that use proxy services.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
test/dtest/dtest.c

index 1a37c18e836c85aba9def07f03fc65a0ed58c079..c9e58ec245f028a6eb8bbd3d579d196d47d64f5a 100755 (executable)
@@ -78,7 +78,7 @@
 #include <unistd.h>
 #include <stdlib.h>
 
-#define DAPL_PROVIDER "ofa-v2-ib0"
+#define DAPL_PROVIDER "ofa-v2-mlx4_0-1u"
 
 #define F64x "%"PRIx64""
 
 
 /* Header files needed for DAT/uDAPL */
 #include "dat2/udat.h"
+#include "dat2/dat_ib_extensions.h"
 
 /* definitions */
 #define SERVER_CONN_QUAL  45248
 #define DTO_FLUSH_TIMEOUT (1000*1000*2)
 #define CONN_TIMEOUT      (1000*1000*100)
 #define SERVER_TIMEOUT    DAT_TIMEOUT_INFINITE
-#define RDMA_BUFFER_SIZE  (64)
+#define RDMA_BUFFER_SIZE  (4*1024*1024)
 
 /* Global DAT vars */
 static DAT_IA_HANDLE h_ia = DAT_HANDLE_NULL;
@@ -139,8 +140,7 @@ static DAT_VADDR registered_addr_recv;
 
 /* Initial msg receive buf, RMR exchange, and Rdma-write notification */
 #define MSG_BUF_COUNT     3
-#define MSG_IOV_COUNT     2
-static DAT_RMR_TRIPLET rmr_recv_msg[MSG_BUF_COUNT];
+#define MSG_IOV_COUNT     1
 static DAT_LMR_HANDLE h_lmr_recv_msg = DAT_HANDLE_NULL;
 static DAT_LMR_CONTEXT lmr_context_recv_msg;
 static DAT_RMR_CONTEXT rmr_context_recv_msg;
@@ -148,7 +148,6 @@ static DAT_VLEN registered_size_recv_msg;
 static DAT_VADDR registered_addr_recv_msg;
 
 /* message send buffer */
-static DAT_RMR_TRIPLET rmr_send_msg;
 static DAT_LMR_HANDLE h_lmr_send_msg = DAT_HANDLE_NULL;
 static DAT_LMR_CONTEXT lmr_context_send_msg;
 static DAT_RMR_CONTEXT rmr_context_send_msg;
@@ -159,6 +158,10 @@ char hostname[256] = { 0 };
 char provider[64] = DAPL_PROVIDER;
 char addr_str[INET_ADDRSTRLEN];
 
+/* allocate RMR message buffers page aligned */
+static DAT_RMR_TRIPLET *p_rmr_rcv;
+static DAT_RMR_TRIPLET *p_rmr_snd;
+
 /* rdma pointers */
 char *rbuf = NULL;
 char *sbuf = NULL;
@@ -192,9 +195,15 @@ struct dt_time ts;
 
 /* defaults */
 static int failed = 0;
+static int uni_direction = 0;
+static int align_data=1;
+static int rdma_read = 0;
+static int write_only = 0;
+static int write_immed = 0;
 static int performance_times = 0;
 static int connected = 0;
-static int burst = 10;
+static int burst = 100;
+static int signal_rate = 10;
 static int server = 1;
 static int verbose = 0;
 static int polling = 0;
@@ -209,6 +218,7 @@ static int recv_msg_index = 0;
 static int burst_msg_posted = 0;
 static int burst_msg_index = 0;
 static int ucm = 0;
+static int rq_cnt, sq_cnt;
 static DAT_SOCK_ADDR6 remote;
 
 /* forward prototypes */
@@ -231,6 +241,7 @@ DAT_RETURN register_rdma_memory(void);
 DAT_RETURN unregister_rdma_memory(void);
 DAT_RETURN create_events(void);
 DAT_RETURN destroy_events(void);
+DAT_RETURN do_rdma_write_imm_with_msg(void);
 DAT_RETURN do_rdma_write_with_msg(void);
 DAT_RETURN do_rdma_read_with_msg(void);
 DAT_RETURN do_ping_pong_msg(void);
@@ -546,8 +557,20 @@ int main(int argc, char **argv)
        DAT_PROVIDER_ATTR pr_attr;
 
        /* parse arguments */
-       while ((c = getopt(argc, argv, "tscvpb:d:B:h:P:")) != -1) {
+       while ((c = getopt(argc, argv, "auwtscvpb:d:B:h:P:S:")) != -1) {
                switch (c) {
+               case 'a':
+                       align_data = 1;
+                       fflush(stdout);
+                       break;
+               case 'u':
+                       uni_direction = 1;
+                       fflush(stdout);
+                       break;
+               case 'w':
+                       write_only = 1;
+                       fflush(stdout);
+                       break;
                case 't':
                        performance_times = 1;
                        fflush(stdout);
@@ -587,6 +610,9 @@ int main(int argc, char **argv)
                case 'P':
                        strcpy(provider, optarg);
                        break;
+               case 'S':
+                       signal_rate = atoi(optarg);
+                       break;
                default:
                        print_usage();
                        exit(-12);
@@ -606,6 +632,13 @@ int main(int argc, char **argv)
                }
        }
 #endif
+       memset(&ts, 0, sizeof(struct dt_time));
+
+       if (signal_rate > burst)
+               signal_rate = burst;
+
+       rq_cnt = MSG_BUF_COUNT + (burst);
+       sq_cnt = MSG_BUF_COUNT + MAX_RDMA_RD + signal_rate;
 
        if (!server) {
                printf("%d Running as client - waiting for server input\n",
@@ -615,23 +648,39 @@ int main(int argc, char **argv)
                                getpid());
                        exit(1);
                }
-               printf("%d Running as %s client\n", getpid(), provider);
+               printf("%d Running as %s client v2 \n", getpid(), provider);
 
        } else {
-               printf("%d Running as server - %s\n", getpid(), provider);
+               printf("%d Running as server - %s v2 \n", getpid(), provider);
        }
        fflush(stdout);
 
-       /* allocate send and receive buffers */
-       if (((rbuf = malloc(buf_len * (burst+1))) == NULL) ||
-           ((sbuf = malloc(buf_len * (burst+1))) == NULL)) {
-               perror("malloc");
-               exit(1);
+       if (align_data) {
+               /* allocate send and receive buffers */
+               if (posix_memalign((void**)&rbuf, 4096, buf_len * (burst+1)) ||
+                   posix_memalign((void**)&sbuf, 4096, buf_len * (burst+1))) {
+                       perror("malloc");
+                       exit(1);
+               }
+       } else {
+               /* allocate send and receive buffers */
+               if (((rbuf = malloc(buf_len * (burst+1))) == NULL) ||
+                   ((sbuf = malloc(buf_len * (burst+1))) == NULL)) {
+                       perror("malloc");
+                       exit(1);
+               }
        }
-       memset(&ts, 0, sizeof(struct dt_time));
        LOGPRINTF("%d Allocated RDMA buffers (r:%p,s:%p) len %d \n",
                  getpid(), rbuf, sbuf, buf_len);
 
+       if (posix_memalign((void**)&p_rmr_rcv, 4096, 4096) ||
+           posix_memalign((void**)&p_rmr_snd, 4096, 4096)) {
+               perror("malloc");
+               exit(1);
+       }
+       LOGPRINTF("%d Allocated RMR buffers (r:%p,s:%p) len %d \n",
+                 getpid(), p_rmr_rcv, p_rmr_snd, 4096);
+
        /* dat_ia_open, dat_pz_create */
        h_async_evd = DAT_HANDLE_NULL;
        start = get_time();
@@ -656,12 +705,18 @@ int main(int argc, char **argv)
        }
        print_ia_address(ia_attr.ia_address_ptr);
 
+       if (ia_attr.extension_supported == DAT_EXTENSION_IB)
+               write_immed = 1;
+
        /* Provider specific attributes */
        for (i=0; i<pr_attr.num_provider_specific_attr; i++) {
                LOGPRINTF("%d Provider_attr[%d] %s = %s \n",
                          getpid(), i,
                          pr_attr.provider_specific_attr[i].name,
                          pr_attr.provider_specific_attr[i].value);
+               if (!strcmp(pr_attr.provider_specific_attr[i].name,"DAT_IB_RDMA_READ") &&
+                   !strcmp(pr_attr.provider_specific_attr[i].value,"TRUE") && !write_only)
+                       rdma_read = 1;
        }
 
        /* Create Protection Zone */
@@ -784,13 +839,21 @@ int main(int argc, char **argv)
 #endif
 
        /*********** RDMA write data *************/
-       ret = do_rdma_write_with_msg();
+       if ((write_immed) && (write_only))
+               ret = do_rdma_write_imm_with_msg();
+       else
+               ret = do_rdma_write_with_msg();
+
        if (ret != DAT_SUCCESS) {
-               fprintf(stderr, "%d Error do_rdma_write_with_msg: %s\n",
-                       getpid(), DT_RetToStr(ret));
+               fprintf(stderr, "%d Error do_rdma_write_%swith_msg: %s\n",
+                       getpid(), write_immed ? "imm_":"", DT_RetToStr(ret));
                goto cleanup;
        } else
-               LOGPRINTF("%d do_rdma_write_with_msg complete\n", getpid());
+               LOGPRINTF("%d do_rdma_write_%swith_msg complete\n",
+                         getpid(), write_immed ? "imm_":"");
+
+       if (write_only || !rdma_read)
+               goto complete;
 
        /*********** RDMA read data *************/
        ret = do_rdma_read_with_msg();
@@ -886,23 +949,21 @@ complete:
        free(rbuf);
        free(sbuf);
 
-       printf("\n%d: DAPL Test Complete. %s\n\n",
-              getpid(), failed ? "FAILED" : "PASSED");
+       if (ts.rtt)
+               printf("%d: Message RTT: Total=%6.2lf usec, %d bursts, itime=%6.2lf usec, pc=%d\n",
+                       getpid(), ts.rtt, burst, ts.rtt / burst, poll_count);
+       if (ts.rdma_wr && (!server || (server && !uni_direction))) {
+               int msgs = uni_direction ? burst : burst * 2;
 
-       fflush(stderr);
-       fflush(stdout);
+               printf("\n%d: RDMA write (%s): Total=%6.2lf usec, itime=%6.2lf us, poll=%d, %d x %d, %4.2lf MB/sec\n",
+                       getpid(), uni_direction ? "uni-direction" : "bi-direction",
+                       ts.rdma_wr, ts.rdma_wr / msgs, rdma_wr_poll_count, msgs, buf_len,
+                       (double)(1/(ts.rdma_wr/msgs/buf_len)));
+       }
 
        if (!performance_times)
-               exit(0);
+               goto finish;
 
-       printf("\n%d: DAPL Test Complete.\n\n", getpid());
-       printf("%d: Message RTT: Total=%10.2lf usec, %d bursts, itime=%10.2lf"
-              " usec, pc=%d\n",
-              getpid(), ts.rtt, burst, ts.rtt / burst, poll_count);
-       printf("%d: RDMA write:  Total=%10.2lf usec, %d bursts, itime=%10.2lf"
-              " usec, pc=%d\n",
-              getpid(), ts.rdma_wr, burst,
-              ts.rdma_wr / burst, rdma_wr_poll_count);
        for (i = 0; i < MAX_RDMA_RD; i++) {
                printf("%d: RDMA read:   Total=%10.2lf usec,   %d bursts, "
                       "itime=%10.2lf usec, pc=%d\n",
@@ -928,6 +989,13 @@ complete:
                       getpid(), ts.conn, conn_poll_count);
        printf("%d: TOTAL:     %10.2lf usec\n", getpid(), ts.total);
 
+finish:
+       printf("\n%d: DAPL Test Complete. %s\n\n",
+               getpid(), failed ? "FAILED" : "PASSED");
+
+       fflush(stderr);
+       fflush(stdout);
+
 #if defined(_WIN32) || defined(_WIN64)
        WSACleanup();
 #endif
@@ -1036,8 +1104,8 @@ DAT_RETURN connect_ep(char *hostname,
 
        /* Register send message buffer */
        LOGPRINTF("%d Registering send Message Buffer %p, len %d\n",
-                 getpid(), &rmr_send_msg, (int)sizeof(DAT_RMR_TRIPLET));
-       region.for_va = &rmr_send_msg;
+                 getpid(), p_rmr_snd, (int)sizeof(DAT_RMR_TRIPLET));
+       region.for_va = p_rmr_snd;
        ret = dat_lmr_create(h_ia,
                             DAT_MEM_TYPE_VIRTUAL,
                             region,
@@ -1061,8 +1129,8 @@ DAT_RETURN connect_ep(char *hostname,
 
        /* Register Receive buffers */
        LOGPRINTF("%d Registering Receive Message Buffer %p\n",
-                 getpid(), rmr_recv_msg);
-       region.for_va = rmr_recv_msg;
+                 getpid(), p_rmr_rcv);
+       region.for_va = p_rmr_rcv;
        ret = dat_lmr_create(h_ia,
                             DAT_MEM_TYPE_VIRTUAL,
                             region,
@@ -1087,15 +1155,14 @@ DAT_RETURN connect_ep(char *hostname,
                cookie.as_64 = i;
                l_iov.lmr_context = lmr_context_recv_msg;
 #if defined(_WIN32)
-               l_iov.virtual_address = (DAT_VADDR) & rmr_recv_msg[i];
+               l_iov.virtual_address = (DAT_VADDR) &p_rmr_rcv[i];
 #else
-               l_iov.virtual_address =
-                   (DAT_VADDR) (unsigned long)&rmr_recv_msg[i];
+               l_iov.virtual_address = (DAT_VADDR) (unsigned long)&p_rmr_rcv[i];
 #endif
                l_iov.segment_length = sizeof(DAT_RMR_TRIPLET);
 
                LOGPRINTF("%d Posting Receive Message Buffer %p\n",
-                         getpid(), &rmr_recv_msg[i]);
+                         getpid(), &p_rmr_rcv[i]);
                ret = dat_ep_post_recv(h_ep,
                                       1,
                                       &l_iov,
@@ -1343,18 +1410,17 @@ no_resolution:
 #if CONNECT_ONLY
        return 0;
 #endif
-
        /*
         *  Setup our remote memory and tell the other side about it
         */
-       rmr_send_msg.virtual_address = htonll((DAT_VADDR) (uintptr_t) rbuf);
-       rmr_send_msg.segment_length = htonl(RDMA_BUFFER_SIZE);
-       rmr_send_msg.rmr_context = htonl(rmr_context_recv);
+       p_rmr_snd->virtual_address = htonll((DAT_VADDR) (uintptr_t) rbuf);
+       p_rmr_snd->segment_length = htonl(RDMA_BUFFER_SIZE);
+       p_rmr_snd->rmr_context = htonl(rmr_context_recv);
 
        printf("%d Send RMR msg to remote: r_key_ctx=0x%x,va=%p,len=0x%x\n",
               getpid(), rmr_context_recv, rbuf, RDMA_BUFFER_SIZE);
 
-       ret = send_msg(&rmr_send_msg,
+       ret = send_msg(p_rmr_snd,
                       sizeof(DAT_RMR_TRIPLET),
                       lmr_context_send_msg,
                       cookie, DAT_COMPLETION_SUPPRESS_FLAG);
@@ -1378,8 +1444,9 @@ no_resolution:
        printf("%d remote RMR data arrived!\n", getpid());
 
        if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
-               fprintf(stderr, "%d Error unexpected DTO event : %s\n",
-                       getpid(), DT_EventToStr(event.event_number));
+               fprintf(stderr, "%d Error unexpected DTO event: 0x%x %s\n",
+                       getpid(), event.event_number,
+                       DT_EventToStr(event.event_number));
                return (DAT_ABORT);
        }
        if ((event.event_data.dto_completion_event_data.transfered_length !=
@@ -1397,18 +1464,16 @@ no_resolution:
        }
 
        /* swap received RMR msg: network order to host order */
-       r_iov = rmr_recv_msg[recv_msg_index];
-       rmr_recv_msg[recv_msg_index].rmr_context = ntohl(r_iov.rmr_context);
-       rmr_recv_msg[recv_msg_index].virtual_address =
-           ntohll(r_iov.virtual_address);
-       rmr_recv_msg[recv_msg_index].segment_length =
-           ntohl(r_iov.segment_length);
+       r_iov = p_rmr_rcv[recv_msg_index];
+       p_rmr_rcv[recv_msg_index].rmr_context = ntohl(r_iov.rmr_context);
+       p_rmr_rcv[recv_msg_index].virtual_address = ntohll(r_iov.virtual_address);
+       p_rmr_rcv[recv_msg_index].segment_length = ntohl(r_iov.segment_length);
 
        printf("%d Received RMR from remote: "
               "r_iov: r_key_ctx=%x,va=" F64x ",len=0x%x\n",
-              getpid(), rmr_recv_msg[recv_msg_index].rmr_context,
-              rmr_recv_msg[recv_msg_index].virtual_address,
-              rmr_recv_msg[recv_msg_index].segment_length);
+              getpid(), p_rmr_rcv[recv_msg_index].rmr_context,
+              p_rmr_rcv[recv_msg_index].virtual_address,
+              p_rmr_rcv[recv_msg_index].segment_length);
 
        recv_msg_index++;
 
@@ -1518,7 +1583,7 @@ DAT_RETURN do_rdma_write_with_msg(void)
                return (DAT_ABORT);
 
        /* get RMR information from previously received message */
-       r_iov = rmr_recv_msg[recv_msg_index - 1];
+       r_iov = p_rmr_rcv[recv_msg_index - 1];
 
        if (server)
                strcpy((char *)sbuf, "server RDMA write data...");
@@ -1560,7 +1625,7 @@ DAT_RETURN do_rdma_write_with_msg(void)
         */
        printf("%d Sending RDMA WRITE completion message\n", getpid());
 
-       ret = send_msg(&rmr_send_msg,
+       ret = send_msg(p_rmr_snd,
                       sizeof(DAT_RMR_TRIPLET),
                       lmr_context_send_msg,
                       cookie, DAT_COMPLETION_SUPPRESS_FLAG);
@@ -1574,12 +1639,12 @@ DAT_RETURN do_rdma_write_with_msg(void)
        }
 
        /* inbound recv event, send completion's suppressed */
-       if (collect_event(h_dto_rcv_evd, 
-                         &event, 
-                         DTO_TIMEOUT, 
+       if (collect_event(h_dto_rcv_evd,
+                         &event,
+                         DTO_TIMEOUT,
                          &rdma_wr_poll_count) != DAT_SUCCESS)
                return (DAT_ABORT);
-       
+
        stop = get_time();
        ts.rdma_wr = ((stop - start) * 1.0e6);
 
@@ -1607,19 +1672,19 @@ DAT_RETURN do_rdma_write_with_msg(void)
        }
 
        /* swap received RMR msg: network order to host order */
-       r_iov = rmr_recv_msg[recv_msg_index];
-       rmr_recv_msg[recv_msg_index].virtual_address =
-           ntohll(rmr_recv_msg[recv_msg_index].virtual_address);
-       rmr_recv_msg[recv_msg_index].segment_length =
-           ntohl(rmr_recv_msg[recv_msg_index].segment_length);
-       rmr_recv_msg[recv_msg_index].rmr_context =
-           ntohl(rmr_recv_msg[recv_msg_index].rmr_context);
+       r_iov = p_rmr_rcv[recv_msg_index];
+       p_rmr_rcv[recv_msg_index].virtual_address =
+           ntohll(p_rmr_rcv[recv_msg_index].virtual_address);
+       p_rmr_rcv[recv_msg_index].segment_length =
+           ntohl(p_rmr_rcv[recv_msg_index].segment_length);
+       p_rmr_rcv[recv_msg_index].rmr_context =
+           ntohl(p_rmr_rcv[recv_msg_index].rmr_context);
 
        printf("%d Received RMR from remote: "
               "r_iov: r_key_ctx=%x,va=" F64x ",len=0x%x\n",
-              getpid(), rmr_recv_msg[recv_msg_index].rmr_context,
-              rmr_recv_msg[recv_msg_index].virtual_address,
-              rmr_recv_msg[recv_msg_index].segment_length);
+              getpid(), p_rmr_rcv[recv_msg_index].rmr_context,
+              p_rmr_rcv[recv_msg_index].virtual_address,
+              p_rmr_rcv[recv_msg_index].segment_length);
 
        LOGPRINTF("%d inbound rdma_write; send msg event SUCCESS!!\n",
                  getpid());
@@ -1632,6 +1697,177 @@ DAT_RETURN do_rdma_write_with_msg(void)
        return (DAT_SUCCESS);
 }
 
+DAT_RETURN do_rdma_write_imm_with_msg(void)
+{
+       DAT_EVENT event;
+       DAT_LMR_TRIPLET l_iov[MSG_IOV_COUNT];
+       DAT_RMR_TRIPLET r_iov;
+       DAT_DTO_COOKIE cookie;
+       DAT_RETURN ret;
+       int i, flags = DAT_COMPLETION_SUPPRESS_FLAG;
+       DAT_DTO_COMPLETION_EVENT_DATA *dto_event =
+               &event.event_data.dto_completion_event_data;
+       DAT_IB_EXTENSION_EVENT_DATA *ext_event =
+               (DAT_IB_EXTENSION_EVENT_DATA*) event.event_extension_data;
+
+       printf("\n %d RDMA WRITE IMM DATA with SEND MSG\n\n", getpid());
+
+       cookie.as_64 = 0x5555;
+
+       if (recv_msg_index >= MSG_BUF_COUNT)
+               return (DAT_ABORT);
+
+       /* get RMR information from previously received message */
+       r_iov = p_rmr_rcv[recv_msg_index - 1];
+
+       if (server)
+               strcpy((char *)sbuf, "server RDMA write data...");
+       else
+               strcpy((char *)sbuf, "client RDMA write data...");
+
+       for (i = 0; i < MSG_IOV_COUNT; i++) {
+               l_iov[i].lmr_context = lmr_context_send;
+               l_iov[i].segment_length = buf_len / MSG_IOV_COUNT;
+               l_iov[i].virtual_address = (DAT_VADDR) (uintptr_t)
+                   (&sbuf[l_iov[i].segment_length * i]);
+
+               LOGPRINTF("%d rdma_write iov[%d] buf=%p,len=%d\n",
+                         getpid(), i, &sbuf[l_iov[i].segment_length * i],
+                         l_iov[i].segment_length);
+       }
+
+       if  (uni_direction && server)
+               goto done;
+
+       start = get_time();
+       for (i = 0; i < burst; i++) {
+               if (i==0)
+                       sprintf(&sbuf[25],"rdma_writes= ");
+
+               sprintf(&sbuf[25], "rdma writes completed == %d", i+1);
+               sbuf[buf_len-1] = i;
+               if (!((i+1) % signal_rate))
+                       flags =  DAT_COMPLETION_DEFAULT_FLAG;
+               else
+                       flags = DAT_COMPLETION_SUPPRESS_FLAG;
+
+               cookie.as_64 = i;
+               LOGPRINTF("%d rdma_write # %d %s\n", getpid(), i + 1, flags ? "SUPPRESS":"SIGNAL");
+               /* last message is write_immed with buf_len as imm_data */
+               if (i == (burst - 1)) {
+                       ret = dat_ib_post_rdma_write_immed(
+                               h_ep, MSG_IOV_COUNT, l_iov, cookie,
+                               &r_iov,  0x7777, flags);
+               } else {
+                       ret = dat_ep_post_rdma_write(h_ep, MSG_IOV_COUNT,
+                                                    l_iov, cookie, &r_iov,
+                                                    flags);
+               }
+               if (ret != DAT_SUCCESS) {
+                       fprintf(stderr, "%d: ERROR: dat_rdma_write() %s\n",
+                                       getpid(), DT_RetToStr(ret));
+                       return (DAT_ABORT);
+               }
+               LOGPRINTF("%d rdma_write # %d completed\n", getpid(), i + 1);
+
+               if (flags == DAT_COMPLETION_DEFAULT_FLAG) {
+                       if (collect_event(h_dto_req_evd,
+                                         &event,
+                                         DTO_TIMEOUT,
+                                         &rdma_wr_poll_count) != DAT_SUCCESS)
+                               return (DAT_ABORT);
+                       if (dto_event->user_cookie.as_64 != i) {
+                               fprintf(stderr, "ERROR rdma_write: cookie="
+                                               " "F64x " exp 0x%x\n",
+                                       dto_event->user_cookie.as_64, i);
+                               return (DAT_ABORT);
+                       }
+               }
+       }
+
+done:
+       if ((!uni_direction) || (uni_direction && server)) {
+               /* Wait to RECEIVE the LAST message, immediate data expected */
+               LOGPRINTF("%d Waiting for final inbound RW_imm from peer\n", getpid());
+               if (collect_event(h_dto_rcv_evd,
+                                 &event,
+                                 DTO_TIMEOUT,
+                                 &rdma_wr_poll_count) != DAT_SUCCESS)
+                       return (DAT_ABORT);
+
+               if (event.event_number != (int)DAT_IB_DTO_EVENT ||
+                   ext_event->type != DAT_IB_RDMA_WRITE_IMMED_DATA ||
+                   ext_event->val.immed.data != 0x7777) {
+                       printf("unexpected event 0x%x type 0x%x or idata 0x%x"
+                              ", waiting for RW-IMMED #0x%x\n",
+                              event.event_number, ext_event->type,
+                              ext_event->val.immed.data, DAT_IB_DTO_EVENT);
+                       return (DAT_ABORT);
+               }
+               recv_msg_index++;
+
+               /* Send last message received ACK message back */
+               cookie.as_64 = 0x9999;
+               ret = send_msg(p_rmr_snd,
+                              sizeof(DAT_RMR_TRIPLET),
+                              lmr_context_send_msg,
+                              cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+
+               if (ret != DAT_SUCCESS) {
+                       fprintf(stderr, "%d Error send_msg: %s\n",
+                               getpid(), DT_RetToStr(ret));
+                       return (ret);
+               } else {
+                       LOGPRINTF("%d send_msg completed\n", getpid());
+               }
+
+       }
+
+       if (!uni_direction || !server) {
+               /* Wait for my LAST message ACK from remote side */
+               printf("%d waiting for LAST msg ACK from remote\n", getpid());
+               if (collect_event(h_dto_rcv_evd,
+                                 &event,
+                                 DTO_TIMEOUT,
+                                 &rdma_wr_poll_count) != DAT_SUCCESS)
+                       return (DAT_ABORT);
+
+               printf("%d LAST rdma write ACK message arrived!\n", getpid());
+               if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
+                       fprintf(stderr, "%d Error unexpected DTO event : %s\n",
+                               getpid(), DT_EventToStr(event.event_number));
+                       return (DAT_ABORT);
+               }
+
+               if ((dto_event->transfered_length != sizeof(DAT_RMR_TRIPLET))
+                   || (dto_event->user_cookie.as_64 != recv_msg_index)) {
+                       fprintf(stderr,
+                               "unexpected event data for receive: len=%d "
+                               "cookie=" F64x" exp %d/%d\n",
+                               (int)dto_event->transfered_length,
+                               dto_event->user_cookie.as_64,
+                               (int)sizeof(DAT_RMR_TRIPLET), recv_msg_index);
+                       return (DAT_ABORT);
+               }
+               printf("%d LAST RDMA_WRITE ACK from remote \n", getpid());
+       }
+
+       stop = get_time();
+       ts.rdma_wr = ((stop - start) * 1.0e6);
+
+       LOGPRINTF("%d last rdma_write ACK'ed SUCCESS!!\n", getpid());
+
+       if (server || (!server && !uni_direction))
+               printf("%d %s RDMA write buffer contains: %s last byte=%d\n",
+                      getpid(), server ? "SERVER:" : "CLIENT:", rbuf, rbuf[buf_len-1]);
+
+       if (server && uni_direction)
+               sleep(1);
+
+       recv_msg_index++;
+       return (DAT_SUCCESS);
+}
+
 DAT_RETURN do_rdma_read_with_msg(void)
 {
        DAT_EVENT event;
@@ -1647,7 +1883,7 @@ DAT_RETURN do_rdma_read_with_msg(void)
                return (DAT_ABORT);
 
        /* get RMR information from previously received message */
-       r_iov = rmr_recv_msg[recv_msg_index - 1];
+       r_iov = p_rmr_rcv[recv_msg_index - 1];
 
        /* setup rdma read buffer to initial string to be overwritten */
        strcpy((char *)sbuf, "blah, blah, blah\n");
@@ -1731,7 +1967,7 @@ DAT_RETURN do_rdma_read_with_msg(void)
 #endif
        }
 
-       ret = send_msg(&rmr_send_msg,
+       ret = send_msg(p_rmr_snd,
                       sizeof(DAT_RMR_TRIPLET),
                       lmr_context_send_msg,
                       cookie, DAT_COMPLETION_SUPPRESS_FLAG);
@@ -1777,19 +2013,19 @@ DAT_RETURN do_rdma_read_with_msg(void)
        }
 
        /* swap received RMR msg: network order to host order */
-       r_iov = rmr_recv_msg[recv_msg_index];
-       rmr_recv_msg[recv_msg_index].virtual_address =
-           ntohll(rmr_recv_msg[recv_msg_index].virtual_address);
-       rmr_recv_msg[recv_msg_index].segment_length =
-           ntohl(rmr_recv_msg[recv_msg_index].segment_length);
-       rmr_recv_msg[recv_msg_index].rmr_context =
-           ntohl(rmr_recv_msg[recv_msg_index].rmr_context);
+       r_iov = p_rmr_rcv[recv_msg_index];
+       p_rmr_rcv[recv_msg_index].virtual_address =
+           ntohll(p_rmr_rcv[recv_msg_index].virtual_address);
+       p_rmr_rcv[recv_msg_index].segment_length =
+           ntohl(p_rmr_rcv[recv_msg_index].segment_length);
+       p_rmr_rcv[recv_msg_index].rmr_context =
+           ntohl(p_rmr_rcv[recv_msg_index].rmr_context);
 
        printf("%d Received RMR from remote: "
               "r_iov: r_key_ctx=%x,va=" F64x ",len=0x%x\n",
-              getpid(), rmr_recv_msg[recv_msg_index].rmr_context,
-              rmr_recv_msg[recv_msg_index].virtual_address,
-              rmr_recv_msg[recv_msg_index].segment_length);
+              getpid(), p_rmr_rcv[recv_msg_index].rmr_context,
+              p_rmr_rcv[recv_msg_index].virtual_address,
+              p_rmr_rcv[recv_msg_index].segment_length);
 
        LOGPRINTF("%d inbound rdma_write; send msg event SUCCESS!!\n",
                  getpid());
@@ -2313,6 +2549,8 @@ void print_usage(void)
 {
        printf("\n DAPL USAGE \n\n");
        printf("s: server\n");
+       printf("u: unidirectional bandwidth (default=bidirectional\n");
+       printf("w: rdma write only\n");
        printf("t: performance times\n");
        printf("c: use cno\n");
        printf("v: verbose\n");
@@ -2321,7 +2559,8 @@ void print_usage(void)
        printf("b: buf length to allocate\n");
        printf("B: burst count, rdma and msgs \n");
        printf("h: hostname/address of server, specified on client\n");
-       printf("P: provider name (default = OpenIB-cma)\n");
+       printf("P: provider name (default = ofa-v2-mlx4_0-1u)\n");
+       printf("S: signal_rate (default=10, completion every 10 iterations\n");
        printf("\n");
 }