]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
mpxyd: with abnormal CM termination a CM object can be referenced after QP destroy
authorArlin Davis <arlin.r.davis@intel.com>
Thu, 10 Dec 2015 22:48:05 +0000 (14:48 -0800)
committerArlin Davis <arlin.r.davis@intel.com>
Thu, 10 Dec 2015 22:48:05 +0000 (14:48 -0800)
The proxy-in CQ is not flushed and processes properly during
mix_qp_destroy. Depending on the EP mode there can be 2 seperate
connections with multiple CQs to process. Add new mix_cq_flush
function that will flush all pending work on TX and RX side of
proxy engine. CM object is destroyed and reset only after all
pending work is processed on ALL endpoint CQ associations.
Add error logging when WR resources are exhausted.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
dapl/svc/mix.c

index cb8249997fa6df43ee110c9ea4ecc5435b32df79..741ca7c4885fcd90657c99fc49114ffe37040639 100644 (file)
@@ -525,6 +525,27 @@ void m_cq_free(struct mcm_cq *m_cq)
        free(m_cq);
 }
 
+/* smd->cqlock/cqrlock held */
+void m_cq_flush(struct mcm_cq *m_cq)
+{
+       struct ibv_cq *ib_cq = NULL;
+       void *cq_ctx;
+       int ret, cnt=0;
+       struct ibv_wc wc;
+
+       mlog(8, " m_cq %p enter:\n", m_cq);
+       ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx);
+       if (ret == 0)
+               ibv_ack_cq_events(m_cq->ib_cq, 1);
+
+       do {
+               ret = ibv_poll_cq(m_cq->ib_cq, 1, &wc);
+               cnt += ret;
+       }
+       while (ret > 0);
+       mlog(8, " m_cq %p exit: %d events flushed\n", m_cq, cnt);
+}
+
 /* destroy proxy CQ, fits in header */
 static int mix_cq_destroy(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg)
 {
@@ -778,8 +799,26 @@ void m_qp_free(struct mcm_qp *m_qp)
                mpxy_unlock(&m_qp->smd->qprlock);
        }
 
-       mlog(8, " m_qp %p m_cm %p cm_id %d\n",
-               m_qp, m_qp->cm, m_qp->cm ? m_qp->cm->entry.tid:0);
+       mlog(8, " m_qp %p m_cm %p cm_id %d cm_state %d\n",
+               m_qp, m_qp->cm, m_qp->cm ? m_qp->cm->entry.tid:0,
+               m_qp->cm ? m_qp->cm->state:0);
+
+       if (m_qp->cm)
+               m_qp->cm->state = MCM_DISCONNECTED;
+
+       mcm_flush_qp(m_qp); /* QP to error, flush consumer messages */
+
+       if (m_qp->m_cq_tx) { /* flush pending PO WRs on cq_tx */
+               mpxy_lock(&m_qp->smd->cqlock);
+               m_cq_flush(m_qp->m_cq_tx);
+               mpxy_unlock(&m_qp->smd->cqlock);
+       }
+
+       if (m_qp->m_cq_rx) { /* flush pending PI WRs on cq_rx */
+               mpxy_lock(&m_qp->smd->cqrlock);
+               m_cq_flush(m_qp->m_cq_rx);
+               mpxy_unlock(&m_qp->smd->cqrlock);
+       }
 
        if (m_qp->cm) { /* unlink CM, serialized */
                struct mcm_cm *cm = m_qp->cm;
@@ -791,7 +830,6 @@ void m_qp_free(struct mcm_qp *m_qp)
                mpxy_unlock(&cm->lock);
                mcm_dqconn_free(m_qp->smd, cm);
        }
-       mcm_flush_qp(m_qp); /* move QP to error, flush */
 
        if (m_qp->ib_qp1) {
                ibv_destroy_qp(m_qp->ib_qp1);
@@ -812,6 +850,7 @@ void m_qp_free(struct mcm_qp *m_qp)
        if (m_qp->m_cq_rx) {
                mpxy_lock(&m_qp->smd->cqrlock);
                m_cq_free(m_qp->m_cq_rx);
+               m_qp->m_cq_rx = NULL;
                mpxy_unlock(&m_qp->smd->cqrlock);
        }
        mpxy_lock_destroy(&m_qp->txlock); /* proxy out */
@@ -1266,12 +1305,16 @@ void mix_dto_event(struct mcm_cq *m_cq, struct dat_mix_wc *wc, int nc)
 
                if (msg.wc[i].status != IBV_WC_SUCCESS) {
                        if (msg.wc[i].status  != IBV_WC_WR_FLUSH_ERR) {
-                               mlog(0, " [%d:%d] ERROR (ep=%d): cq %p id %d ctx %p stat %d"
-                                       "  op 0x%x ln %d wr_id %p wc's %d verr 0x%x errno=%d,%s\n",
-                                       m_cq->smd->md->mc->scif_id, m_cq->smd->entry.tid,
-                                       m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx,
-                                       msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
-                                       msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err,
+                               mlog(0, " [%d:%d] ERROR (ep=%d): id %d stat %d"
+                                       " op %x flg %x ln %d wr_id %p wc's %d"
+                                       " verr 0x%x errno=%d,%s\n",
+                                       m_cq->smd->md->mc->scif_id,
+                                       m_cq->smd->entry.tid,
+                                       m_cq->smd->scif_op_ep, msg.cq_id,
+                                       msg.wc[i].status, msg.wc[i].opcode,
+                                       msg.wc[i].wc_flags, msg.wc[i].byte_len,
+                                       msg.wc[i].wr_id, msg.wc_cnt,
+                                       msg.wc[i].vendor_err,
                                        errno, strerror(errno));
                        }
                } else {
@@ -2183,6 +2226,7 @@ static int mix_proxy_out(mcm_scif_dev_t *smd, dat_mix_sr_t *pmsg, mcm_qp_t *m_qp
        mpxy_lock(&m_qp->txlock);
        if (((m_qp->wr_hd + 1) & m_qp->wr_end) == m_qp->wr_tl) { /* full */
                ret = ENOMEM;
+               mlog(0, " ERR: WR full hd %d tl %d\n", m_qp->wr_hd, m_qp->wr_tl);
                goto bail;
        }
        m_qp->wr_hd = (m_qp->wr_hd + 1) & m_qp->wr_end; /* move hd */