From bda8e1c96e5b576ec7ec3e4d226fc4e21dbc716d Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Tue, 18 Feb 2014 16:02:18 -0800 Subject: [PATCH] mpxyd: improve QP destruction to manage QP1 and QP2 variations With proxy-in and proxy-out connection combinations the proxy agent sometimes manages 2 QPs. Change QP flush and destruction to manage all combinations of QPs. QP can also be on both tx and rx link-list for proxy-in and proxy-out processing. QP free needs to be modified to serialize and remove QP object from all lists. Remove QPN option from mix_get_qp call. Proxy-in RX_IMM message processing changed to validate CM connected state and IB QP state before reposting. Proxy-in pending_wr processing should send WC's to release proxy buffers more frequently instead of on last segment. With multiple QP's sharing proxy buffer it could stall waiting for last segment WC's. It will now signal on last segment or every 10th segment by default. Signed-off-by: Arlin Davis --- dapl/svc/mcm.c | 23 ++++++++++++++++------- dapl/svc/mix.c | 45 +++++++++++++++++---------------------------- dapl/svc/mpxy_in.c | 35 +++++++++++++++++++++-------------- dapl/svc/mpxyd.c | 4 ++++ 4 files changed, 58 insertions(+), 49 deletions(-) diff --git a/dapl/svc/mcm.c b/dapl/svc/mcm.c index 2034ad0..3bafedd 100644 --- a/dapl/svc/mcm.c +++ b/dapl/svc/mcm.c @@ -413,11 +413,23 @@ int mcm_modify_qp(struct ibv_qp *qp_handle, return ret; } +/* move QP's to error state and destroy. 
Flush the proxy SR queue if it exists */ void mcm_flush_qp(struct mcm_qp *m_qp) { struct mcm_sr *m_sr; struct dat_mix_wc wc; + if (m_qp->ib_qp1) { + mcm_modify_qp(m_qp->ib_qp1, IBV_QPS_ERR, 0, 0, NULL); + ibv_destroy_qp(m_qp->ib_qp1); + m_qp->ib_qp1 = NULL; + } + if (m_qp->ib_qp2) { + mcm_modify_qp(m_qp->ib_qp2, IBV_QPS_ERR, 0, 0, NULL); + ibv_destroy_qp(m_qp->ib_qp2); + m_qp->ib_qp2 = NULL; + } + mpxy_lock(&m_qp->rxlock); while (m_qp->sr_tl != m_qp->sr_hd) { m_sr = (struct mcm_sr *)(m_qp->sr_buf + (m_qp->sr_sz * m_qp->sr_tl)); @@ -439,7 +451,6 @@ void mcm_flush_qp(struct mcm_qp *m_qp) mpxy_unlock(&m_qp->rxlock); } - /* MCM Endpoint CM objects */ void m_cm_free(mcm_cm_t *cm) { @@ -840,10 +851,9 @@ void mcm_cm_disc(mcm_cm_t *cm) switch (cm->state) { case MCM_CONNECTED: /* CONSUMER: move to err state to flush */ - if (cm->m_qp) { - mcm_modify_qp(cm->m_qp->ib_qp2, IBV_QPS_ERR, 0, 0, 0); + if (cm->m_qp) mcm_flush_qp(cm->m_qp); - } + /* send DREQ, event after DREP or DREQ timeout */ cm->state = MCM_DISC_PENDING; cm->msg.op = htons(MCM_DREQ); @@ -882,10 +892,9 @@ void mcm_cm_disc(mcm_cm_t *cm) case MCM_DISC_RECV: MCNTR(cm->md, MCM_CM_DREQ_IN); /* CM_THREAD: move to err state to flush */ - if (cm->m_qp) { - mcm_modify_qp(cm->m_qp->ib_qp2, IBV_QPS_ERR, 0, 0, 0); + if (cm->m_qp) mcm_flush_qp(cm->m_qp); - } + /* DREQ received, send DREP and schedule event, finalize */ cm->msg.op = htons(MCM_DREP); cm->state = MCM_DISCONNECTED; diff --git a/dapl/svc/mix.c b/dapl/svc/mix.c index 8774e5d..93f1ee1 100644 --- a/dapl/svc/mix.c +++ b/dapl/svc/mix.c @@ -319,14 +319,14 @@ mcm_cq_t *mix_get_cq(mcm_scif_dev_t *smd, uint32_t tid) } /* locate QP object, qpt list */ -mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid, uint32_t qpn) +mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid) { mcm_qp_t *qp = NULL; mpxy_lock(&smd->qptlock); qp = get_head_entry(&smd->qptlist); while (qp) { - if ((qp->t_entry.tid == tid) || (qp->ib_qp2->qp_num == qpn)) + if (qp->t_entry.tid == tid) break; qp = 
get_next_entry(&qp->t_entry, &smd->qptlist); } @@ -495,12 +495,15 @@ resp: /* called with smd->qptlist lock held */ void m_qp_free(struct mcm_qp *m_qp) { - struct ibv_qp *qp1 = m_qp->ib_qp1; - struct ibv_qp *qp2 = m_qp->ib_qp2; + if (m_qp->t_entry.tid) + remove_entry(&m_qp->t_entry); + if (m_qp->r_entry.tid) { /* MXS - MXS, also qprlist */ + mpxy_lock(&m_qp->smd->qprlock); + remove_entry(&m_qp->r_entry); + mpxy_unlock(&m_qp->smd->qprlock); + } - m_qp->ib_qp1 = NULL; - m_qp->ib_qp2 = NULL; if (m_qp->cm) { /* unlink CM, serialized */ struct mcm_cm *cm = m_qp->cm; @@ -511,27 +514,12 @@ void m_qp_free(struct mcm_qp *m_qp) mpxy_unlock(&cm->lock); mcm_dqconn(m_qp->smd, cm); } - if (qp1) { - mcm_modify_qp(qp1, IBV_QPS_ERR, 0, 0, NULL); - ibv_destroy_qp(qp1); - } - if (qp2) { - mcm_modify_qp(qp2, IBV_QPS_ERR, 0, 0, NULL); - ibv_destroy_qp(qp2); - } - mcm_flush_qp(m_qp); - - if (m_qp->t_entry.tid) - remove_entry(&m_qp->t_entry); - - if (m_qp->r_entry.tid) - remove_entry(&m_qp->r_entry); + mcm_flush_qp(m_qp); /* move QP to error, flush & destroy */ #ifdef MCM_PROFILE if (mcm_profile) mcm_qp_prof_pr(m_qp, MCM_QP_ALL); #endif - /* resource pools, proxy_in CQ, and qp object */ m_po_destroy_bpool(m_qp); m_pi_destroy_bpool(m_qp); @@ -552,15 +540,16 @@ static int mix_qp_destroy(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg) int len; struct mcm_qp *m_qp; - mlog(8, " MIX_QP_DESTROY: QP_t - id 0x%x\n", pmsg->req_id ); MCNTR(smd->md, MCM_QP_FREE); /* Find the QP */ - m_qp = mix_get_qp(smd, pmsg->req_id, 0); + m_qp = mix_get_qp(smd, pmsg->req_id); if (!m_qp) { mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->req_id); goto err; } + mlog(8, " QP_t - id 0x%x m_qp = %p\n", pmsg->req_id, m_qp); + mpxy_lock(&smd->qptlock); m_qp_free(m_qp); mpxy_unlock(&smd->qptlock); @@ -1050,7 +1039,7 @@ static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t sc pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id); /* Find the QP for linking */ - m_qp = mix_get_qp(smd, pmsg->qp_id, 0); + 
m_qp = mix_get_qp(smd, pmsg->qp_id); if (!m_qp) { mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id); goto err; @@ -1519,7 +1508,7 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t sc memcpy(&m_cm->msg, &pmsg->msg, sizeof(dat_mcm_msg_t)); /* Attach the QP for this CR */ - m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id, 0); + m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id); if (!m_cm->m_qp) { mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id); mpxy_unlock(&m_cm->lock); @@ -1854,7 +1843,7 @@ static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_sr_t *pmsg) } /* get QP by ID */ - m_qp = mix_get_qp(smd, pmsg->qp_id, 0); + m_qp = mix_get_qp(smd, pmsg->qp_id); if (!m_qp) { struct dat_mix_wc wc; @@ -1908,7 +1897,7 @@ static int mix_post_recv(mcm_scif_dev_t *smd, dat_mix_sr_t *pmsg) } /* get QP by ID */ - m_qp = mix_get_qp(smd, pmsg->qp_id, 0); + m_qp = mix_get_qp(smd, pmsg->qp_id); if (!m_qp) { mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id); goto err; diff --git a/dapl/svc/mpxy_in.c b/dapl/svc/mpxy_in.c index e91e829..6ba736e 100644 --- a/dapl/svc/mpxy_in.c +++ b/dapl/svc/mpxy_in.c @@ -436,7 +436,7 @@ static int m_pi_send_wc(struct mcm_qp *m_qp, struct mcm_wr_rx *wr_rx, int status struct ibv_qp *ib_qp; int wc_idx, ret; - mlog(4," WC_rem: wr_rx[%d] %p, wc_idx %d flgs %x, WR_r tl %d-%d wt %d hd %d\n", + mlog(0x10," WC_rem: wr_rx[%d] %p, wc_idx %d flgs %x, WR_r tl %d-%d wt %d hd %d\n", wr_rx->w_idx, wr_rx, m_qp->wc_hd_rem, wr_rx->flags, m_qp->wr_tl_r, wr_rx->w_idx, m_qp->wr_tl_r_wt, m_qp->wr_hd_r); @@ -1054,7 +1054,7 @@ void m_pi_rcv_event(struct mcm_qp *m_qp, wrc_idata_t *wrc) } -/* Proxy-in service +/* Proxy-in service - RX thread * * <- Work request in (RW_imm - WR idata), remote initiated RW * <- Work completion in (RW_imm - WC idata), local initiated RW @@ -1065,7 +1065,7 @@ void m_rcv_event(struct mcm_cq *m_cq, int *events) struct ibv_cq *ib_cq; struct mcm_qp *m_qp; void *cq_ctx; - int i, wc_cnt, ret, err, notify = 
0; + int i, wc_cnt, ret, err=0, notify=0; ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx); if (ret == 0) @@ -1108,6 +1108,12 @@ retry: ntohl(wc[i].imm_data), m_qp->ib_qp2->state); continue; } + if (m_qp->cm && (m_qp->cm->state != MCM_CONNECTED)) { + mlog(1," WARN: RX data on DISC m_qp %p qp1 %p qp2 %p\n", + m_qp, m_qp->ib_qp1, m_qp->ib_qp2, + mcm_state_str(m_qp->cm->state)); + continue; + } if (wc[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM) { struct ibv_recv_wr r_wr, *r_err; @@ -1134,16 +1140,16 @@ retry: ib_qp = m_qp->ib_qp2; errno = 0; - err = ibv_post_recv(ib_qp, &r_wr, &r_err); - if (err) { - mlog(0,"ERR: qp %p (%s) qpn %x ibv_post_recv ret = %d %s\n", - m_qp, (MXS_EP(&m_qp->smd->md->addr) && - MSS_EP(&m_qp->cm->msg.daddr1)) ? "QP1":"QP2", - m_qp->ib_qp2 ? - m_qp->ib_qp2->qp_num:m_qp->ib_qp1->qp_num, - ret, strerror(errno)); - - /* todo: report QP error state? */ + if (ib_qp) { + err = ibv_post_recv(ib_qp, &r_wr, &r_err); + if (err) { + mlog(0,"ERR: qp %p (%s) qpn %x ibv_post_recv ret = %d %s\n", + m_qp, (MXS_EP(&m_qp->smd->md->addr) && + MSS_EP(&m_qp->cm->msg.daddr1)) ? "QP1":"QP2", + m_qp->ib_qp2 ? + m_qp->ib_qp2->qp_num:m_qp->ib_qp1->qp_num, + ret, strerror(errno)); + } } MCNTR(m_qp->smd->md, MCM_QP_RECV); @@ -1239,7 +1245,8 @@ void m_pi_pending_wr(struct mcm_qp *m_qp, int *data) m_pi_free_sr(m_qp, m_sr); } - if (wr_rx->flags & M_SEND_LS) { + /* need regular WC's to release PI buffers */ + if ((wr_rx->flags & M_SEND_LS) || (!(wr_rx->w_idx % 10))) { mlog(4, "WR_rx[%d] wr %p LastSeg: send WC! 
tl %d hd %d\n", wr_rx->w_idx, wr_rx, m_qp->wr_tl_r, m_qp->wr_hd_r); diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index 2f7f6ce..d9e6169 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -310,7 +310,9 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd) m_cm = get_head_entry(&smd->llist); while (m_cm) { next_cm = get_next_entry(&m_cm->entry, &smd->llist); + mpxy_unlock(&smd->llock); mcm_dqlisten(smd, m_cm); /* dequeue and free */ + mpxy_lock(&smd->llock); m_cm = next_cm; } init_list(&smd->llist); @@ -322,7 +324,9 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd) m_cm = get_head_entry(&smd->clist); while (m_cm) { next_cm = get_next_entry(&m_cm->entry, &smd->clist); + mpxy_unlock(&smd->clock); mcm_dqconn(smd, m_cm); /* dequeue and free */ + mpxy_lock(&smd->clock); m_cm = next_cm; } init_list(&smd->clist); -- 2.46.0