From 0b39116c18d43faf367bac08263fa62341d8ecf8 Mon Sep 17 00:00:00 2001
From: Arlin Davis
Date: Tue, 14 Jul 2015 14:58:32 -0700
Subject: [PATCH] mcm,mpxyd: fix DREQ processing to defer QP flush when proxy
 WRs still pending

The proxy will now defer DREQ flushing of proxy QPs while the PI and PO
data engines still have outstanding requests. Add an mcm_qp_busy routine
for checking the PI and PO data engines.

When the MIC calls disconnect, always send a DREQ up to the proxy in
order to handle the deferred flush of proxy-side posted receive messages.

Change QP free to modify both the local and proxy QPs and to check for
outstanding receive messages before qp_destroy, to avoid an infinite
wait in dapls_ep_flush_cqs.

Signed-off-by: Arlin Davis
---
 dapl/openib_common/qp.c | 34 +++++++++++++++++--------------
 dapl/openib_mcm/cm.c    | 16 +++++++++------
 dapl/svc/mcm.c          | 44 ++++++++++++++++++++++++++++++-----------
 dapl/svc/mix.c          | 24 ++++++++++++++++------
 4 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/dapl/openib_common/qp.c b/dapl/openib_common/qp.c
index b2f8639..d3584e5 100644
--- a/dapl/openib_common/qp.c
+++ b/dapl/openib_common/qp.c
@@ -374,7 +374,6 @@ err:
 DAT_RETURN dapls_ib_qp_free(IN DAPL_IA * ia_ptr, IN DAPL_EP * ep_ptr)
 {
 	struct ibv_qp *qp;
-	struct ibv_qp_attr qp_attr;
 
 	dapl_log(DAPL_DBG_TYPE_EP,
 		 " dapls_ib_qp_free: ep_ptr %p qp_handle %p\n",
@@ -390,39 +389,44 @@ DAT_RETURN dapls_ib_qp_free(IN DAPL_IA * ia_ptr, IN DAPL_EP * ep_ptr)
 	if (ep_ptr->qp_handle) {
 		qp = ep_ptr->qp_handle->qp;
 		dapl_os_unlock(&ep_ptr->header.lock);
-
-		qp_attr.qp_state = IBV_QPS_ERR;
-		if (qp)
-			ibv_modify_qp(qp, &qp_attr, IBV_QP_STATE);
-		dapls_ep_flush_cqs(ep_ptr);
+		if (qp) {
+			dapls_modify_qp_state(qp, IBV_QPS_ERR, 0, 0, 0);
+			dapls_ep_flush_cqs(ep_ptr);
+		}
 
 #ifdef _OPENIB_CMA_
 		rdma_destroy_qp(cm_ptr->cm_id);
 		cm_ptr->cm_id->qp = NULL;
-#else
-		if (qp && ibv_destroy_qp(qp)) {
-			dapl_log(DAPL_DBG_TYPE_ERR,
-				 " qp_free: ibv_destroy_qp error - %s\n",
-				 strerror(errno));
-		}
 #endif
 #ifdef _OPENIB_MCM_
+		if (ep_ptr->qp_handle->qp2)
+			dapls_modify_qp_state(ep_ptr->qp_handle->qp2,
+					      IBV_QPS_ERR, 0, 0, 0);
+
 		/* MIC: shadow support on MPXYD node */
 		if (ia_ptr->hca_ptr->ib_trans.scif_ep)
 			dapli_mix_qp_free(ep_ptr->qp_handle);
-		else /* NON MIC: local shadow queue */
+
+		dapls_ep_flush_cqs(ep_ptr);
+
+		if (ep_ptr->qp_handle->qp2)
 			ibv_destroy_qp(ep_ptr->qp_handle->qp2);
 
 		dapl_os_lock_destroy(&ep_ptr->qp_handle->lock);
-		mcm_destroy_pi_cq(ep_ptr->qp_handle);
-		mcm_destroy_wc_q(ep_ptr->qp_handle);
 #endif
+		if (qp)
+			ibv_destroy_qp(qp);
 	} else {
 		dapl_os_unlock(&ep_ptr->header.lock);
 	}
+#ifdef _OPENIB_MCM_
+	if (ep_ptr->qp_handle) {
+		mcm_destroy_pi_cq(ep_ptr->qp_handle);
+		mcm_destroy_wc_q(ep_ptr->qp_handle);
+	}
+#endif
 
 	/* destroy any UD address handles */
 	if (ep_ptr->qp_handle->ah) {
 		int i;
diff --git a/dapl/openib_mcm/cm.c b/dapl/openib_mcm/cm.c
index af55cf3..cc67b77 100644
--- a/dapl/openib_mcm/cm.c
+++ b/dapl/openib_mcm/cm.c
@@ -1684,6 +1684,13 @@ dapls_ib_disconnect(IN DAPL_EP *ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
 {
 	dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
 
+	if (cm_ptr && cm_ptr->tp->scif_ep) { /* always force proxy flush */
+		dapli_mix_cm_dreq_out(cm_ptr);
+		if (ep_ptr->qp_handle->qp)
+			dapls_modify_qp_state(ep_ptr->qp_handle->qp,
+					      IBV_QPS_ERR,0,0,0);
+	}
+
 	dapl_os_lock(&ep_ptr->header.lock);
 	if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED ||
 	    ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC ||
@@ -1693,16 +1700,13 @@ dapls_ib_disconnect(IN DAPL_EP *ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
 	}
 	dapl_os_unlock(&ep_ptr->header.lock);
 
-	if (cm_ptr->tp->scif_ep) { /* QPt on MPXYD, QPr local or on MPXYD */
-		dapli_mix_cm_dreq_out(cm_ptr);
-		if (ep_ptr->qp_handle->qp)
-			dapls_modify_qp_state(ep_ptr->qp_handle->qp, IBV_QPS_ERR,0,0,0);
-	} else { /* QPt and QPr local */
+	/* HST: QPt and QPr local */
+	if (!cm_ptr->tp->scif_ep) {
 		dapli_cm_disconnect(cm_ptr);
 		dapls_modify_qp_state(ep_ptr->qp_handle->qp2, IBV_QPS_ERR,0,0,0);
 	}
 
-	return DAT_SUCCESS;
+	return DAT_SUCCESS;
 }
 
 /*
diff --git a/dapl/svc/mcm.c b/dapl/svc/mcm.c
index 7374835..1f0de05 100644
--- a/dapl/svc/mcm.c
+++ b/dapl/svc/mcm.c
@@ -442,22 +442,19 @@ int mcm_modify_qp(struct ibv_qp *qp_handle,
 	return ret;
 }
 
-/* move QP's to error state and destroy. Flush the proxy SR queue is exists */
+/* move QPs to error state. Flush the proxy SR queue if it exists */
 void mcm_flush_qp(struct mcm_qp *m_qp)
 {
 	struct mcm_sr *m_sr;
 	struct dat_mix_wc wc;
 
-	if (m_qp->ib_qp1) {
+	mlog(1, " QP1 %p QP2 %p\n", m_qp->ib_qp1, m_qp->ib_qp2);
+
+	if (m_qp->ib_qp1)
 		mcm_modify_qp(m_qp->ib_qp1, IBV_QPS_ERR, 0, 0, NULL);
-		ibv_destroy_qp(m_qp->ib_qp1);
-		m_qp->ib_qp1 = NULL;
-	}
-	if (m_qp->ib_qp2) {
+
+	if (m_qp->ib_qp2)
 		mcm_modify_qp(m_qp->ib_qp2, IBV_QPS_ERR, 0, 0, NULL);
-		ibv_destroy_qp(m_qp->ib_qp2);
-		m_qp->ib_qp2 = NULL;
-	}
 
 	mpxy_lock(&m_qp->rxlock);
 	while (m_qp->sr_tl != m_qp->sr_hd) {
@@ -900,6 +897,29 @@ int mcm_cm_rej_out(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, DAT_MCM_OP type, int sw
 	return (mcm_send(md, &smsg, NULL, 0));
 }
 
+int mcm_qp_busy(struct mcm_qp *m_qp)
+{
+	int busy = 0;
+
+	mpxy_lock(&m_qp->rxlock);
+	busy += (m_qp->pi_rw_cnt + m_qp->pi_rr_cnt);
+	busy += (m_qp->stall_cnt_rr + m_qp->post_cnt_wt);
+	mpxy_unlock(&m_qp->rxlock);
+	mlog(2," PI[%d]: rw_cnt %d rr_cnt %d stall %d post %d\n",
+		busy, m_qp->pi_rw_cnt, m_qp->pi_rr_cnt,
+		m_qp->stall_cnt_rr, m_qp->post_cnt_wt);
+
+	mpxy_lock(&m_qp->txlock);
+	busy += (m_qp->wr_pp + m_qp->wr_pp_rem);
+	busy += (m_qp->post_sig_cnt - m_qp->comp_cnt);
+	mpxy_unlock(&m_qp->txlock);
+	mlog(2," PO[%d]: wr_pp %d pp_rem %d pst_sig %d cmp_sig %d\n",
+		busy, m_qp->wr_pp, m_qp->wr_pp_rem,
+		m_qp->post_sig_cnt, m_qp->comp_cnt);
+
+	return busy;
+}
+
 void mcm_cm_disc(mcm_cm_t *cm)
 {
 	int finalize = 1;
@@ -909,7 +929,7 @@ void mcm_cm_disc(mcm_cm_t *cm)
 	switch (cm->state) {
 	case MCM_CONNECTED:
 		/* CONSUMER: move to err state to flush */
-		if (cm->m_qp)
+		if (cm->m_qp && !mcm_qp_busy(cm->m_qp))
 			mcm_flush_qp(cm->m_qp);
 
 		/* send DREQ, event after DREP or DREQ timeout */
@@ -950,7 +970,7 @@ void mcm_cm_disc(mcm_cm_t *cm)
 	case MCM_DISC_RECV:
 		MCNTR(cm->md, MCM_CM_DREQ_IN);
 		/* CM_THREAD: move to err state to flush */
-		if (cm->m_qp)
+		if (cm->m_qp && !mcm_qp_busy(cm->m_qp))
 			mcm_flush_qp(cm->m_qp);
 
 		/* DREQ received, send DREP and schedule event, finalize */
@@ -964,6 +984,8 @@ void mcm_cm_disc(mcm_cm_t *cm)
 		break;
 	case MCM_DISCONNECTED:
 		mlog(2," state = %s already disconnected\n", mcm_state_str(cm->state) );
+		if (cm->m_qp) /* MIC client disc, force flush */
+			mcm_flush_qp(cm->m_qp);
 		mpxy_unlock(&cm->lock);
 		MCNTR(cm->md, MCM_CM_DREQ_DUP);
 		return;
diff --git a/dapl/svc/mix.c b/dapl/svc/mix.c
index 31b24a0..bfeecbb 100644
--- a/dapl/svc/mix.c
+++ b/dapl/svc/mix.c
@@ -580,7 +580,16 @@ void m_qp_free(struct mcm_qp *m_qp)
 		mpxy_unlock(&cm->lock);
 		mcm_dqconn_free(m_qp->smd, cm);
 	}
-	mcm_flush_qp(m_qp); /* move QP to error, flush & destroy */
+	mcm_flush_qp(m_qp); /* move QP to error, flush */
+
+	if (m_qp->ib_qp1) {
+		ibv_destroy_qp(m_qp->ib_qp1);
+		m_qp->ib_qp1 = NULL;
+	}
+	if (m_qp->ib_qp2) {
+		ibv_destroy_qp(m_qp->ib_qp2);
+		m_qp->ib_qp2 = NULL;
+	}
 
 #ifdef MCM_PROFILE
 	if (mcm_profile)
@@ -1042,8 +1051,8 @@ void mix_dto_event(struct mcm_cq *m_cq, struct dat_mix_wc *wc, int nc)
 
 		if (msg.wc[i].status != IBV_WC_SUCCESS) {
 			if (msg.wc[i].status != IBV_WC_WR_FLUSH_ERR) {
-				mlog(0, " ERROR (ep=%d): cq %p id %d ctx %p stat %d"
-					" [%d:%d] op 0x%x ln %d wr_id %p wc's %d verr 0x%x errno=%d,%s\n",
+				mlog(0, " [%d:%d] ERROR (ep=%d): cq %p id %d ctx %p stat %d"
+					" op 0x%x ln %d wr_id %p wc's %d verr 0x%x errno=%d,%s\n",
 					m_cq->smd->md->mc->scif_id, m_cq->smd->entry.tid,
 					m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx, msg.wc[i].status,
 					msg.wc[i].opcode, msg.wc[i].byte_len,
@@ -1083,8 +1092,9 @@ void mix_cm_event(mcm_cm_t *m_cm, uint32_t event)
 		mpxy_unlock(&m_cm->lock);
 	}
 
-	mlog(2, " MIX_CM_EVENT: cm %p cm_id %d, ctx %p, event 0x%x dev_id %d\n",
-		m_cm, m_cm->entry.tid, msg.cm_ctx, event, m_cm->smd->entry.tid);
+	mlog(2, " MIX_CM_EVENT: (ep=%d) cm %p id %d ctx %p event 0x%x dev_id %d\n",
+		m_cm->smd->scif_ev_ep, m_cm, m_cm->entry.tid,
+		msg.cm_ctx, event, m_cm->smd->entry.tid);
 
 	len = sizeof(dat_mix_cm_event_t);
 	mpxy_lock(&m_cm->smd->evlock);
@@ -1222,8 +1232,10 @@ static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t s
 		return 0;
 	}
 
-	/* process DREQ */
+	/* process DREQ, flush QP */
 	mcm_cm_disc(m_cm);
+	if (m_cm->m_qp)
+		mcm_flush_qp(m_cm->m_qp); /* move QP to error, flush */
 
 	return 0;
 }
-- 
2.46.0
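
Reviewer note, not part of the patch: the behavioral change above is that a
DREQ no longer flushes a proxy QP while the proxy-in (PI) or proxy-out (PO)
data engine still owns work requests; mcm_cm_disc() now asks mcm_qp_busy()
first and skips mcm_flush_qp() until the counters drain, with the flush picked
up on a later pass (duplicate DREQ in MCM_DISCONNECTED, or m_qp_free()). The
self-contained C sketch below only illustrates that busy-check/deferred-flush
pattern; the names engine_counters, qp_busy and flush_or_defer are invented
for the example and are not mpxyd symbols.

/* Standalone sketch of "defer the flush while the data engines are busy".
 * Hypothetical example code, not part of dapl/mpxyd. */
#include <stdio.h>

struct engine_counters {
	int rw_cnt;		/* proxy-in RDMA writes in flight */
	int rr_cnt;		/* proxy-in RDMA reads in flight */
	int posted;		/* proxy-out WRs posted (signaled) */
	int completed;		/* proxy-out WRs completed */
};

/* Non-zero while either data engine still owns work requests. */
static int qp_busy(const struct engine_counters *c)
{
	return (c->rw_cnt + c->rr_cnt) + (c->posted - c->completed);
}

/* Flush only when idle; otherwise report that the flush was deferred so a
 * later disconnect/cleanup pass can retry once the counters drain. */
static int flush_or_defer(struct engine_counters *c)
{
	int busy = qp_busy(c);

	if (busy) {
		printf("DREQ: QP busy (%d WRs outstanding), deferring flush\n", busy);
		return 0;	/* deferred */
	}
	printf("DREQ: QP idle, flushing now\n");
	return 1;		/* flushed */
}

int main(void)
{
	struct engine_counters c = { .rw_cnt = 1, .posted = 4, .completed = 3 };

	flush_or_defer(&c);	/* busy: flush deferred */

	c.rw_cnt = 0;
	c.completed = c.posted;
	flush_or_defer(&c);	/* idle: flush proceeds */
	return 0;
}

As in mcm_qp_busy(), the PI counters would be sampled under the QP rxlock and
the PO counters under the txlock, so the result is a snapshot rather than a
guarantee; the patch relies on a later DREQ or m_qp_free() to perform the
flush if the QP was still busy at DREQ time.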