]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
mpxyd: init proxy WC buffer queues for new queue management
authorArlin Davis <arlin.r.davis@intel.com>
Sun, 9 Mar 2014 18:49:48 +0000 (11:49 -0700)
committerArlin Davis <arlin.r.davis@intel.com>
Sun, 9 Mar 2014 18:49:48 +0000 (11:49 -0700)
change device destroy to dequeue SMD immediately.
change TX thread to run whenever active device is queued.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
dapl/svc/mpxyd.c

index b05ac63fe62a4745d31cbf60bb595da0438fd369..dfbc06c23a2f839a180ac54264c91f8d2d9d40fb 100644 (file)
@@ -296,6 +296,8 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd)
        mcm_cq_t *m_cq, *next_cq;
        mcm_mr_t *m_mr, *next_mr;
 
+       remove_entry(&smd->entry); /* remove off md->smd_list */
+
        /* free cm_id port */
        if (smd->cm_id) {
                mcm_free_port(smd->md->ports, smd->cm_id);
@@ -400,8 +402,6 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd)
        if (smd->ref_cnt)
                mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt);
 
-       remove_entry(&smd->entry);
-
        mpxy_destroy_bpool(smd);
        mlog(8, " proxy buffer pools destroyed \n");
 
@@ -465,9 +465,9 @@ static int create_smd_bpool(mcm_scif_dev_t *smd)
                mlog(0, "failed to allocate smd m_bu_wc, m_len=%d, ERR: %d\n", wcq_len, ret);
                return -1;
        }
-       smd->m_buf_hd = 1;
+       smd->m_buf_hd = 0;
        smd->m_buf_tl = 0;
-       smd->m_buf_end = wcq_len/sizeof(mcm_buf_wc_t);
+       smd->m_buf_end = (wcq_len/sizeof(mcm_buf_wc_t)) - 1;
 
        mlog(8, " m_buf_wc %p, len %d allocated \n", smd->m_buf_wc, wcq_len);
 
@@ -513,9 +513,9 @@ static int create_smd_bpool(mcm_scif_dev_t *smd)
        mlog(8, " m_buf_wc_r %p, len %d, entries %d \n",
                   smd->m_buf_wc_r, wcq_len, wcq_len/sizeof(mcm_buf_wc_t));
 
-       smd->m_buf_hd_r = 1;
+       smd->m_buf_hd_r = 0;
        smd->m_buf_tl_r = 0;
-       smd->m_buf_end_r = wcq_len/sizeof(mcm_buf_wc_t);
+       smd->m_buf_end_r = (wcq_len/sizeof(mcm_buf_wc_t)) - 1;
        return 0;
 }
 
@@ -538,6 +538,7 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e
        if (ret) {
                mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n",
                        ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret);
+               smd->cmd_buf = NULL;
                goto err;
        }
        mlog(8, "Allocated smd cmd_buf = %p len %d\n",
@@ -592,6 +593,7 @@ err:
                        free(smd->cmd_buf);
                if (smd->ports)
                        free(smd->ports);
+
                mpxy_destroy_bpool(smd);
                free(smd);
        }
@@ -735,7 +737,6 @@ void sig_handler( int signum )
        finished = 1;
 }
 
-
 static void mpxy_set_thread_priority()
 {
        int policy;
@@ -764,7 +765,6 @@ static void mpxy_set_thread_priority()
                pthread_self(), params.sched_priority);
 }
 
-
 void mpxy_tx_thread(void *mic_client)
 {
        mcm_client_t *mc = (mcm_client_t*)mic_client;
@@ -773,7 +773,7 @@ void mpxy_tx_thread(void *mic_client)
        struct mcm_cq *m_cq;
        struct mcm_qp *m_qp;
        struct mcm_fd_set *set;
-       int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt;
+       int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt, smd_cnt;
        char rbuf[2];
 
        if (mcm_affinity) {
@@ -800,7 +800,7 @@ void mpxy_tx_thread(void *mic_client)
                mpxy_lock(&mc->txlock);
                mcm_fd_zero(set);
                mcm_fd_set(mc->tx_pipe[0], set, POLLIN);
-               data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0;
+               data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0, smd_cnt=0;
                for (i=0;i<MCM_IB_MAX;i++) {
                        md = &mc->mdev[i];
                        if (md->ibctx == NULL)
@@ -810,6 +810,7 @@ void mpxy_tx_thread(void *mic_client)
                        mpxy_lock(&md->slock);
                        smd = get_head_entry(&md->smd_list);
                        while (smd && !smd->destroy) {
+                               smd_cnt++;
                                smd->th_ref_cnt++;
                                mpxy_unlock(&md->slock);
 
@@ -843,13 +844,17 @@ void mpxy_tx_thread(void *mic_client)
                        rf_cnt += ((uint64_t *)md->cntrs)[MCM_SCIF_READ_FROM];
                        rd_cnt += ((uint64_t *)md->cntrs)[MCM_QP_READ_DONE];
                }
-               time_ms = (data || events) ? 0:-1;
+               time_ms = smd_cnt ? 0:-1;
                mpxy_unlock(&mc->txlock);
-               if (time_ms) mlog(0x20, "TX sleep WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
+               if (time_ms) mlog(0x10, "TX sleep WR %d RF %d RD %d\n", wr_cnt,rf_cnt,rd_cnt);
                mcm_select(set, time_ms);
-               if (time_ms) mlog(0x20, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
-               if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
-                       read(mc->tx_pipe[0], rbuf, 2);
+               if (time_ms) mlog(0x10, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
+               if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN) {
+                       int cnt = 0;
+                       while (read(mc->tx_pipe[0], rbuf, 1) > 0)
+                               cnt++;
+                       mlog(0x10, " PIPE r_cnt %d\n", cnt);
+               }
        }
        mlog(0, "TX thread exiting\n");
 }
@@ -1072,15 +1077,12 @@ void mpxy_rx_thread(void *mic_client)
                        mpxy_lock(&md->slock);
                        smd = get_head_entry(&md->smd_list);
                        while (smd && !smd->destroy) {
-                               mlog(0x8, " smd %p destroy %d refs %d\n",
-                                         smd, smd->destroy, smd->th_ref_cnt);
                                smd->th_ref_cnt++;
                                mpxy_unlock(&md->slock);
 
                                mpxy_lock(&smd->cqrlock);
                                m_cq = get_head_entry(&smd->cqrlist);
                                while (m_cq) {
-                                       mlog(0x8, " CQr %p \n", m_cq);
                                        m_rcv_event(m_cq, &data); /* chk receive requests, initiate RR's */
                                        if (m_cq->ib_ch)
                                                mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN);
@@ -1091,12 +1093,6 @@ void mpxy_rx_thread(void *mic_client)
                                mpxy_lock(&smd->qprlock);
                                m_qp = get_head_entry(&smd->qprlist);
                                while (m_qp) {
-                                       mlog(0x8, " QPr %p nxt %p prv %p hd %p qprlist %p\n",
-                                               m_qp, m_qp->r_entry.next,
-                                               m_qp->r_entry.prev,
-                                               m_qp->r_entry.head,
-                                               &smd->qprlist);
-
                                        m_pi_pending_wr(m_qp, &data); /* RR's and scif_sendto */
                                        m_qp = get_next_entry(&m_qp->r_entry, &smd->qprlist);
                                }