static int align_data=1;
static int rdma_read = 0;
static int write_only = 0;
+static int write_only_pp = 0;
static int write_immed = 0;
static int performance_times = 0;
static int connected = 0;
-static int burst = 100;
+static int burst = 1000;
+static int msg_burst = 100;
static int signal_rate = 10;
static int server = 1;
static int verbose = 0;
DAT_RETURN destroy_events(void);
DAT_RETURN do_rdma_write_imm_with_msg(void);
DAT_RETURN do_rdma_write_with_msg(void);
+DAT_RETURN do_rdma_write_ping_pong(void);
DAT_RETURN do_rdma_read_with_msg(void);
DAT_RETURN do_ping_pong_msg(void);
DAT_EVENT event;
/* Flush async error queue */
- printf("%d: Checking ASYNC EVD...\n", getpid());
+ LOGPRINTF("%d: Checking ASYNC EVD...\n", getpid());
while (dat_evd_dequeue(h_async_evd, &event) == DAT_SUCCESS) {
- printf("%d ERR: ASYNC EVD ENTRY: handle=%p reason=%d\n", getpid(),
+ LOGPRINTF("%d ERR: ASYNC EVD ENTRY: handle=%p reason=%d\n", getpid(),
event.event_data.asynch_error_event_data.dat_handle,
event.event_data.asynch_error_event_data.reason);
}
/* Flush receive queue */
- printf("%d: Checking RECEIVE EVD...\n", getpid());
+ LOGPRINTF("%d: Checking RECEIVE EVD...\n", getpid());
while (dat_evd_dequeue(h_dto_rcv_evd, &event) == DAT_SUCCESS) {
- printf(" RCV EVD ENTRY: op=%d stat=%d ln=%d ck="F64x"\n",
+ LOGPRINTF(" RCV EVD ENTRY: op=%d stat=%d ln=%d ck="F64x"\n",
event.event_data.dto_completion_event_data.operation,
event.event_data.dto_completion_event_data.status,
event.event_data.dto_completion_event_data.transfered_length,
event.event_data.dto_completion_event_data.user_cookie.as_64);
}
/* Flush request queue */
- printf("%d: Checking REQUEST EVD...\n", getpid());
+ LOGPRINTF("%d: Checking REQUEST EVD...\n", getpid());
while (dat_evd_dequeue(h_dto_req_evd, &event) == DAT_SUCCESS) {
- printf(" REQ EVD ENTRY: op=%d stat=%d ln=%d ck="F64x"\n",
+ LOGPRINTF(" REQ EVD ENTRY: op=%d stat=%d ln=%d ck="F64x"\n",
event.event_data.dto_completion_event_data.operation,
event.event_data.dto_completion_event_data.status,
event.event_data.dto_completion_event_data.transfered_length,
DAT_PROVIDER_ATTR pr_attr;
/* parse arguments */
- while ((c = getopt(argc, argv, "auwtscvpb:d:B:h:P:S:")) != -1) {
+ while ((c = getopt(argc, argv, "auwWtscvpb:d:B:h:P:S:")) != -1) {
switch (c) {
case 'a':
align_data = 1;
write_only = 1;
fflush(stdout);
break;
+ case 'W':
+ write_only_pp = 1;
+ uni_direction = 1;
+ if (buf_len == RDMA_BUFFER_SIZE)
+ buf_len = 1;
+ signal_rate = 1;
+ burst = 1000;
+ fflush(stdout);
+ break;
case 't':
performance_times = 1;
fflush(stdout);
if (signal_rate > burst)
signal_rate = burst;
- rq_cnt = MSG_BUF_COUNT + (burst);
- sq_cnt = MSG_BUF_COUNT + MAX_RDMA_RD + signal_rate;
+ if (write_only || write_only_pp) {
+ rq_cnt = MSG_BUF_COUNT * 2;
+ sq_cnt = MSG_BUF_COUNT + MAX_RDMA_RD + signal_rate;
+ } else {
+ rq_cnt = MSG_BUF_COUNT + msg_burst;
+ sq_cnt = MSG_BUF_COUNT + MAX_RDMA_RD + msg_burst;
+ }
if (!server) {
printf("%d Running as client - waiting for server input\n",
if (align_data) {
/* allocate send and receive buffers */
- if (posix_memalign((void**)&rbuf, 4096, max(64, buf_len * (burst+1))) ||
- posix_memalign((void**)&sbuf, 4096, max(64, buf_len * (burst+1)))) {
+ if (posix_memalign((void**)&rbuf, 4096, max(64, buf_len * rq_cnt)) ||
+ posix_memalign((void**)&sbuf, 4096, max(64, buf_len * rq_cnt))) {
perror("malloc");
exit(1);
}
} else {
/* allocate send and receive buffers */
- if (((rbuf = malloc(max(64, buf_len * (burst+1)))) == NULL) ||
- ((sbuf = malloc(max(64, buf_len * (burst+1)))) == NULL)) {
+ if (((rbuf = malloc(max(64, buf_len * rq_cnt))) == NULL) ||
+ ((sbuf = malloc(max(64, buf_len * rq_cnt))) == NULL)) {
perror("malloc");
exit(1);
}
ret = dat_ia_open(provider, 8, &h_async_evd, &h_ia);
stop = get_time();
ts.open += ((stop - start) * 1.0e6);
+ ts.total += ts.open;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d: Error Adaptor open: %s\n",
getpid(), DT_RetToStr(ret));
ret = dat_pz_create(h_ia, &h_pz);
stop = get_time();
ts.pzc += ((stop - start) * 1.0e6);
+ ts.total += ts.pzc;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error creating Protection Zone: %s\n",
getpid(), DT_RetToStr(ret));
ep_attr.max_rdma_size = 0x10000;
ep_attr.qos = 0;
ep_attr.recv_completion_flags = 0;
- ep_attr.max_recv_dtos = MSG_BUF_COUNT + (burst * 3);
- ep_attr.max_request_dtos = MSG_BUF_COUNT + (burst * 3) + MAX_RDMA_RD;
+ ep_attr.max_recv_dtos = rq_cnt;
+ ep_attr.max_request_dtos = sq_cnt;
ep_attr.max_recv_iov = MSG_IOV_COUNT;
ep_attr.max_request_iov = MSG_IOV_COUNT;
ep_attr.max_rdma_read_in = MAX_RDMA_RD;
#endif
/*********** RDMA write data *************/
- if ((write_immed) && (write_only))
+ if (write_only_pp) {
+ ret = do_rdma_write_ping_pong();
+ }
+ else if (write_immed && write_only) {
ret = do_rdma_write_imm_with_msg();
- else
+ }
+ else {
ret = do_rdma_write_with_msg();
+ }
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error do_rdma_write_%swith_msg: %s\n",
DT_RetToStr(ret));
goto cleanup;
} else
- LOGPRINTF("%d do_rdma_write_%swith_msg complete\n",
- getpid(), write_immed && write_only ? "imm_":"");
+ LOGPRINTF("%d rdma_write test complete\n", getpid());
- if (write_only || !rdma_read)
+ if (write_only_pp || write_only || !rdma_read)
goto complete;
/*********** RDMA read data *************/
ret = dat_pz_free(h_pz);
stop = get_time();
ts.pzf += ((stop - start) * 1.0e6);
+ ts.total += ts.pzf;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error freeing PZ: %s\n",
getpid(), DT_RetToStr(ret));
ret = dat_ia_close(h_ia, DAT_CLOSE_ABRUPT_FLAG);
stop = get_time();
ts.close += ((stop - start) * 1.0e6);
+ ts.total += ts.close;
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d: Error Adaptor close: %s\n",
getpid(), DT_RetToStr(ret));
free(sbuf);
if (ts.rtt)
- printf("%d: Message RTT: Total=%6.2lf usec, %d bursts, itime=%6.2lf usec, pc=%d\n",
- getpid(), ts.rtt, burst, ts.rtt / burst, poll_count);
+ printf( "%d: %s PingPong: (%d x %d) Total %6.2lf us:"
+ " latency %3.2lf us, BW %4.2lf MB/s\n",
+ getpid(), write_only_pp ? "RDMA write":"Message",
+ write_only_pp ? burst : msg_burst, buf_len, ts.rtt,
+ write_only_pp ? ts.rtt/burst/2:ts.rtt/msg_burst/2,
+ write_only_pp ? (double)(1/(ts.rtt/burst/2/buf_len)):
+ (double)(1/(ts.rtt/msg_burst/2/buf_len)));
+
if (ts.rdma_wr && (!server || (server && !uni_direction))) {
int msgs = uni_direction ? burst : burst * 2;
return (DAT_SUCCESS);
}
+/* always uni-direction */
+DAT_RETURN do_rdma_write_ping_pong(void)
+{
+ DAT_EVENT event;
+ DAT_LMR_TRIPLET l_iov[MSG_IOV_COUNT];
+ DAT_RMR_TRIPLET r_iov;
+ DAT_DTO_COOKIE cookie;
+ DAT_RETURN ret;
+ int i, suppress = DAT_COMPLETION_SUPPRESS_FLAG;
+ DAT_DTO_COMPLETION_EVENT_DATA *dto_event =
+ &event.event_data.dto_completion_event_data;
+ volatile char *tx_buf = (char*)&sbuf[buf_len-1]; /* last byte */
+ volatile char *rx_buf = (char*)&rbuf[buf_len-1]; /* last byte */
+ uint32_t rx_cnt = 0;
+ uint32_t tx_cnt = 0;
+
+ printf("\n %d RDMA WRITE PINGPONG\n\n", getpid());
+
+ /* RMR information from previously received message */
+ r_iov = p_rmr_rcv[recv_msg_index - 1];
+
+ for (i = 0; i < MSG_IOV_COUNT; i++) {
+ l_iov[i].lmr_context = lmr_context_send;
+ l_iov[i].segment_length = buf_len / MSG_IOV_COUNT;
+ l_iov[i].virtual_address = (DAT_VADDR) (uintptr_t)
+ (&sbuf[l_iov[i].segment_length*i]);
+ LOGPRINTF("%d rdma_write iov[%d] buf=%p,len=%d\n",
+ getpid(), i,
+ &sbuf[l_iov[i].segment_length * i],
+ l_iov[i].segment_length);
+ }
+ start = get_time();
+ for (i = 0; i < burst; i++) {
+ if (rx_cnt < burst && !(server && !tx_cnt)) {
+ rx_cnt++;
+ while (*rx_buf != (char)rx_cnt);
+ }
+ LOGPRINTF("%d rdma_write # %d data rcv'ed\n", getpid(), rx_cnt);
+
+ if (!((i+1) % signal_rate))
+ suppress = DAT_COMPLETION_DEFAULT_FLAG;
+ else
+ suppress = DAT_COMPLETION_SUPPRESS_FLAG;
+
+ *tx_buf = (char)++tx_cnt;
+ cookie.as_64 = tx_cnt;
+ ret = dat_ep_post_rdma_write(h_ep, MSG_IOV_COUNT,
+ l_iov, cookie, &r_iov,
+ suppress);
+ if (ret) {
+ fprintf(stderr, "%d: ERROR: dat_rdma_write() %s\n",
+ getpid(), DT_RetToStr(ret));
+ return (DAT_ABORT);
+ }
+ LOGPRINTF("%d rdma_write # %d data sent\n", getpid(), tx_cnt);
+ if (!suppress) {
+ while (dat_evd_dequeue(h_dto_req_evd, &event));
+ if (dto_event->status) {
+ fprintf(stderr,
+ "ERROR rdma_write: status=0x%x ck="
+ " "F64x " exp 0x%x\n",
+ dto_event->status,
+ dto_event->user_cookie.as_64, tx_cnt);
+ return (DAT_ABORT);
+ }
+ LOGPRINTF("%d rdma_write # %d data sent event\n", getpid(), tx_cnt);
+ }
+ }
+ stop = get_time();
+ ts.rtt = ((stop - start) * 1.0e6);
+
+ if((unsigned char)*rx_buf != (unsigned char)rx_cnt) {
+ printf( "%d %s RW pingpong: %p, last *buf %d != cnt %d\n",
+ getpid(), server ? "SERVER:" : "CLIENT:",
+ rx_buf, (unsigned char)*rx_buf,
+ (unsigned char)rx_cnt);
+ return (DAT_ABORT);
+ }
+
+ return (DAT_SUCCESS);
+}
+
DAT_RETURN do_rdma_read_with_msg(void)
{
DAT_EVENT event;
rcv_buf = rbuf;
/* pre-post all buffers */
- for (i = 0; i < burst; i++) {
+ for (i = 0; i < msg_burst; i++) {
burst_msg_posted++;
cookie.as_64 = i;
l_iov.lmr_context = lmr_context_recv;
l_iov.virtual_address = (DAT_VADDR) (uintptr_t) rcv_buf;
l_iov.segment_length = buf_len;
- LOGPRINTF("%d Pre-posting Receive Message Buffers %p\n",
- getpid(), rcv_buf);
+ LOGPRINTF("%d Pre-posting Receive Message Buffer[%d] %p\n",
+ getpid(), i, rcv_buf);
ret = dat_ep_post_recv(h_ep,
1,
/* client ping 0x55, server pong 0xAA in first byte */
start = get_time();
- for (i = 0; i < burst; i++) {
+ for (i = 0; i < msg_burst; i++) {
/* walk the send and recv buffers */
if (!server) {
*snd_buf = 0x55;
LOGPRINTF("%d %s SND buffer %p contains: 0x%x len=%d\n",
getpid(), server ? "SERVER:" : "CLIENT:",
- snd_buf, *snd_buf, buf_len);
+ snd_buf, *(unsigned char *)snd_buf, buf_len);
ret = send_msg(snd_buf,
buf_len,
}
/* recv message, send completions suppressed */
+ event.event_number = 0;
if (collect_event(h_dto_rcv_evd,
&event,
DTO_TIMEOUT,
/* validate event number and status */
LOGPRINTF("%d inbound message; message arrived!\n", getpid());
if (event.event_number != DAT_DTO_COMPLETION_EVENT) {
- fprintf(stderr, "%d Error unexpected DTO event : %s\n",
- getpid(), DT_EventToStr(event.event_number));
+ fprintf(stderr, "%d Error DTO event (0x%x): %s\n",
+ getpid(), event.event_number,
+ DT_EventToStr(event.event_number));
return (DAT_ABORT);
}
if ((event.event_data.dto_completion_event_data.
return (DAT_ABORT);
}
- LOGPRINTF("%d %s RCV buffer %p contains: 0x%x len=%d\n",
+ LOGPRINTF("%d %s RCV buffer[%d] %p contains: 0x%x len=%d\n",
getpid(), server ? "SERVER:" : "CLIENT:",
- rcv_buf, *rcv_buf, buf_len);
+ i, rcv_buf, *(unsigned char *)rcv_buf, buf_len);
burst_msg_index++;
if (server) {
*snd_buf = 0xaa;
- LOGPRINTF("%d %s SND buffer %p contains: 0x%x len=%d\n",
+ LOGPRINTF("%d %s SND buffer[%d] %p contains: 0x%x len=%d\n",
getpid(), server ? "SERVER:" : "CLIENT:",
- snd_buf, *snd_buf, buf_len);
+ i, snd_buf, *(unsigned char *)snd_buf, buf_len);
ret = send_msg(snd_buf,
buf_len,
ret = dat_lmr_create(h_ia,
DAT_MEM_TYPE_VIRTUAL,
region,
- buf_len * (burst+1),
+ buf_len * rq_cnt,
h_pz,
DAT_MEM_PRIV_ALL_FLAG,
DAT_VA_TYPE_VA,
ret = dat_lmr_create(h_ia,
DAT_MEM_TYPE_VIRTUAL,
region,
- buf_len * (burst + 1),
+ buf_len * rq_cnt,
h_pz,
DAT_MEM_PRIV_ALL_FLAG,
DAT_VA_TYPE_VA,
printf("\n DAPL USAGE \n\n");
printf("s: server\n");
printf("u: unidirectional bandwidth (default=bidirectional\n");
- printf("w: rdma write only\n");
+ printf("w: rdma write only, streaming\n");
+ printf("W: rdma write only, ping pong\n");
printf("t: performance times\n");
printf("c: use cno\n");
printf("v: verbose\n");