From: Amir Hanania Date: Wed, 23 Sep 2015 21:43:38 +0000 (-0700) Subject: mpxyd: add P2P inline support for data size <= 96 bytes X-Git-Tag: dapl-2.1.7-1~3 X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=fb64e157b9dd741ba942db00ceee37ea0f4ddcab;p=~ardavis%2Fdapl.git mpxyd: add P2P inline support for data size <= 96 bytes Improve small message latency for proxy to proxy service by including data with the proxy work request. Necessary changes made to preserve order across WR's regardless of size. Additional logging included. Improves single byte one-way latency by about 27% on MFO configurations. Changes made to avoid forwarding 0-byte rdma write to scif_writeto, remove CPU hand copies, and order. Changes for numa_node == -1 such that mic0 assumes MSS and mic1 assumes MXS modes. Signed-off-by: Arlin Davis Signed-off-by: Amir Hanania --- diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h index 1ac0c12..69ec31b 100644 --- a/dapl/openib_common/dapl_ib_common.h +++ b/dapl/openib_common/dapl_ib_common.h @@ -65,6 +65,7 @@ struct dcm_ib_qp { struct mcm_wrc_info wrc; /* local WC info */ struct mcm_wrc_info wrc_rem; /* remote WR info */ DAPL_OS_LOCK lock; /* Proxy WR and WC queues */ + uint8_t p2p_data; /* Max number of bytes to pass to proxy in the WR */ uint8_t ep_map; /* Peer EP mapping, MXS, MSS, HST */ uint32_t seg_sz; /* Peer MXS Proxy-in segment size */ char *wr_buf_rx; /* mcm_wr_rx_t entries, devices without inline data */ diff --git a/dapl/openib_common/dapl_mic_common.h b/dapl/openib_common/dapl_mic_common.h index 5afc8ec..86a815e 100755 --- a/dapl/openib_common/dapl_mic_common.h +++ b/dapl/openib_common/dapl_mic_common.h @@ -58,6 +58,7 @@ #define DAT_MCM_PDATA_SIZE 64 #define DAT_MCM_PROXY_DATA 40 #define DAT_MCM_SEG_PO2 17 +#define DAT_MCM_P2P_INLINE 96 #define ALIGN_64(o) ((o + 64 - 1) & ~(64-1)) #define ALIGN_P64(o) ((((uintptr_t)o) + 64 - 1)& ~(64-1)) @@ -843,6 +844,7 @@ enum mcm_wr_flags { M_READ_FROM_DONE = 
1 << 16, /* m_wr mpxyd read_from_done, ready for posting */ M_SEND_DIRECT = 1 << 17, /* m_wr SEND direct from host memory, no proxy out buffer */ + M_PROXY_INLINE = 1 << 18, /* m_wr contains the data */ }; /* 80 bytes */ @@ -881,7 +883,7 @@ typedef struct mcm_wrc_info { uint16_t wc_end; } __attribute__((packed)) mcm_wrc_info_t; -/* WR: 160 bytes, direct RDMA write from remote Proxy-in service */ +/* WR: 256 bytes, direct RDMA write from remote Proxy-in service */ typedef struct mcm_wr_rx { struct dat_mix_wr wr; struct dat_mix_sge sg[DAT_MIX_SGE_MAX]; @@ -893,8 +895,11 @@ typedef struct mcm_wr_rx { uint32_t flags; uint32_t time; uint32_t qcnt; + char inline_data[DAT_MCM_P2P_INLINE]; } __attribute__((packed)) mcm_wr_rx_t; +#define MCM_WR_RX_NO_DATA (sizeof(mcm_wr_rx_t) - DAT_MCM_P2P_INLINE) + /* WC: 80 bytes, direct RDMA write from remote Proxy-in service */ typedef struct mcm_wc_rx { struct dat_mix_wc wc; @@ -948,7 +953,7 @@ static inline void mcm_hton_wr_rx(struct mcm_wr_rx *m_wr_rx, struct mcm_wr *m_wr { int i; - memset((void*)m_wr_rx, 0, sizeof(*m_wr_rx)); + memset((void*)m_wr_rx, 0, MCM_WR_RX_NO_DATA); m_wr_rx->org_id = (uint64_t) htonll((uint64_t)m_wr); /* proxy_out WR */ m_wr_rx->flags = htonl(m_wr->flags); m_wr_rx->w_idx = htonl(wc_tl); /* snd back wc tail */ diff --git a/dapl/openib_mcm/cm.c b/dapl/openib_mcm/cm.c index 56ed23a..f2a4b8d 100644 --- a/dapl/openib_mcm/cm.c +++ b/dapl/openib_mcm/cm.c @@ -1198,6 +1198,14 @@ void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg) if (MXF_EP(&cm->msg.daddr1)) { /* save PI WR info, create local WC_q, send back WC info */ mcm_ntoh_wrc(&ep->qp_handle->wrc_rem, (mcm_wrc_info_t*)cm->msg.p_proxy); + if (ep->qp_handle->wrc_rem.wr_sz > MCM_WR_RX_NO_DATA) + ep->qp_handle->p2p_data = DAT_MCM_P2P_INLINE; + else + ep->qp_handle->p2p_data = 0; + + dapl_log(DAPL_DBG_TYPE_CM, "CONN_RTU: qp %p set proxy max inline to %d\n", + ep->qp_handle, ep->qp_handle->p2p_data); + mcm_create_wc_q(ep->qp_handle, 
ep->qp_handle->wrc_rem.wr_end + 1); mcm_hton_wrc((mcm_wrc_info_t*)cm->msg.p_proxy, &ep->qp_handle->wrc); ep->qp_handle->ep_map = cm->msg.daddr1.ep_map; @@ -1522,6 +1530,14 @@ dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data) if (MXF_EP(&cm->msg.daddr1)) { /* save PI WR info, create local WC_q, send back WC info */ mcm_ntoh_wrc(&ep->qp_handle->wrc_rem, (mcm_wrc_info_t*)cm->msg.p_proxy); + if (ep->qp_handle->wrc_rem.wr_sz > MCM_WR_RX_NO_DATA) + ep->qp_handle->p2p_data = DAT_MCM_P2P_INLINE; + else + ep->qp_handle->p2p_data = 0; + + dapl_log(DAPL_DBG_TYPE_CM, "ACCEPT_USR: qp %p set proxy max inline to %d\n", + ep->qp_handle, ep->qp_handle->p2p_data); + mcm_create_wc_q(ep->qp_handle, ep->qp_handle->wrc_rem.wr_end + 1); mcm_hton_wrc((mcm_wrc_info_t*)cm->msg.p_proxy, &ep->qp_handle->wrc); ep->qp_handle->ep_map = cm->msg.daddr1.ep_map; diff --git a/dapl/openib_mcm/device.c b/dapl/openib_mcm/device.c index 92ab201..86c4565 100644 --- a/dapl/openib_mcm/device.c +++ b/dapl/openib_mcm/device.c @@ -191,8 +191,6 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, flags & DAPL_OPEN_QUERY ? 
"QUERY MODE":"STD MODE"); /* set RC tunables via enviroment or default */ - hca_ptr->ib_trans.ib_cm.max_inline = - dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_IB_DEFAULT); hca_ptr->ib_trans.ib_cm.ack_retry = dapl_os_get_env_val("DAPL_ACK_RETRY", DCM_ACK_RETRY); hca_ptr->ib_trans.ib_cm.ack_timer = @@ -316,7 +314,7 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, if (dapl_ib_inline_data(hca_ptr->ib_hca_handle)) { hca_ptr->ib_trans.ib_cm.max_inline = dapl_os_get_env_val("DAPL_MAX_INLINE", - INLINE_SEND_IB_DEFAULT); + sizeof(mcm_wr_rx_t)); } else hca_ptr->ib_trans.ib_cm.max_inline = 0; diff --git a/dapl/openib_mcm/proxy.c b/dapl/openib_mcm/proxy.c index 824f575..5abb8b1 100644 --- a/dapl/openib_mcm/proxy.c +++ b/dapl/openib_mcm/proxy.c @@ -110,14 +110,21 @@ int mcm_send_pi(struct dcm_ib_qp *m_qp, sge.lkey = m_qp->wr_buf_rx_mr->lkey; } sge.addr = (uint64_t)(uintptr_t) wr_rx_ptr; - sge.length = (uint32_t) sizeof(struct mcm_wr_rx); /* 160 byte WR */ + sge.length = (uint32_t)m_qp->wrc_rem.wr_sz; + + if (m_qp->p2p_data && seg_len < m_qp->p2p_data) { + dapl_log(DAPL_DBG_TYPE_EP, + " Sending p2p data, len %d\n", seg_len); + wr_flags |= M_PROXY_INLINE; + memcpy(wr_rx_ptr->inline_data, (void *)(l_addr + l_off), seg_len); + } dapl_log(DAPL_DBG_TYPE_EP, " mcm_send_pi[%d]: seg_ln %d wr_idx %d, tl %d hd %d\n", i, seg_len, wr_idx, m_qp->wr_tl, m_qp->wr_hd); /* build local m_wr_rx for remote PI */ - memset((void*)wr_rx_ptr, 0, sizeof(struct mcm_wr_rx)); + memset((void*)wr_rx_ptr, 0, MCM_WR_RX_NO_DATA); wr_rx_ptr->org_id = (uint64_t) htonll((uint64_t)wr->wr_id); wr_rx_ptr->flags = htonl(wr_flags); wr_rx_ptr->w_idx = htonl(m_qp->wc_tl); /* snd back wc tail */ @@ -168,7 +175,7 @@ int mcm_send_pi(struct dcm_ib_qp *m_qp, i, wr_imm.wr_id, m_qp->qp2->qp_num, wr_imm.opcode, wr_flags, ntohl(wr_imm.imm_data), l_addr + l_off, wr_imm.wr.rdma.remote_addr, - wr_imm.wr.rdma.rkey, sizeof(struct mcm_wr_rx), l_len); + wr_imm.wr.rdma.rkey, sge.length, l_len); dapl_log(DAPL_DBG_TYPE_EP, " 
mcm_send_pi[%d]: WR wr_id %Lx qn %x op %d flgs %x" " imm %x raddr %p rkey %x sg_ln %d tl %d me %d hd %d\n", @@ -437,7 +444,7 @@ int mcm_create_wc_q(struct dcm_ib_qp *m_qp, int entries) m_qp->wrc.wc_addr, m_qp->wc_mr->addr, ALIGN_PAGE(m_qp->wrc.wc_len), entries, m_qp->wc_mr->rkey, m_qp->wc_mr->lkey); - if (!m_qp->ep->header.owner_ia->hca_ptr->ib_trans.ib_cm.max_inline) { + if (!m_qp->tp->ib_cm.max_inline) { if (posix_memalign((void **)&m_qp->wr_buf_rx, 4096, entries * sizeof(mcm_wr_rx_t))) { diff --git a/dapl/svc/mcm.c b/dapl/svc/mcm.c index 7b7a6a5..4b91090 100644 --- a/dapl/svc/mcm.c +++ b/dapl/svc/mcm.c @@ -49,6 +49,8 @@ int mcm_rep_ms = 4000; int mcm_rtu_ms = 2000; int mcm_dreq_ms = 1000; int mcm_proxy_in = 1; +int mcm_mic0_mss = 1; /* numa_node invalid, default mic0 == MSS */ +int mcm_mic1_mss = 0; /* numa_node invalid, default mic1 == MXS */ extern int mix_buffer_sg_po2; extern int mcm_rx_entries; @@ -199,7 +201,13 @@ int mcm_init_cm_service(mcm_ib_dev_t *md) md->mc->ver == MIX_COMP || mcm_proxy_in == 0) md->addr.ep_map = MIC_SSOCK_DEV; else - md->addr.ep_map = MIC_XSOCK_DEV; + md->addr.ep_map = MIC_XSOCK_DEV; + + /* Invalid numa, check settings MSS for mic0, MXS for mic1 */ + if (md->numa_node == -1 && md->mc->scif_id == 1 && mcm_mic0_mss) + md->addr.ep_map = MIC_SSOCK_DEV; + if (md->numa_node == -1 && md->mc->scif_id == 2 && mcm_mic1_mss) + md->addr.ep_map = MIC_SSOCK_DEV; } /* setup CM timers and queue sizes */ diff --git a/dapl/svc/mix.c b/dapl/svc/mix.c index ec715f3..cb82499 100644 --- a/dapl/svc/mix.c +++ b/dapl/svc/mix.c @@ -207,7 +207,6 @@ void mix_scif_accept(scif_epd_t listen_ep) if (smd) return; err: - mlog(0, " ERR: open_device -> closing SCIF client EPs %d %d %d \n", op_ep, tx_ep, ev_ep); out_close_tx_ep: scif_close(tx_ep); @@ -1594,6 +1593,10 @@ int mix_cm_rep_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) memcpy(&m_cm->msg.daddr1, &pkt->saddr1, sizeof(dat_mcm_addr_t)); memcpy(&m_cm->msg.daddr2, &pkt->saddr2, sizeof(dat_mcm_addr_t)); 
mcm_ntoh_wrc(&m_cm->m_qp->wrc_rem, (mcm_wrc_info_t *)m_cm->msg.p_proxy); /* peer RI WRC info */ + if (m_cm->m_qp->wrc_rem.wr_sz > MCM_WR_RX_NO_DATA) + m_cm->m_qp->p2p_data = DAT_MCM_P2P_INLINE; + else + m_cm->m_qp->p2p_data = 0; mlog(2, " WRC: m_qp %p - WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d\n", @@ -1603,13 +1606,14 @@ int mix_cm_rep_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) m_cm->m_qp->wrc.wc_rkey, m_cm->m_qp->wrc.wc_len, m_cm->m_qp->wrc.wc_sz, m_cm->m_qp->wrc.wc_end); - mlog(2, " WRC_rem: m_qp %p - WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" - " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d\n", - m_cm->m_qp, m_cm->m_qp->wrc_rem.wr_addr, m_cm->m_qp->wrc_rem.wr_rkey, + mlog(2, " WRC_rem: WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" + " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d p2p %d\n", + m_cm->m_qp->wrc_rem.wr_addr, m_cm->m_qp->wrc_rem.wr_rkey, m_cm->m_qp->wrc_rem.wr_len, m_cm->m_qp->wrc_rem.wr_sz, m_cm->m_qp->wrc_rem.wr_end, m_cm->m_qp->wrc_rem.wc_addr, m_cm->m_qp->wrc_rem.wc_rkey, m_cm->m_qp->wrc_rem.wc_len, - m_cm->m_qp->wrc_rem.wc_sz, m_cm->m_qp->wrc_rem.wc_end); + m_cm->m_qp->wrc_rem.wc_sz, m_cm->m_qp->wrc_rem.wc_end, + m_cm->m_qp->p2p_data); /* MXS <- MSS or HOST, fabric: TX: QP2->QP1 direct, RX: QP1<-QP2 proxy */ if ((MXF_EP(&m_cm->md->addr) && !MXF_EP(&m_cm->msg.daddr1)) && @@ -1815,10 +1819,15 @@ int mix_cm_rtu_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) /* MXF_EP <- HST_EP, host sends WC on RTU, save WRC info */ if (MXF_EP(&pkt->daddr1) && HST_EP(&pkt->saddr2)) { mcm_ntoh_wrc(&m_cm->m_qp->wrc_rem, (mcm_wrc_info_t *)pkt->p_proxy); - mlog(2, " WRC_rem: m_qp %p - addr 0x%Lx rkey 0x%x len %d, sz %d end %d\n", + if (m_cm->m_qp->wrc_rem.wr_sz > MCM_WR_RX_NO_DATA) + m_cm->m_qp->p2p_data = DAT_MCM_P2P_INLINE; + else + m_cm->m_qp->p2p_data = 0; + + mlog(2, " WRC_rem: WC addr 0x%Lx rkey 0x%x len %d, sz %d end %d p2p %d\n", m_cm->m_qp, m_cm->m_qp->wrc_rem.wc_addr, m_cm->m_qp->wrc_rem.wc_rkey, m_cm->m_qp->wrc_rem.wc_len, 
m_cm->m_qp->wrc_rem.wc_sz, - m_cm->m_qp->wrc_rem.wc_end); + m_cm->m_qp->wrc_rem.wc_end, m_cm->m_qp->p2p_data); } mpxy_lock(&m_cm->lock); @@ -1902,6 +1911,10 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t sc m_cm->ref_cnt++; /* Passive: QP ref */ m_cm->m_qp->cm = m_cm; mcm_ntoh_wrc(&m_cm->m_qp->wrc_rem, (mcm_wrc_info_t *)m_cm->msg.p_proxy); /* save peer PI WRC info */ + if (m_cm->m_qp->wrc_rem.wr_sz > MCM_WR_RX_NO_DATA) + m_cm->m_qp->p2p_data = DAT_MCM_P2P_INLINE; + else + m_cm->m_qp->p2p_data = 0; mlog(2, " WRC: m_qp %p - WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d sg_po2 %d\n", @@ -1912,13 +1925,14 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t sc m_cm->m_qp->wrc.wc_sz, m_cm->m_qp->wrc.wc_end, m_cm->msg.seg_sz); - mlog(2, " WRC_rem: m_qp %p - WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" - " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d\n", - m_cm->m_qp, m_cm->m_qp->wrc_rem.wr_addr, m_cm->m_qp->wrc_rem.wr_rkey, + mlog(2, " WRC_rem: WR 0x%Lx rkey 0x%x ln %d, sz %d end %d" + " WC 0x%Lx rkey 0x%x ln %d, sz %d end %d p2p %d\n", + m_cm->m_qp->wrc_rem.wr_addr, m_cm->m_qp->wrc_rem.wr_rkey, m_cm->m_qp->wrc_rem.wr_len, m_cm->m_qp->wrc_rem.wr_sz, m_cm->m_qp->wrc_rem.wr_end, m_cm->m_qp->wrc_rem.wc_addr, m_cm->m_qp->wrc_rem.wc_rkey, m_cm->m_qp->wrc_rem.wc_len, - m_cm->m_qp->wrc_rem.wc_sz, m_cm->m_qp->wrc_rem.wc_end); + m_cm->m_qp->wrc_rem.wc_sz, m_cm->m_qp->wrc_rem.wc_end, + m_cm->m_qp->p2p_data); /* MXS -> MSS or HOST, remote: need QPr1, saddr1 on mpxyd */ if ((MXF_EP(&m_cm->md->addr) && !MXF_EP(&m_cm->msg.daddr1)) && @@ -2174,17 +2188,14 @@ static int mix_proxy_out(mcm_scif_dev_t *smd, dat_mix_sr_t *pmsg, mcm_qp_t *m_qp m_qp->wr_hd = (m_qp->wr_hd + 1) & m_qp->wr_end; /* move hd */ m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd)); - mlog(4, " inline, m_wr %p m_sge %p len %d hd %d tl %d\n", - m_wr, m_wr->sg, len, m_qp->wr_hd, m_qp->wr_tl); - /* IB rdma write WR */ 
const_ib_rw(&m_wr->wr, &pmsg->wr, m_wr->sg); m_wr->wr.sg_list = m_wr->sg; m_wr->wr.num_sge = len ? 1:0; - mlog(4, " INLINE m_wr (%p)raddr %p rkey 0x%llx, ib_wr raddr %p rkey 0x%llx \n", - &pmsg->wr.wr.rdma.remote_addr, pmsg->wr.wr.rdma.remote_addr, pmsg->wr.wr.rdma.rkey, - &m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey); + mlog(4, " INLINE m_wr[%d] %p raddr %p rkey 0x%x, ib_wr raddr %p rkey 0x%x %d bytes\n", + m_qp->wr_hd, m_wr, pmsg->wr.wr.rdma.remote_addr, pmsg->wr.wr.rdma.rkey, + m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey, len); /* M_WR */ m_wr->org_id = pmsg->wr.wr_id; @@ -2290,7 +2301,6 @@ retry_mr: goto bail; } } - mlog(4, " inline data rcv'ed %d bytes\n", len); if ((smd->md->indata) && (len <= mcm_ib_inline)) m_wr->wr.send_flags |= IBV_SEND_INLINE; diff --git a/dapl/svc/mpxy_in.c b/dapl/svc/mpxy_in.c index adf5021..54cc62a 100644 --- a/dapl/svc/mpxy_in.c +++ b/dapl/svc/mpxy_in.c @@ -529,10 +529,10 @@ static int m_pi_send_wc(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_rx, int status wr.send_flags |= IBV_SEND_INLINE; mlog(4, " WC: RW_imm post: wr_id[%d] %Lx sglist %p sge %d op %d flgs %x" - " idata %x WR_rem = raddr %p rkey %x ln %d op %x\n", + " idata %x WR_rem = raddr %p rkey %x io_ln %d op %x\n", wr_rx->w_idx, wr.wr_id, wr.sg_list, wr.num_sge, wr.opcode, wr.send_flags, ntohl(wr.imm_data), wr.wr.rdma.remote_addr, - wr.wr.rdma.rkey, sge.length, wr_rx->wr.opcode); + wr.wr.rdma.rkey, wr_rx->sg[0].length, wr_rx->wr.opcode); /* MXS -> MSS or HST, PI service will be on QP1 */ if (MXF_EP(&m_qp->smd->md->addr) && @@ -562,7 +562,7 @@ static int m_pi_send_wc(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_rx, int status } /* called with rxlock, process all RR's up to signal marker at wr_last */ -static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, struct ibv_wc *wc) +static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig) { mcm_scif_dev_t *smd = m_qp->smd; struct mcm_wr_rx *wr_rx; @@ 
-576,9 +576,13 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str while (m_qp->pi_rr_cnt) { /* RR's pending */ wr_rx = (struct mcm_wr_rx *)(m_qp->wrc.wr_addr + (m_qp->wrc.wr_sz * wr_idx)); - if (!(wr_rx->flags & M_READ_POSTED)) { - /* reached RR signaled marker, or head pointer */ - if (wr_idx == wr_sig->w_idx || wr_idx == m_qp->wr_hd_r) + if (!(wr_rx->flags & M_READ_DONE)) { + /* reached head pointer */ + if (wr_idx == m_qp->wr_hd_r) + break; + + /* maintain order */ + if (wr_rx->flags & M_READ_POSTED) break; wr_idx = (wr_idx + 1) & m_qp->wrc.wr_end; /* next WR */ @@ -589,9 +593,8 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str if (wr_rx == wr_sig) mcm_qp_prof_ts(m_qp, MCM_QP_IB_RR, wr_rx->time, wr_rx->qcnt, wr_cnt); #endif - mlog(4, " WR_rx[%d-%d] %p m_qp %p wc %p wc->op %x wr_rx->wr.op %x\n", - wr_rx->w_idx, wr_sig->w_idx, wr_rx, m_qp, wc, - wc->opcode, wr_rx->wr.opcode); + mlog(4, " WR_rx[%d-%d] %p m_qp %p wr_rx->wr.op %x\n", + wr_rx->w_idx, wr_sig->w_idx, wr_rx, m_qp, wr_rx->wr.opcode); m_qp->pi_rr_cnt--; /* rdma read complete */ MCNTR(smd->md, MCM_QP_READ_DONE); @@ -622,11 +625,27 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str sg_len = wr_rx->sg[2].length; r_off = m_pi_mr_trans(smd, wr_rx->wr.wr.rdma.remote_addr, wr_rx->wr.wr.rdma.rkey, sg_len); - if (!r_off) + if (!r_off && sg_len) goto bail; - mlog(4, " RDMA_WRITE op: wr_rx[%d] %p -> scif r_off %Lx len %d\n", - wr_rx->w_idx, wr_rx, r_off, sg_len, 0); + if (!sg_len) { /* 0 byte rdma write, no scif */ + mlog(1, " RDMA_WRITE op: wr_rx[%d] %p," + " raddr %p rkey %x 0 bytes\n", + wr_rx->w_idx, wr_rx, + wr_rx->wr.wr.rdma.remote_addr, + wr_rx->wr.wr.rdma.rkey); + + m_qp->post_cnt_wt++; + wr_rx->flags &= ~M_READ_DONE; + wr_rx->flags |= M_READ_WRITE_TO; + wr_rx->wr.wr_id = wr_rx->org_id; /* mark done */ + + if (wr_idx == m_qp->wr_hd_r) + break; + + wr_idx = (wr_idx + 1) & m_qp->wrc.wr_end; /* next WR */ + 
continue; + } } /* sg[0] entry == proxy-out buffer, src for IB RR */ @@ -650,8 +669,8 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str wr_rx->time = mcm_ts_us(); wr_rx->qcnt = m_qp->post_cnt_wt; #endif - if (w_len < 256) - wt_flag = SCIF_RMA_USECPU; + if (wr_rx->flags & M_SEND_LS) + wt_flag |= SCIF_RMA_ORDERED; ret = scif_writeto(smd->scif_tx_ep, l_off, w_len, r_off, wt_flag); @@ -688,13 +707,12 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str goto bail; } MCNTR(smd->md, MCM_SCIF_SIGNAL); - wr_rx->flags &= ~M_READ_POSTED; /* reset READ_POSTED */ - wr_rx->flags |= M_READ_DONE; + wr_rx->flags &= ~M_READ_DONE; wr_rx->flags |= M_READ_WRITE_TO; m_qp->post_cnt_wt++; - /* reached RR signaled marker, or head */ - if (wr_idx == wr_sig->w_idx || wr_idx == m_qp->wr_hd_r) + /* reached head */ + if (wr_idx == m_qp->wr_hd_r) break; wr_idx = (wr_idx + 1) & m_qp->wrc.wr_end; /* next WR */ @@ -702,10 +720,13 @@ static void m_pi_post_writeto(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_sig, str return; bail: /* report error via WC back to proxy-out */ - mlog(0, " ERR: writeto: wr_rx[%d] %p -> raddr %Lx rkey %x (scif r_off %Lx) len %d\n", + mlog(0, " ERR: writeto: wr_rx[%d] %p -> IB raddr %Lx rkey %x" + " SCIF r_off %Lx, len %d wr_flags %x wt_pend %d\n", wr_rx->w_idx, wr_rx, wr_rx->wr.wr.rdma.remote_addr, - wr_rx->wr.wr.rdma.rkey, r_off, sg_len); + wr_rx->wr.wr.rdma.rkey, r_off, sg_len, wr_rx->flags, + m_qp->post_cnt_wt); + m_pi_send_wc(m_qp, wr_rx, IBV_WC_REM_ACCESS_ERR); return; } @@ -750,7 +771,9 @@ void m_pi_req_event(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_rx, struct ibv_wc /* RR complete, ready for SCIF_writeto to complete RW or SR */ if (type == WRID_RX_RR) { mpxy_lock(&m_qp->rxlock); - m_pi_post_writeto(m_qp, wr_rx, wc); + wr_rx->flags &= ~M_READ_POSTED; /* reset READ_POSTED */ + wr_rx->flags |= M_READ_DONE; + m_pi_post_writeto(m_qp, wr_rx); mpxy_unlock(&m_qp->rxlock); write(m_qp->smd->md->mc->rx_pipe[1], 
"w", sizeof "w"); /* signal rx_thread */ sched_yield(); @@ -923,25 +946,35 @@ static void m_pi_post_read(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_rx) wr_rx->time = mcm_ts_us(); wr_rx->qcnt = m_qp->pi_rr_cnt; #endif - wr_rx->flags |= M_READ_POSTED; - errno = 0; - ret = ibv_post_send(ib_qp, &ib_wr, &bad_wr); - if (ret) - goto bail; - m_qp->pi_rr_cnt++; - m_qp->post_cnt_rr++; - MCNTR(smd->md, MCM_QP_READ); + if (wr_rx->flags & M_PROXY_INLINE) { + mlog(0x10, "wr_rx flag PROXY_INLINE is set. data len %d\n", wr_rx->sg[0].length); + memcpy((void *)rbuf, wr_rx->inline_data, wr_rx->sg[0].length); + m_qp->pi_rr_cnt++; + m_qp->post_cnt_rr++; + wr_rx->flags |= M_READ_DONE; + m_pi_post_writeto(m_qp, wr_rx); + } + else { + errno = 0; + wr_rx->flags |= M_READ_POSTED; + ret = ibv_post_send(ib_qp, &ib_wr, &bad_wr); + if (ret) + goto bail; - mlog(0x10, "[%d:%d:%d] WR[%d] %p RR(%d,%d,%d): wr_id %Lx qn %x flgs %x,%x ln %d " - "r_addr,key %Lx %x to l_addr,key %Lx %x tl %d hd %d, m_idx %x\n", - smd->md->mc->scif_id, smd->entry.tid, m_qp->r_entry.tid, - wr_rx->w_idx, wr_rx, m_qp->post_cnt_rr, m_qp->stall_cnt_rr, - m_qp->pi_rr_cnt, ib_wr.wr_id, ib_qp->qp_num, ib_wr.send_flags, - wr_rx->flags, l_len, ib_wr.wr.rdma.remote_addr, - ib_wr.wr.rdma.rkey, ib_wr.sg_list->addr, ib_wr.sg_list->lkey, - m_qp->wr_tl_r, m_qp->wr_hd_r, wr_rx->m_idx); + m_qp->pi_rr_cnt++; + m_qp->post_cnt_rr++; + MCNTR(smd->md, MCM_QP_READ); + mlog(0x10, "[%d:%d:%d] WR[%d] %p RR(%d,%d,%d): wr_id %Lx qn %x flgs %x,%x ln %d " + "r_addr,key %Lx %x to l_addr,key %Lx %x tl %d hd %d, m_idx %x\n", + smd->md->mc->scif_id, smd->entry.tid, m_qp->r_entry.tid, + wr_rx->w_idx, wr_rx, m_qp->post_cnt_rr, m_qp->stall_cnt_rr, + m_qp->pi_rr_cnt, ib_wr.wr_id, ib_qp->qp_num, ib_wr.send_flags, + wr_rx->flags, l_len, ib_wr.wr.rdma.remote_addr, + ib_wr.wr.rdma.rkey, ib_wr.sg_list->addr, ib_wr.sg_list->lkey, + m_qp->wr_tl_r, m_qp->wr_hd_r, wr_rx->m_idx); + } write(smd->md->mc->tx_pipe[1], "w", sizeof "w"); return; bail: @@ -969,6 +1002,8 @@ 
buf_err: ib_wr.send_flags, l_len, ib_wr.wr.rdma.remote_addr, ib_wr.wr.rdma.rkey, ib_wr.sg_list->addr, ib_wr.sg_list->lkey, m_qp->wr_tl_r, m_qp->wr_tl_r_wt, m_qp->wr_hd_r); + + m_pi_send_wc(m_qp, wr_rx, IBV_WC_REM_ACCESS_ERR); /* report error */ } void m_pi_rcv_event(struct mcm_qp *m_qp, wrc_idata_t *wrc) diff --git a/dapl/svc/mpxy_out.c b/dapl/svc/mpxy_out.c index eff81fc..d015dc3 100644 --- a/dapl/svc/mpxy_out.c +++ b/dapl/svc/mpxy_out.c @@ -274,9 +274,15 @@ static int m_po_send_pi(struct mcm_qp *m_qp, struct mcm_wr *m_wr, int wr_idx) sge.lkey = m_qp->wr_buf_rx_mr->lkey; } sge.addr = (uint64_t)(uintptr_t) wr_rx_ptr; - sge.length = (uint32_t) sizeof(struct mcm_wr_rx); + sge.length = (uint32_t) m_qp->wrc_rem.wr_sz; /* proxy m_wr over to remote m_wr_rem slot, remote will initiate RR and send back WC */ + if (m_qp->p2p_data && m_wr->sg[0].length < m_qp->p2p_data) { + mlog(0x4, " Sending the proxy data ( len %d ) inside the WR.\n", m_wr->sg[0].length); + m_wr->flags |= M_PROXY_INLINE; + memcpy (wr_rx_ptr->inline_data, (void *)m_wr->sg[0].addr, m_wr->sg[0].length); + } + m_wr->flags |= M_SEND_PI; mcm_hton_wr_rx(wr_rx_ptr, m_wr, m_qp->wc_tl); /* build rx_wr for wire transfer, send it */ @@ -336,10 +342,11 @@ static int m_po_send_pi(struct mcm_qp *m_qp, struct mcm_wr *m_wr, int wr_idx) ret = ibv_post_send(ib_qp, &wr, &bad_wr); if (ret) { mlog(0, " ERR: m_wr %p idx %d laddr=%p ln=%d lkey=%x flgs %x" - " tl %d hd %d pp %d sig %d\n", + " tl %d hd %d pp %d sig %d ret %d %s\n", m_wr, wr_idx, sge.addr, sge.length, sge.lkey, m_wr->flags, m_qp->wr_tl, m_qp->wr_hd, - m_qp->wr_pp_rem, m_qp->post_sig_cnt); + m_qp->wr_pp_rem, m_qp->post_sig_cnt, + ret, strerror(errno)); mlog(0, " ERR: wr_id %Lx %p sglist %p sge %d op %d flgs %x" " idata 0x%x raddr %p rkey %x \n", m_wr->wr.wr_id, m_wr->wr.sg_list, @@ -1086,7 +1093,7 @@ retry: /* Proxy_out -> */ m_wr = (struct mcm_wr *)WRID_ADDR(wc[i].wr_id); m_qp = (struct mcm_qp *)m_wr->context; - if (!MXF_EP(&m_qp->cm->msg.daddr1)) + if (m_qp->cm && 
!MXF_EP(&m_qp->cm->msg.daddr1)) m_qp->comp_cnt++; MCNTR(m_qp->smd->md, MCM_QP_WRITE_DONE); diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index b04d823..922eeae 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -146,8 +146,8 @@ static int init_scif() scif_close(scif_listen_ep); return -1; } - mlog(1," MPXYD: Listening on reserved SCIF OFED port %d, listen_EP %d, backlog %d\n", - (uint16_t)scif_id.port, scif_sport, scif_listen_qlen); + mlog(1," MPXYD: Listening on reserved SCIF OFED port %d, backlog %d\n", + (uint16_t)scif_id.port, scif_listen_qlen); return 0; } @@ -801,7 +801,7 @@ found: } err: if (!smd) { - mlog(0, " ERR: mix_open_device failed for %s - %d\n", msg->name, msg->port); + mlog(1, " WARN: open failed for %s - %d\n", msg->name, msg->port); msg->hdr.status = MIX_ENODEV; } @@ -1345,8 +1345,8 @@ int main(int argc, char **argv) logfile = mpxy_open_log(); mpxy_log_options(); - mlog(0, "CCL Proxy - SCIF/IB DAPL RDMA Proxy Service, Mix Version %d (Build-%u) v2\n", - DAT_MIX_VER, PACKAGE_DATE); + mlog(0, "CCL Proxy - SCIF/IB DAPL RDMA Proxy Service %s (%u)\n", + PACKAGE_VERSION, PACKAGE_DATE); if (init_scif()) { mlog(0, "ERROR - unable to open/init SCIF device\n"); diff --git a/dapl/svc/mpxyd.h b/dapl/svc/mpxyd.h index e444f5f..c733157 100644 --- a/dapl/svc/mpxyd.h +++ b/dapl/svc/mpxyd.h @@ -58,7 +58,7 @@ #define min(a, b) ((a < b) ? (a) : (b)) #define max(a, b) ((a > b) ? 
(a) : (b)) -#define MCM_IB_INLINE 160 +#define MCM_IB_INLINE (sizeof(mcm_wr_rx_t)) #define MIX_MAX_MSG_SIZE (8*1024*1024) #define MIX_MIN 4 /* oldest version supported */ @@ -211,6 +211,7 @@ typedef struct mcm_qp { int comp_cnt; char *wr_buf_rx; /* mcm_wr_rx_t entries, for devices without inline data */ struct ibv_mr *wr_buf_rx_mr; + int p2p_data; /* Max number of bytes to pass from proxy to proxy in the WR */ /* Proxy-in: WR management, remote view from TX side */ mcm_wrc_info_t wrc_rem; /* WR and WC buffers: remote, in CM req and reply */ int wr_pp_rem; /* work request pending */ diff --git a/dapl/svc/util.c b/dapl/svc/util.c index 8b5db68..010678c 100644 --- a/dapl/svc/util.c +++ b/dapl/svc/util.c @@ -57,6 +57,8 @@ extern int mcm_rep_ms; extern int mcm_rtu_ms; extern int mcm_dreq_ms; extern int mcm_proxy_in; +extern int mcm_mic0_mss; +extern int mcm_mic1_mss; /* mix.c */ extern int mix_align; @@ -451,6 +453,10 @@ void mpxy_set_options( int debug_mode ) while (mcm_rx_entries < rsize) mcm_rx_entries <<= 1; } + else if (!strcasecmp("mcm_mic0_mss", opt)) + mcm_mic0_mss = atoi(value); + else if (!strcasecmp("mcm_mic1_mss", opt)) + mcm_mic1_mss = atoi(value); } fclose(f);