mcm_cq_t *m_cq, *next_cq;
mcm_mr_t *m_mr, *next_mr;
+ remove_entry(&smd->entry); /* remove off md->smd_list */
+
/* free cm_id port */
if (smd->cm_id) {
mcm_free_port(smd->md->ports, smd->cm_id);
if (smd->ref_cnt)
mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt);
- remove_entry(&smd->entry);
-
mpxy_destroy_bpool(smd);
mlog(8, " proxy buffer pools destroyed \n");
mlog(0, "failed to allocate smd m_bu_wc, m_len=%d, ERR: %d\n", wcq_len, ret);
return -1;
}
- smd->m_buf_hd = 1;
+ smd->m_buf_hd = 0;
smd->m_buf_tl = 0;
- smd->m_buf_end = wcq_len/sizeof(mcm_buf_wc_t);
+ smd->m_buf_end = (wcq_len/sizeof(mcm_buf_wc_t)) - 1;
mlog(8, " m_buf_wc %p, len %d allocated \n", smd->m_buf_wc, wcq_len);
mlog(8, " m_buf_wc_r %p, len %d, entries %d \n",
smd->m_buf_wc_r, wcq_len, wcq_len/sizeof(mcm_buf_wc_t));
- smd->m_buf_hd_r = 1;
+ smd->m_buf_hd_r = 0;
smd->m_buf_tl_r = 0;
- smd->m_buf_end_r = wcq_len/sizeof(mcm_buf_wc_t);
+ smd->m_buf_end_r = (wcq_len/sizeof(mcm_buf_wc_t)) - 1;
return 0;
}
if (ret) {
mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n",
ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret);
+ smd->cmd_buf = NULL;
goto err;
}
mlog(8, "Allocated smd cmd_buf = %p len %d\n",
free(smd->cmd_buf);
if (smd->ports)
free(smd->ports);
+
mpxy_destroy_bpool(smd);
free(smd);
}
finished = 1;
}
-
static void mpxy_set_thread_priority()
{
int policy;
pthread_self(), params.sched_priority);
}
-
void mpxy_tx_thread(void *mic_client)
{
mcm_client_t *mc = (mcm_client_t*)mic_client;
struct mcm_cq *m_cq;
struct mcm_qp *m_qp;
struct mcm_fd_set *set;
- int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt;
+ int i, time_ms, data, events, cpu_id, wr_cnt, rf_cnt, rd_cnt, smd_cnt;
char rbuf[2];
if (mcm_affinity) {
mpxy_lock(&mc->txlock);
mcm_fd_zero(set);
mcm_fd_set(mc->tx_pipe[0], set, POLLIN);
- data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0;
+ data = 0, events = 0, wr_cnt=0, rf_cnt=0, rd_cnt=0, smd_cnt=0;
for (i=0;i<MCM_IB_MAX;i++) {
md = &mc->mdev[i];
if (md->ibctx == NULL)
mpxy_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd && !smd->destroy) {
+ smd_cnt++;
smd->th_ref_cnt++;
mpxy_unlock(&md->slock);
rf_cnt += ((uint64_t *)md->cntrs)[MCM_SCIF_READ_FROM];
rd_cnt += ((uint64_t *)md->cntrs)[MCM_QP_READ_DONE];
}
- time_ms = (data || events) ? 0:-1;
+ time_ms = smd_cnt ? 0:-1;
mpxy_unlock(&mc->txlock);
- if (time_ms) mlog(0x20, "TX sleep WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
+ if (time_ms) mlog(0x10, "TX sleep WR %d RF %d RD %d\n", wr_cnt,rf_cnt,rd_cnt);
mcm_select(set, time_ms);
- if (time_ms) mlog(0x20, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
- if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
- read(mc->tx_pipe[0], rbuf, 2);
+ if (time_ms) mlog(0x10, "TX wake WR %d RF %d RD %d\n",wr_cnt,rf_cnt,rd_cnt);
+ if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN) {
+ int cnt = 0;
+ while (read(mc->tx_pipe[0], rbuf, 1) > 0)
+ cnt++;
+ mlog(0x10, " PIPE r_cnt %d\n", cnt);
+ }
}
mlog(0, "TX thread exiting\n");
}
mpxy_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd && !smd->destroy) {
- mlog(0x8, " smd %p destroy %d refs %d\n",
- smd, smd->destroy, smd->th_ref_cnt);
smd->th_ref_cnt++;
mpxy_unlock(&md->slock);
mpxy_lock(&smd->cqrlock);
m_cq = get_head_entry(&smd->cqrlist);
while (m_cq) {
- mlog(0x8, " CQr %p \n", m_cq);
m_rcv_event(m_cq, &data); /* chk receive requests, initiate RR's */
if (m_cq->ib_ch)
mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN);
mpxy_lock(&smd->qprlock);
m_qp = get_head_entry(&smd->qprlist);
while (m_qp) {
- mlog(0x8, " QPr %p nxt %p prv %p hd %p qprlist %p\n",
- m_qp, m_qp->r_entry.next,
- m_qp->r_entry.prev,
- m_qp->r_entry.head,
- &smd->qprlist);
-
m_pi_pending_wr(m_qp, &data); /* RR's and scif_sendto */
m_qp = get_next_entry(&m_qp->r_entry, &smd->qprlist);
}