From 06efc6ab82b6feebe57dbe180495c10d32e5548c Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Sun, 9 Mar 2014 11:49:48 -0700 Subject: [PATCH] mpxyd: init proxy WC buffer queues for new queue management change device destroy to dequeue SMD immediately. change TX thread to run whenever active device is queued. Signed-off-by: Arlin Davis --- dapl/svc/mpxyd.c | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index b05ac63..dfbc06c 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -296,6 +296,8 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd) mcm_cq_t *m_cq, *next_cq; mcm_mr_t *m_mr, *next_mr; + remove_entry(&smd->entry); /* remove off md->smd_list */ + /* free cm_id port */ if (smd->cm_id) { mcm_free_port(smd->md->ports, smd->cm_id); @@ -400,8 +402,6 @@ void mpxy_destroy_smd(mcm_scif_dev_t *smd) if (smd->ref_cnt) mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt); - remove_entry(&smd->entry); - mpxy_destroy_bpool(smd); mlog(8, " proxy buffer pools destroyed \n"); @@ -465,9 +465,9 @@ static int create_smd_bpool(mcm_scif_dev_t *smd) mlog(0, "failed to allocate smd m_bu_wc, m_len=%d, ERR: %d\n", wcq_len, ret); return -1; } - smd->m_buf_hd = 1; + smd->m_buf_hd = 0; smd->m_buf_tl = 0; - smd->m_buf_end = wcq_len/sizeof(mcm_buf_wc_t); + smd->m_buf_end = (wcq_len/sizeof(mcm_buf_wc_t)) - 1; mlog(8, " m_buf_wc %p, len %d allocated \n", smd->m_buf_wc, wcq_len); @@ -513,9 +513,9 @@ static int create_smd_bpool(mcm_scif_dev_t *smd) mlog(8, " m_buf_wc_r %p, len %d, entries %d \n", smd->m_buf_wc_r, wcq_len, wcq_len/sizeof(mcm_buf_wc_t)); - smd->m_buf_hd_r = 1; + smd->m_buf_hd_r = 0; smd->m_buf_tl_r = 0; - smd->m_buf_end_r = wcq_len/sizeof(mcm_buf_wc_t); + smd->m_buf_end_r = (wcq_len/sizeof(mcm_buf_wc_t)) - 1; return 0; } @@ -538,6 +538,7 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e if (ret) { mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n", ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret); + smd->cmd_buf = NULL; goto err; } mlog(8, "Allocated smd cmd_buf = %p len %d\n", @@ -592,6 +593,7 @@ err: free(smd->cmd_buf); if (smd->ports) free(smd->ports); + mpxy_destroy_bpool(smd); free(smd); } @@ -735,7 +737,6 @@ void sig_handler( int signum ) finished = 1; } - static void mpxy_set_thread_priority() { int policy; @@ -764,7 +765,6 @@ static void mpxy_set_thread_priority() pthread_self(), params.sched_priority); } - void mpxy_tx_thread(void *mic_client) { mcm_client_t *mc = (mcm_client_t*)mic_client; @@ -773,7 +773,7 @@ void mpxy_tx_thread(void *mic_client) struct mcm_cq *m_cq; struct mcm_qp *m_qp; struct mcm_fd_set *set; - int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt; + int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt, smd_cnt; char rbuf[2]; if (mcm_affinity) { @@ -800,7 +800,7 @@ void mpxy_tx_thread(void *mic_client) mpxy_lock(&mc->txlock); mcm_fd_zero(set); mcm_fd_set(mc->tx_pipe[0], set, POLLIN); - data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0; + data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0, smd_cnt=0; for (i=0;imdev[i]; if (md->ibctx == NULL) @@ -810,6 +810,7 @@ void mpxy_tx_thread(void *mic_client) mpxy_lock(&md->slock); smd = get_head_entry(&md->smd_list); while (smd && !smd->destroy) { + smd_cnt++; smd->th_ref_cnt++; mpxy_unlock(&md->slock); @@ -843,13 +844,17 @@ void mpxy_tx_thread(void *mic_client) rf_cnt += ((uint64_t *)md->cntrs)[MCM_SCIF_READ_FROM]; rd_cnt += ((uint64_t *)md->cntrs)[MCM_QP_READ_DONE]; } - time_ms = (data || events) ? 0:-1; + time_ms = smd_cnt ? 0:-1; mpxy_unlock(&mc->txlock); - if (time_ms) mlog(0x20, "TX sleep WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt); + if (time_ms) mlog(0x10, "TX sleep WR %d RF %d RD %d\n", wr_cnt,rf_cnt,rd_cnt); mcm_select(set, time_ms); - if (time_ms) mlog(0x20, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt); - if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN) - read(mc->tx_pipe[0], rbuf, 2); + if (time_ms) mlog(0x10, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt); + if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN) { + int cnt = 0; + while (read(mc->tx_pipe[0], rbuf, 1) > 0) + cnt++; + mlog(0x10, " PIPE r_cnt %d\n", cnt); + } } mlog(0, "TX thread exiting\n"); } @@ -1072,15 +1077,12 @@ void mpxy_rx_thread(void *mic_client) mpxy_lock(&md->slock); smd = get_head_entry(&md->smd_list); while (smd && !smd->destroy) { - mlog(0x8, " smd %p destroy %d refs %d\n", - smd, smd->destroy, smd->th_ref_cnt); smd->th_ref_cnt++; mpxy_unlock(&md->slock); mpxy_lock(&smd->cqrlock); m_cq = get_head_entry(&smd->cqrlist); while (m_cq) { - mlog(0x8, " CQr %p \n", m_cq); m_rcv_event(m_cq, &data); /* chk receive requests, initiate RR's */ if (m_cq->ib_ch) mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN); @@ -1091,12 +1093,6 @@ void mpxy_rx_thread(void *mic_client) mpxy_lock(&smd->qprlock); m_qp = get_head_entry(&smd->qprlist); while (m_qp) { - mlog(0x8, " QPr %p nxt %p prv %p hd %p qprlist %p\n", - m_qp, m_qp->r_entry.next, - m_qp->r_entry.prev, - m_qp->r_entry.head, - &smd->qprlist); - m_pi_pending_wr(m_qp, &data); /* RR's and scif_sendto */ m_qp = get_next_entry(&m_qp->r_entry, &smd->qprlist); } -- 2.41.0