From d452c3780f4a68d27e46611fb31f2a96197ee2f0 Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Fri, 1 Feb 2013 17:33:17 -0800 Subject: [PATCH] mpxyd: cm scaling bug fixes and profiling New CM thread to help with CM scale out. Testing with dtestcm with 1000's of connections. MPI testing up to 60ppn on KNC nodes. Add new disc timers and disconnect logging for debug. Add cleanup for IB device during service termination. Add profiling of device and CM operations to help debug scaling issues Signed-off-by: Arlin Davis --- dapl/svc/mpxyd.c | 547 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 456 insertions(+), 91 deletions(-) diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index ed85d2d..9ecf780 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -94,9 +94,12 @@ static int mix_max_msg_mb = 64; static int mcm_depth = 500; static int mcm_size = 256; static int mcm_signal = 100; -static int mcm_retry = 3; -static int mcm_rep_ms = 1000; +static int mcm_max_rcv = 20; +static int mcm_retry = 10; +static int mcm_disc_retry = 5; +static int mcm_rep_ms = 1600; static int mcm_rtu_ms = 800; +static int mcm_dreq_ms = 1000; static FILE *logfile; static pthread_mutex_t flock; @@ -156,6 +159,7 @@ typedef struct mcm_ib_dev { int cm_timer; int rep_time; int rtu_time; + void *cntrs; } mcm_ib_dev_t; @@ -188,6 +192,7 @@ typedef struct mcm_qp { uint64_t wr_min; /* IB write profiling, with completions */ uint64_t wr_max; uint64_t wr_avg; + } mcm_qp_t; /* DAPL MCM CQ object, id in entry */ @@ -272,27 +277,208 @@ typedef struct mcm_scif_dev { } mcm_scif_dev_t; -/* 1-16 MIC nodes, 1-8 IB ports, 2 threads (op/cm/events and data) each node */ +/* 1-16 MIC nodes, 1-8 IB ports, 3 threads (op/events, CM, and rdma) each node */ #define MCM_IB_MAX 8 #define MCM_CLIENT_MAX 16 typedef struct mcm_client { uint16_t scif_id; int op_pipe[2]; int tx_pipe[2]; - int op_cpuid; - int tx_cpuid; - cpu_set_t mic_mask; + int cm_pipe[2]; cpu_set_t op_mask; cpu_set_t tx_mask; + cpu_set_t cm_mask; pthread_mutex_t oplock; pthread_mutex_t txlock; + pthread_mutex_t cmlock; pthread_t tx_thread; pthread_t op_thread; + pthread_t cm_thread; mcm_ib_dev_t mdev[MCM_IB_MAX]; } mcm_client_t; static mcm_client_t mcm_client_list[MCM_CLIENT_MAX]; +/* Counters */ +static char *mcm_cntr_names[] = { + "MCM_PD_CREATE", + "MCM_PD_FREE", + "MCM_MR_CREATE", + "MCM_MR_FREE", + "MCM_CQ_CREATE", + "MCM_CQ_FREE", + "MCM_CQ_POLL", + "MCM_CQ_REARM", + "MCM_CQ_EVENT", + "MCM_QP_CREATE", + "MCM_QP_SEND", + "MCM_QP_WRITE", + "MCM_QP_READ", + "MCM_QP_FREE", + "MCM_QP_EVENT", + "MCM_SRQ_CREATE", + "MCM_SRQ_FREE", + "MCM_MEM_ALLOC", + "MCM_MEM_ALLOC_DATA", + "MCM_MEM_FREE", + "MCM_ASYNC_ERROR", + "MCM_ASYNC_QP_ERROR", + "MCM_ASYNC_CQ_ERROR", + "MCM_SCIF_SEND", + "MCM_SCIF_RECV", + "MCM_SCIF_READ_FROM", + "MCM_SCIF_WRITE_TO", + "MCM_SCIF_SIGNAL", + "MCM_LISTEN_CREATE", + "MCM_LISTEN_CREATE_ANY", + "MCM_LISTEN_FREE", + "MCM_CM_CONN_EVENT", + "MCM_CM_DISC_EVENT", + "MCM_CM_TIMEOUT_EVENT", + "MCM_CM_ERR_EVENT", + "MCM_CM_TX_POLL", + "MCM_CM_RX_POLL", + "MCM_CM_MSG_OUT", + "MCM_CM_MSG_IN", + "MCM_CM_MSG_POST", + "MCM_CM_REQ_OUT", + "MCM_CM_REQ_IN", + "MCM_CM_REQ_ACCEPT", + "MCM_CM_REP_OUT", + "MCM_CM_REP_IN", + "MCM_CM_RTU_OUT", + "MCM_CM_RTU_IN", + "MCM_CM_REJ_OUT", + "MCM_CM_REJ_IN", + "MCM_CM_REJ_USER_OUT", + "MCM_CM_REJ_USER_IN", + "MCM_CM_ACTIVE_EST", + "MCM_CM_PASSIVE_EST", + "MCM_CM_AH_REQ_OUT", + "MCM_CM_AH_REQ_IN", + "MCM_CM_AH_RESOLVED", + "MCM_CM_DREQ_OUT", + "MCM_CM_DREQ_IN", + "MCM_CM_DREQ_DUP", + "MCM_CM_DREP_OUT", + "MCM_CM_DREP_IN", + 
"MCM_CM_MRA_OUT", + "MCM_CM_MRA_IN", + "MCM_CM_REQ_FULLQ_POLL", + "MCM_CM_ERR", + "MCM_CM_ERR_REQ_FULLQ", + "MCM_CM_ERR_REQ_DUP", + "MCM_CM_ERR_REQ_RETRY", + "MCM_CM_ERR_REP_DUP", + "MCM_CM_ERR_REP_RETRY", + "MCM_CM_ERR_RTU_DUP", + "MCM_CM_ERR_RTU_RETRY", + "MCM_CM_ERR_REFUSED", + "MCM_CM_ERR_RESET", + "MCM_CM_ERR_TIMEOUT", + "MCM_CM_ERR_REJ_TX", + "MCM_CM_ERR_REJ_RX", + "MCM_CM_ERR_DREQ_DUP", + "MCM_CM_ERR_DREQ_RETRY", + "MCM_CM_ERR_DREP_DUP", + "MCM_CM_ERR_DREP_RETRY", + "MCM_CM_ERR_MRA_DUP", + "MCM_CM_ERR_MRA_RETRY", + "MCM_CM_ERR_UNEXPECTED_STATE", + "MCM_CM_ERR_UNEXPECTED_MSG", +}; + +typedef enum mcm_counters +{ + MCM_PD_CREATE, + MCM_PD_FREE, + MCM_MR_CREATE, + MCM_MR_FREE, + MCM_CQ_CREATE, + MCM_CQ_FREE, + MCM_CQ_POLL, + MCM_CQ_REARM, + MCM_CQ_EVENT, + MCM_QP_CREATE, + MCM_QP_SEND, + MCM_QP_WRITE, + MCM_QP_READ, + MCM_QP_FREE, + MCM_QP_EVENT, + MCM_SRQ_CREATE, + MCM_SRQ_FREE, + MCM_MEM_ALLOC, + MCM_MEM_ALLOC_DATA, + MCM_MEM_FREE, + MCM_ASYNC_ERROR, + MCM_ASYNC_QP_ERROR, + MCM_ASYNC_CQ_ERROR, + MCM_SCIF_SEND, + MCM_SCIF_RECV, + MCM_SCIF_READ_FROM, + MCM_SCIF_WRITE_TO, + MCM_SCIF_SIGNAL, + MCM_LISTEN_CREATE, + MCM_LISTEN_CREATE_ANY, + MCM_LISTEN_FREE, + MCM_CM_CONN_EVENT, + MCM_CM_DISC_EVENT, + MCM_CM_TIMEOUT_EVENT, + MCM_CM_ERR_EVENT, + MCM_CM_TX_POLL, + MCM_CM_RX_POLL, + MCM_CM_MSG_OUT, + MCM_CM_MSG_IN, + MCM_CM_MSG_POST, + MCM_CM_REQ_OUT, + MCM_CM_REQ_IN, + MCM_CM_REQ_ACCEPT, + MCM_CM_REP_OUT, + MCM_CM_REP_IN, + MCM_CM_RTU_OUT, + MCM_CM_RTU_IN, + MCM_CM_REJ_OUT, + MCM_CM_REJ_IN, + MCM_CM_REJ_USER_OUT, + MCM_CM_REJ_USER_IN, + MCM_CM_ACTIVE_EST, + MCM_CM_PASSIVE_EST, + MCM_CM_AH_REQ_OUT, + MCM_CM_AH_REQ_IN, + MCM_CM_AH_RESOLVED, + MCM_CM_DREQ_OUT, + MCM_CM_DREQ_IN, + MCM_CM_DREQ_DUP, + MCM_CM_DREP_OUT, + MCM_CM_DREP_IN, + MCM_CM_MRA_OUT, + MCM_CM_MRA_IN, + MCM_CM_REQ_FULLQ_POLL, + MCM_CM_ERR, + MCM_CM_ERR_REQ_FULLQ, + MCM_CM_ERR_REQ_DUP, + MCM_CM_ERR_REQ_RETRY, + MCM_CM_ERR_REP_DUP, + MCM_CM_ERR_REP_RETRY, + MCM_CM_ERR_RTU_DUP, + MCM_CM_ERR_RTU_RETRY, + MCM_CM_ERR_REFUSED, + MCM_CM_ERR_RESET, + MCM_CM_ERR_TIMEOUT, + MCM_CM_ERR_REJ_TX, + MCM_CM_ERR_REJ_RX, + MCM_CM_ERR_DREQ_DUP, + MCM_CM_ERR_DREQ_RETRY, + MCM_CM_ERR_DREP_DUP, + MCM_CM_ERR_DREP_RETRY, + MCM_CM_ERR_MRA_DUP, + MCM_CM_ERR_MRA_RETRY, + MCM_CM_ERR_UNEXPECTED_STATE, + MCM_CM_ERR_UNEXPECTED_MSG, + MCM_ALL_COUNTERS, /* MUST be last */ + +} MCM_COUNTERS; + /* forward prototypes */ static void mcm_cm_disc(mcm_cm_t *m_cm); static int mcm_cm_req_out(mcm_cm_t *m_cm); @@ -304,6 +490,7 @@ static void m_qp_free(struct mcm_qp *m_qp); static void m_cm_free(struct mcm_cm *m_cm); void mpxy_op_thread(void *mic_client); void mpxy_tx_thread(void *mic_client); +void mpxy_cm_thread(void *mic_client); static inline uint32_t mcm_ts_us(void) { @@ -343,6 +530,22 @@ static void mpxy_write(int level, const char *format, ...) 
va_end(args); } +#define MCNTR(mdev, cntr) ((uint64_t *)mdev->cntrs)[cntr]++ + +static void md_cntr_log(mcm_ib_dev_t *md, int counter, int reset) { + int i; + + for (i = 0; i < MCM_ALL_COUNTERS; i++) { + if ((counter == i) || (counter == MCM_ALL_COUNTERS)) { + if (((uint64_t *)md->cntrs)[i]) { + mlog(0, "%s = %u\n", mcm_cntr_names[i], ((uint64_t *)md->cntrs)[i]); + if (reset) + ((uint64_t *)md->cntrs)[i] = 0; + } + } + } +} + /* link list helper resources */ static void init_list(LLIST_ENTRY *head) { @@ -634,15 +837,17 @@ static inline int scif_send_msg(scif_epd_t ep, void *msg, int len) int ret; while (len) { + mlog(2, " scif_send - ep %d, len=%d \n", ep, len); ret = scif_send(ep, msg, len, SCIF_SEND_BLOCK); + mlog(2, " scif_sent - ep %d, len=%d \n", ep, len); if (ret == -1) { mlog(0, " ERR: scif_send - ep %d, %s, len=%d \n", ep, strerror(errno), len); return -1; } if (ret < len) { - mlog(0, " WARN: scif_send - ep %d, blocked len=%d, sent=%d\n", - ep, strerror(errno), len, ret); + mlog(0, " WARNING: scif_send - ep %d, blocked len=%d, sent=%d\n", + ep, len, ret); } len -= ret; } @@ -660,10 +865,11 @@ static int init_scif() for (i=0; iscif_id = 0; - if (pipe(mc->op_pipe) || pipe(mc->tx_pipe)) + if (pipe(mc->op_pipe) || pipe(mc->tx_pipe) || pipe(mc->cm_pipe)) return -1; pthread_mutex_init(&mc->oplock, NULL); pthread_mutex_init(&mc->txlock, NULL); + pthread_mutex_init(&mc->cmlock, NULL); for (ii=0; ii< MCM_IB_MAX; ii++) { md = &mc->mdev[ii]; memset((void *)md, 0, sizeof(mcm_ib_dev_t)); @@ -715,7 +921,25 @@ static void close_scif() static void close_ib() { - /* any cleanup ??, server thread should do the work */ + int i,ii; + mcm_client_t *mc; + mcm_ib_dev_t *md; + + /* clean up device resources */ + for (i=0; imdev[ii]; + if (md->cntrs) { + free(md->cntrs); + md->cntrs = NULL; + } + if (md->ibctx) { + ibv_close_device(md->ibctx); + md->ibctx = NULL; + } + } + } return; } @@ -728,7 +952,6 @@ static int config_fd(int fd) mlog(0, " config_fd: fcntl on fd %d ERR %d %s\n", fd, opts, strerror(errno)); return errno; } - return 0; } @@ -1007,14 +1230,15 @@ static void mcm_qlisten(mcm_scif_dev_t *smd, mcm_cm_t *cm) pthread_mutex_unlock(&smd->llock); } /* dequeue listen object from listen list */ -static void mcm_dqlisten(mcm_scif_dev_t *smd, mcm_cm_t *cm) +static void mcm_dqlisten(mcm_scif_dev_t *smd, mcm_cm_t *cm, uint16_t port) { pthread_mutex_lock(&smd->llock); remove_entry(&cm->entry); + mcm_free_port(smd->md->ports, port); + m_cm_free(cm); pthread_mutex_unlock(&smd->llock); } - /* * Open IB device */ @@ -1269,6 +1493,7 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) m_cm_free(m_cm); m_cm = next_cm; } + init_list(&smd->llist); pthread_mutex_unlock(&smd->llock); mlog(1, " cm listen list destroyed \n"); @@ -1280,6 +1505,7 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) m_cm_free(m_cm); m_cm = next_cm; } + init_list(&smd->clist); pthread_mutex_unlock(&smd->clock); mlog(1, " cm connection list destroyed \n"); @@ -1290,8 +1516,10 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) m_cq_free(m_cq); m_cq = next_cq; } + init_list(&smd->cqlist); pthread_mutex_unlock(&smd->cqlock); mlog(1, " cq_list destroyed \n"); + pthread_mutex_lock(&smd->qplock); m_qp = get_head_entry(&smd->qplist); while (m_qp) { @@ -1299,6 +1527,7 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) m_qp_free(m_qp); m_qp = next_qp; } + init_list(&smd->qplist); pthread_mutex_unlock(&smd->qplock); mlog(1, " qp_list destroyed \n"); @@ -1323,6 +1552,8 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) 
pthread_mutex_destroy(&smd->cqlock); pthread_mutex_destroy(&smd->mrlock); + md_cntr_log(smd->md, MCM_ALL_COUNTERS, 1); + smd->md = NULL; free(smd); } @@ -1441,24 +1672,33 @@ static mcm_scif_dev_t *mix_open_device(char *name, int port, scif_epd_t op_ep, s mc = &mcm_client_list[node]; + /* new device, both op and tx thread sync */ + pthread_mutex_lock(&mc->oplock); + pthread_mutex_lock(&mc->txlock); + pthread_mutex_lock(&mc->cmlock); + /* New MIC node, start up OP and TX threads per node */ if (!mc->scif_id) { mc->scif_id = node; if (pthread_create(&mc->op_thread, NULL, (void *(*)(void *))mpxy_op_thread, (void*)mc)) { mlog(0, " op pthread_create ERR: %s\n", strerror(errno)); - return NULL; + goto err; } if (pthread_create(&mc->tx_thread, NULL, (void *(*)(void *))mpxy_tx_thread, (void*)mc)) { pthread_cancel(mc->op_thread); mlog(0, " tx pthread_create ERR: %s\n", strerror(errno)); - return NULL; + goto err; + } + if (pthread_create(&mc->cm_thread, NULL, + (void *(*)(void *))mpxy_cm_thread, (void*)mc)) { + pthread_cancel(mc->op_thread); + pthread_cancel(mc->tx_thread); + mlog(0, " cm pthread_create ERR: %s\n", strerror(errno)); + goto err; } } - /* new device, both op and tx thread sync */ - pthread_mutex_lock(&mc->oplock); - pthread_mutex_lock(&mc->txlock); for (i=0; imdev[i]; @@ -1481,6 +1721,12 @@ static mcm_scif_dev_t *mix_open_device(char *name, int port, scif_epd_t op_ep, s pthread_mutex_init(&md->slock, NULL); pthread_mutex_init(&md->plock, NULL); pthread_mutex_init(&md->txlock, NULL); + md->cntrs = malloc(sizeof(uint64_t) * MCM_ALL_COUNTERS); + if (!md->cntrs) { + free(md); + goto err; + } + memset(md->cntrs, 0, sizeof(uint64_t) * MCM_ALL_COUNTERS); strcpy(md->name, name); md->mc = mc; md->port = port; @@ -1501,10 +1747,13 @@ found: insert_tail(&smd->entry, &md->smd_list, (void *)smd); pthread_mutex_unlock(&md->slock); + /* new device, FD's to add to poll in threads */ write(mc->op_pipe[1], "w", sizeof "w"); /* signal op_thread */ + write(mc->cm_pipe[1], "w", sizeof "w"); /* signal cm_thread */ err: pthread_mutex_unlock(&mc->oplock); pthread_mutex_unlock(&mc->txlock); + pthread_mutex_unlock(&mc->cmlock); return smd; } @@ -1525,7 +1774,6 @@ static void mix_close_device(mcm_ib_dev_t *md, mcm_scif_dev_t *smd) mcm_destroy_smd(smd); mlog(1, " freed smd %p\n", md, smd); - return; } @@ -1623,19 +1871,16 @@ static int mix_listen_free(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg) pthread_mutex_lock(&smd->llock); cm = get_head_entry(&smd->llist); while (cm) { - if (cm->sid == (uint16_t)pmsg->req_id) { - remove_entry(&cm->entry); - mcm_free_port(smd->md->ports, (uint16_t)pmsg->req_id); - m_cm_free(cm); + if (cm->sid == (uint16_t)pmsg->req_id) break; - } cm = get_next_entry(&cm->entry, &smd->llist); } pthread_mutex_unlock(&smd->llock); - if (cm) + if (cm) { + mcm_dqlisten(smd, cm, (uint16_t)pmsg->req_id); pmsg->status = MIX_SUCCESS; - else + } else pmsg->status = MIX_EINVAL; /* send back response */ @@ -2008,10 +2253,11 @@ static void m_qp_free(struct mcm_qp *m_qp) m_qp->ib_qp = NULL; remove_entry(&m_qp->entry); +#ifdef MCM_PROFILE if (mcm_profile && m_qp->rf_min) mlog(0, " QP (%p) PERF scif_readfrom() times (usecs): max %u min %u avg %u\n", m_qp, m_qp->rf_max, m_qp->rf_min, m_qp->rf_avg); - +#endif /* resource pools and qp object */ destroy_mbuf_pool(m_qp); destroy_wrbuf_pool(m_qp); @@ -2242,7 +2488,14 @@ static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event) msg.cm_ctx = m_cm->cm_ctx; msg.event = event; - mlog(1, " MIX_CM_EVENT: cm %p cm_id %d, ctx %p, event 0x%x\n", m_cm, msg.cm_id, msg.cm_ctx, 
event); + if (event == DAT_CONNECTION_EVENT_DISCONNECTED) { + pthread_mutex_lock(&m_cm->lock); + m_cm->state = MCM_FREE; + pthread_mutex_unlock(&m_cm->lock); + } + + mlog(1, " MIX_CM_EVENT: cm %p cm_id %d, ctx %p, event 0x%x\n", + m_cm, m_cm->entry.tid, msg.cm_ctx, event); len = sizeof(dat_mix_cm_event_t); return (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len)); @@ -2345,11 +2598,6 @@ static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) return 0; } - mlog(1," DREQ out -> dport 0x%x, dqpn 0x%x dlid 0x%x \n", - ntohs(m_cm->msg.dport), - ntohl(m_cm->msg.dqpn), - ntohs(m_cm->msg.daddr.lid)); - /* process DREQ */ mcm_cm_disc(m_cm); return 0; @@ -2495,7 +2743,6 @@ static int mix_cm_req_in(mcm_cm_t *cm, dat_mcm_msg_t *pkt, int pkt_len) if (scif_send_msg(acm->smd->scif_ev_ep, (void*)&msg, len)) return -1; - mlog(1, " success cm_id %d cm_ctx %p\n", msg.cm_id, msg.cm_ctx); return 0; } @@ -2505,7 +2752,7 @@ static int mix_cm_rtu_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) dat_mix_cm_t msg; int len; - mlog(1, " cm_id %d, ctx %p\n", m_cm->cm_id, m_cm->cm_ctx); + mlog(1, " CONN ESTABLISHED cm_id %d, ctx %p\n", m_cm->cm_id, m_cm->cm_ctx); /* Forward, as is, conn_reply message to MIC client, with remote QP info */ msg.hdr.ver = DAT_MIX_VER; @@ -2518,7 +2765,6 @@ static int mix_cm_rtu_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) if (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len)) return -1; - mlog(1, " success cm_id %d\n", msg.cm_id); pthread_mutex_lock(&m_cm->lock); m_cm->state = MCM_CONNECTED; pthread_mutex_unlock(&m_cm->lock); @@ -2538,7 +2784,7 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; } - mlog(1, " MIX_CM_REP_OUT: mic_ctx %p my_id %d, my_ctx %p, qp_id %d, pmsg_sz=%d \n", + mlog(1, " REP: mic_ctx %p my_id %d, my_ctx %p, qp_id %d, pmsg_sz=%d \n", pmsg->sp_ctx, pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id, sizeof(dat_mix_cm_t)); /* Find the CM for this reply */ @@ -2561,7 +2807,7 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id); return -1; } - mlog(1, " MIX_CM_REP_OUT: found cm_id %d = %p qp_id %d = %p \n", + mlog(1, " REP: found cm_id %d = %p qp_id %d = %p \n", pmsg->cm_id, m_cm, pmsg->qp_id, m_cm->m_qp); mlog(1," QPt 0x%x -> d_port 0x%x, cqpn %x QPr %x lid 0x%x psize %d\n", @@ -2637,7 +2883,6 @@ static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) return 0; } - /* disconnect request from peer, unsolicited channel */ static int mix_cm_disc_in(mcm_cm_t *m_cm) { @@ -2654,7 +2899,6 @@ static int mix_cm_disc_in(mcm_cm_t *m_cm) if (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len)) return -1; - mlog(1, " MIX_CM_DISC_IN: cm_id %d\n", msg.req_id); return 0; } @@ -2707,6 +2951,7 @@ static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) wc.vendor_err = ret; mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1); } + MCNTR(smd->md, MCM_QP_SEND); mlog(1, " exit: q_id %d, q_ctx %p, len %d\n", pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len); @@ -2734,7 +2979,7 @@ static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data) if (m_wr->wr_id == m_wr->wr.atomic.swap) { char *sbuf = (char*)m_wr->sg_list->addr; - +#ifdef MCM_PROFILE if (mcm_profile) { uint32_t diff = ((mcm_ts_us() + 1000000) - m_wr->xrc_remote_srq_num) % 1000000; uint32_t ops = (uint32_t)m_wr->wr.atomic.rkey; @@ -2756,14 +3001,14 @@ static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data) else m_qp->rf_avg = 
((diff/ops + m_qp->rf_avg) / 2); } +#endif mlog(1, " tx_thread: m_wr %p wr_id %#016Lx data ready for IB write, 1st byte 0x%x last byte 0x%x, ln=%d\n", m_wr, m_wr->wr_id, sbuf[0], sbuf[m_wr->sg_list->length-1], m_wr->sg_list->length); - mlog(1, " tx_thread: wr_id %#016Lx next %p sglist %p sge %d op %d flgs %d idata 0x%x raddr %p rkey %x srq %x\n", + mlog(1, " tx_thread: wr_id %#016Lx next %p sglist %p sge %d op %d flgs %d idata 0x%x raddr %p rkey %x \n", m_wr->wr_id, m_wr->next, m_wr->sg_list, m_wr->num_sge, m_wr->opcode, - m_wr->send_flags, m_wr->imm_data, m_wr->wr.rdma.remote_addr, m_wr->wr.rdma.rkey, - m_wr->xrc_remote_srq_num); + m_wr->send_flags, m_wr->imm_data, m_wr->wr.rdma.remote_addr, m_wr->wr.rdma.rkey); ret = ibv_post_send(m_qp->ib_qp, m_wr, &bad_wr); if (ret) { @@ -2778,6 +3023,7 @@ static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data) wc.vendor_err = ret; mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1); } + MCNTR(smd->md, MCM_QP_WRITE); mlog(1, " - qp %p wr %p wr_id %#016Lx posted tl=%d hd=%d\n", m_qp, m_wr, m_wr->wr_id, m_qp->wr_tl, m_qp->wr_hd); @@ -2836,7 +3082,9 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) m_wr->wr.atomic.swap = pmsg->wr.wr_id; m_wr->sg_list = m_sge; m_wr->next = 0; +#ifdef MCM_PROFILE m_wr->xrc_remote_srq_num = 0; +#endif m_wr->wr.rdma.remote_addr += total_offset; m_wr->num_sge = 0; @@ -2884,19 +3132,20 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) mlog(1, " SCIF_readfrom[%d] l_off %p, r_off %p, l_start 0x%x l_end 0x%x seg_len %d, len %d cacheln_off %d \n", i, l_off, r_off, l_start, l_end, seg_len, len, cacheln_off); - +#ifdef MCM_PROFILE if (mcm_profile) { m_wr->xrc_remote_srq_num = (uint32_t)mcm_ts_us(); m_wr->wr.atomic.rkey = (uint32_t)((m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end)+1; mlog(1, " op_thread: SCIF_readfrom on PERF CPU_id %d, ts=%d us, pending %d\n", sched_getcpu(), m_wr->xrc_remote_srq_num, m_wr->wr.atomic.rkey); } - +#endif ret = scif_readfrom(smd->scif_tx_ep, l_off, seg_len, r_off, 0); if (ret) { mlog(0, " ERR: scif_readfrom, ret %d\n", ret); return -1; } + MCNTR(smd->md, MCM_SCIF_READ_FROM); if (mix_shared_buffer) { smd->m_hd = l_end; @@ -2935,6 +3184,7 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) mlog(0," ERR: scif_fence_signal, ret %d \n", ret); return -1; } + MCNTR(smd->md, MCM_SCIF_SIGNAL); /* remove special flags unless it's the last segment */ /* NON-COMPLIANT: IMM segmented causes receiver RDMA length will be wrong */ @@ -2965,7 +3215,9 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) m_wr->wr.atomic.swap = pmsg->wr.wr_id; m_wr->sg_list = m_sge; m_wr->next = 0; +#ifdef MCM_PROFILE m_wr->xrc_remote_srq_num = 0; +#endif m_wr->wr.rdma.remote_addr += total_offset; m_wr->num_sge = 0; } @@ -2988,6 +3240,7 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) mlog(0," ERR: scif_fence_signal, ret %d \n", ret); return -1; } + MCNTR(smd->md, MCM_SCIF_SIGNAL); /* took WR slot, move head */ if (++m_qp->wr_hd == m_qp->wr_end) @@ -3078,7 +3331,7 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) mlog(0, " ERROR!!! 
unknown MIX operation: %d\n", phdr->op); return -1; } - + MCNTR(smd->md, MCM_SCIF_RECV); return ret; } @@ -3171,6 +3424,7 @@ retry: md->s_tl = (int)wc.wr_id; polled++; + MCNTR(md, MCM_CM_TX_POLL); goto retry; } return msg; @@ -3233,6 +3487,7 @@ bail: if (ret) mlog(0, " ERR: ibv_post_send() %s\n", strerror(errno)); + MCNTR(md, MCM_CM_MSG_OUT); pthread_mutex_unlock(&md->txlock); return ret; } @@ -3249,7 +3504,7 @@ static int mcm_post_rmsg(mcm_ib_dev_t *md, dat_mcm_msg_t *msg) sge.length = sizeof(dat_mcm_msg_t) + sizeof(struct ibv_grh); sge.lkey = md->mr_rbuf->lkey; sge.addr = (uintptr_t)((char *)msg - sizeof(struct ibv_grh)); - + MCNTR(md, MCM_CM_MSG_POST); return (ibv_post_recv(md->qp, &recv_wr, &recv_err)); } @@ -3271,6 +3526,11 @@ static int mcm_cm_rej_out(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, DAT_MCM_OP type) ntohs(smsg.daddr.lid), ntohl(smsg.dqpn), ntohs(smsg.dport)); + if (type == MCM_REJ_USER) + MCNTR(md, MCM_CM_REJ_USER_OUT); + else + MCNTR(md, MCM_CM_REJ_OUT); + return (mcm_send(md, &smsg, NULL, 0)); } @@ -3279,6 +3539,7 @@ static void mcm_cm_disc(mcm_cm_t *cm) int finalize = 1; pthread_mutex_lock(&cm->lock); + mlog(1," enter: state = %s \n", mcm_state_str(cm->state)); switch (cm->state) { case MCM_CONNECTED: /* CONSUMER: move to err state to flush */ @@ -3287,13 +3548,19 @@ static void mcm_cm_disc(mcm_cm_t *cm) /* send DREQ, event after DREP or DREQ timeout */ cm->state = MCM_DISC_PENDING; cm->msg.op = htons(MCM_DREQ); + cm->retries = 0; finalize = 0; /* wait for DREP */ + mlog(1, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n", + cm->retries+1, cm->entry.tid, + htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn), + htons(cm->msg.sport),htons(cm->msg.daddr.lid), + htonl(cm->msg.dqpn), htons(cm->msg.dport)); + MCNTR(cm->md, MCM_CM_DREQ_OUT); break; case MCM_DISC_PENDING: - /* DREQ timeout, retry twice and then give up */ - cm->msg.op = htons(MCM_DREQ); - if (cm->retries > 2) { - mlog(1, " CM_DREQ: RETRIES EXHAUSTED:" + /* DREQ timeout, retry */ + if (cm->retries > mcm_disc_retry) { + mlog(0, " DISC: RETRIES EXHAUSTED:" " %x %x %x -> %x %x %x\n", htons(cm->msg.saddr.lid), htonl(cm->msg.saddr.qpn), @@ -3301,20 +3568,39 @@ static void mcm_cm_disc(mcm_cm_t *cm) htons(cm->msg.daddr.lid), htonl(cm->msg.dqpn), htons(cm->msg.dport)); - finalize = 1; + cm->state = MCM_DISCONNECTED; + goto final; } + cm->msg.op = htons(MCM_DREQ); + finalize = 0; /* wait for DREP */ + mlog(1, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n", + cm->retries+1, cm->entry.tid, + htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn), + htons(cm->msg.sport),htons(cm->msg.daddr.lid), + htonl(cm->msg.dqpn), htons(cm->msg.dport)); + MCNTR(cm->md, MCM_CM_DREQ_OUT); break; case MCM_DISC_RECV: + MCNTR(cm->md, MCM_CM_DREQ_IN); /* CM_THREAD: move to err state to flush */ modify_qp(cm->m_qp->ib_qp, IBV_QPS_ERR,0,0,0); /* DREQ received, send DREP and schedule event, finalize */ cm->msg.op = htons(MCM_DREP); + cm->state = MCM_DISCONNECTED; + mlog(1, " DREQ_in: cm_id %d send DREP %x %x %x -> %x %x %x\n", + cm->entry.tid, htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn), + htons(cm->msg.sport),htons(cm->msg.daddr.lid), + htonl(cm->msg.dqpn), htons(cm->msg.dport)); + MCNTR(cm->md, MCM_CM_DREP_OUT); break; case MCM_DISCONNECTED: + mlog(1," state = %s already disconnected\n", mcm_state_str(cm->state) ); pthread_mutex_unlock(&cm->lock); + MCNTR(cm->md, MCM_CM_DREQ_DUP); return; default: + MCNTR(cm->md, MCM_CM_ERR_UNEXPECTED_STATE); mlog(1, " disconnect UNKNOWN state: qp %p cm %p %s %s" " %x %x %x %s %x %x %x r_id %x l_id %x\n", 
cm->m_qp, cm, cm->msg.saddr.qp_type == IBV_QPT_RC ? "RC" : "UD", @@ -3329,10 +3615,12 @@ static void mcm_cm_disc(mcm_cm_t *cm) cm->timer = mcm_time_us(); /* DREQ, expect reply */ mcm_send(cm->md, &cm->msg, NULL, 0); +final: pthread_mutex_unlock(&cm->lock); - - if (finalize) + if (finalize) { + MCNTR(cm->md, MCM_CM_DISC_EVENT); mix_cm_event(cm, DAT_CONNECTION_EVENT_DISCONNECTED); + } } static int mcm_cm_rep_out(mcm_cm_t *cm) @@ -3366,6 +3654,7 @@ static int mcm_cm_rep_out(mcm_cm_t *cm) htonl(cm->msg.daddr.qpn)); pthread_mutex_unlock(&cm->lock); + MCNTR(cm->md, MCM_CM_TIMEOUT_EVENT); mix_cm_event(cm, DAT_CONNECTION_EVENT_TIMED_OUT); return -1; } @@ -3376,6 +3665,7 @@ static int mcm_cm_rep_out(mcm_cm_t *cm) pthread_mutex_unlock(&cm->lock); return -1; } + MCNTR(cm->md, MCM_CM_REP_OUT); pthread_mutex_unlock(&cm->lock); return 0; } @@ -3383,33 +3673,38 @@ static int mcm_cm_rep_out(mcm_cm_t *cm) static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, int len) { - mlog(1, " cm %p state %s \n", cm, mcm_state_str(cm->state)); + mlog(1, " cm %p cm_id %d state %s \n", cm, cm->entry.tid, mcm_state_str(cm->state)); pthread_mutex_lock(&cm->lock); switch (cm->state) { case MCM_LISTEN: /* passive */ mlog(1, "LISTEN: req_in: l_cm %p, sid %d\n", cm, cm->sid); pthread_mutex_unlock(&cm->lock); mix_cm_req_in(cm, msg, len); + MCNTR(md, MCM_CM_REQ_IN); break; case MCM_RTU_PENDING: /* passive */ mlog(1, "RTU_PENDING: cm %p, my_id %d, cm_id %d\n", cm, cm->entry.tid, cm->cm_id); pthread_mutex_unlock(&cm->lock); mix_cm_rtu_in(cm, msg, len); + MCNTR(md, MCM_CM_RTU_IN); break; case MCM_REP_PENDING: /* active */ mlog(1, "REP_PENDING: cm %p, my_id %d, cm_id %d\n", cm, cm->entry.tid, cm->cm_id); pthread_mutex_unlock(&cm->lock); mix_cm_reply_in(cm, msg, len); + MCNTR(md, MCM_CM_REP_IN); break; case MCM_REP_RCV: /* active */ - if (ntohs(msg->op) == MCM_REP) + if (ntohs(msg->op) == MCM_REP) { mlog(1, "REP_RCV: DUPLICATE cm %p, my_id %d, cm_id %d\n", cm, cm->entry.tid, cm->cm_id); + MCNTR(md, MCM_CM_ERR_REP_DUP); + } pthread_mutex_unlock(&cm->lock); break; case MCM_CONNECTED: /* active and passive */ /* DREQ, change state and process */ - cm->retries = 2; if (ntohs(msg->op) == MCM_DREQ) { + mlog(1, "DREQ_in: cm %p, cm_id %d\n", cm, cm->entry.tid); cm->state = MCM_DISC_RECV; pthread_mutex_unlock(&cm->lock); mcm_cm_disc(cm); @@ -3417,7 +3712,7 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, } /* active: RTU was dropped, resend */ if (ntohs(msg->op) == MCM_REP) { - mlog(1, " RESEND RTU: op %s st %s [lid, port, cqp, iqp]:" + mlog(1, " REP_in resend RTU: op %s st %s [lid, port, cqp, iqp]:" " %x %x %x %x -> %x %x %x %x r_pid %x\n", mcm_op_str(ntohs(cm->msg.op)), mcm_state_str(cm->state), @@ -3426,14 +3721,15 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport), ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id)); - + MCNTR(md, MCM_CM_REP_IN); cm->msg.op = htons(MCM_RTU); mcm_send(cm->smd->md, &cm->msg, NULL, 0); } pthread_mutex_unlock(&cm->lock); break; case MCM_DISC_PENDING: /* active and passive */ - /* DREQ or DREP, finalize with event */ + MCNTR(md, MCM_CM_DREP_IN); + MCNTR(md, MCM_CM_DISC_EVENT); cm->state = MCM_DISCONNECTED; pthread_mutex_unlock(&cm->lock); mix_cm_event(cm, DAT_CONNECTION_EVENT_DISCONNECTED); @@ -3442,18 +3738,20 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, case MCM_FREE: /* DREQ dropped, resend */ if (ntohs(msg->op) 
== MCM_DREQ) { - mlog(1, " RESEND DREP: op %s st %s [lid, port, qpn]:" + MCNTR(md, MCM_CM_DREQ_DUP); + mlog(1, " DREQ_in resend DREP: cm_id %d op %s st %s [lid, port, qpn]:" " %x %x %x -> %x %x %x\n", - mcm_op_str(ntohs(msg->op)), + cm->entry.tid, mcm_op_str(ntohs(msg->op)), mcm_state_str(cm->state), - ntohs(msg->saddr.lid), - ntohs(msg->sport), - ntohl(msg->saddr.qpn), - ntohs(msg->daddr.lid), - ntohs(msg->dport), - ntohl(msg->daddr.qpn)); + ntohs(cm->msg.saddr.lid), + ntohs(cm->msg.sport), + ntohl(cm->msg.saddr.qpn), + ntohs(cm->msg.daddr.lid), + ntohs(cm->msg.dport), + ntohl(cm->msg.daddr.qpn)); cm->msg.op = htons(MCM_DREP); mcm_send(cm->smd->md, &cm->msg, NULL, 0); + MCNTR(md, MCM_CM_DREP_OUT); } else if (ntohs(msg->op) != MCM_DREP){ /* DREP ok to ignore, any other print warning */ @@ -3462,12 +3760,14 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, cm, mcm_op_str(ntohs(msg->op)), mcm_state_str(cm->state), ntohs(msg->sport), ntohl(msg->sqpn)); + MCNTR(cm->md, MCM_CM_ERR_UNEXPECTED_MSG); } pthread_mutex_unlock(&cm->lock); break; case MCM_REJECTED: if (ntohs(msg->op) == MCM_REJ_USER) { pthread_mutex_unlock(&cm->lock); + MCNTR(md, MCM_CM_REJ_USER_IN); break; } default: @@ -3475,7 +3775,7 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, " <- op %s, %s spsp %x sqpn %x slid %x\n", mcm_op_str(ntohs(msg->op)), mcm_state_str(cm->state), ntohs(msg->sport), ntohl(msg->sqpn), ntohs(msg->saddr.lid)); - + MCNTR(cm->md, MCM_CM_ERR_UNEXPECTED_STATE); pthread_mutex_unlock(&cm->lock); break; } @@ -3511,7 +3811,7 @@ retry_listenq: if (cm->state == MCM_DESTROY || cm->state == MCM_FREE) continue; - mlog(1, " CM %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n", + mlog(2, " CM %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n", mcm_state_str(cm->state), ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport), ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id), @@ -3551,8 +3851,8 @@ retry_listenq: ntohl(msg->dqpn), ntohl(msg->daddr.qpn), ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport), ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn)); - - return NULL; + MCNTR(cm->md, MCM_CM_ERR_REQ_DUP); + return NULL; } } } @@ -3594,7 +3894,7 @@ retry_listenq: ntohl(msg->d_id)); if (ntohs(msg->op) == MCM_DREP) { - /* DREP_DUP, counters */ + MCNTR(cm->md, MCM_CM_ERR_DREP_DUP); } } @@ -3646,6 +3946,7 @@ retry: } else notify = 0; + MCNTR(md, MCM_CM_RX_POLL); for (i = 0; i < ret; i++) { msg = (dat_mcm_msg_t*) (uintptr_t) wc[i].wr_id; @@ -3654,6 +3955,8 @@ retry: wc[i].byte_len, (void*)wc[i].wr_id, wc[i].src_qp); + MCNTR(md, MCM_CM_MSG_IN); + /* validate CM message, version */ if (ntohs(msg->ver) != DAT_MCM_VER) { mlog(1, " mcm_recv: UNKNOWN msg %p, ver %d\n", msg, msg->ver); @@ -3677,7 +3980,6 @@ retry: static int mcm_cm_req_out(mcm_cm_t *m_cm) { - mlog(1, " %s 0x%x %x 0x%x -> 0x%x %x 0x%x\n", mcm_state_str(m_cm->state), htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn), @@ -3693,9 +3995,11 @@ static int mcm_cm_req_out(mcm_cm_t *m_cm) htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn), htons(m_cm->msg.sport), htons(m_cm->msg.daddr.lid), htonl(m_cm->msg.dqpn), htons(m_cm->msg.dport)); - mix_cm_event(m_cm, DAT_CONNECTION_EVENT_TIMED_OUT); m_cm->state = MCM_FREE; - goto bail; + pthread_mutex_unlock(&m_cm->lock); + MCNTR(m_cm->md, MCM_CM_TIMEOUT_EVENT); + mix_cm_event(m_cm, DAT_CONNECTION_EVENT_TIMED_OUT); + return -1; } mlog(1, " m_cm %p, state = %d, retries =%d\n", m_cm, m_cm->state,m_cm->md->retries); @@ 
-3710,15 +4014,13 @@ static int mcm_cm_req_out(mcm_cm_t *m_cm) pthread_mutex_unlock(&m_cm->lock); return 0; bail: - /* send CM event */ pthread_mutex_unlock(&m_cm->lock); return -1; } static int mcm_cm_rtu_out(mcm_cm_t *m_cm) { - mlog(1, " %s 0x%x %x 0x%x -> 0x%x %x 0x%x\n", - mcm_state_str(m_cm->state), + mlog(1, " CONN ESTABLISHED 0x%x %x 0x%x -> 0x%x %x 0x%x\n", htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn), htons(m_cm->msg.sport), htons(m_cm->msg.daddr.lid), htonl(m_cm->msg.dqpn), htons(m_cm->msg.dport)); @@ -3771,6 +4073,7 @@ static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer) (time - cm->timer)/1000, cm->md->rep_time << cm->retries); cm->retries++; + MCNTR(cm->md, MCM_CM_ERR_REQ_RETRY); pthread_mutex_unlock(&cm->lock); mcm_cm_req_out(cm); pthread_mutex_lock(&cm->lock); @@ -3792,6 +4095,7 @@ static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer) (time - cm->timer)/1000, cm->md->rtu_time << cm->retries); cm->retries++; + MCNTR(cm->md, MCM_CM_ERR_REP_RETRY); pthread_mutex_unlock(&cm->lock); mcm_cm_rep_out(cm); pthread_mutex_lock(&cm->lock); @@ -3800,8 +4104,7 @@ static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer) break; case MCM_DISC_PENDING: *timer = cm->md->cm_timer; - /* wait longer each retry */ - if ((time - cm->timer)/1000 > (cm->md->rtu_time << cm->retries)) { + if ((time - cm->timer)/1000 > (mcm_dreq_ms << cm->retries)) { mlog(1, " CM_DREQ retry %d [lid, port, cqp, iqp]:" " %x %x %x %x -> %x %x %x %x r_pid %x Time(ms) %d > %d\n", cm->retries+1, @@ -3813,8 +4116,9 @@ static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer) (time - cm->timer)/1000, cm->md->rtu_time << cm->retries); cm->retries++; + MCNTR(cm->md, MCM_CM_ERR_DREQ_RETRY); pthread_mutex_unlock(&cm->lock); - mix_cm_disc_in(cm); + mcm_cm_disc(cm); pthread_mutex_lock(&cm->lock); break; } @@ -3883,8 +4187,9 @@ void mpxy_tx_thread(void *mic_client) if(sched_setaffinity( 0, sizeof(mc->tx_mask), &mc->tx_mask) == -1) mlog(0, "WARNING: could not set CPU Affinity (%s), continuing...\n", strerror(errno)); } - mlog(0, "TX thread (%x) started for MIC %p node_id %d, CPU_affinity(%d)=%d\n", - pthread_self(), mc, mc->scif_id, mcm_affinity, mcm_affinity ? (mcm_affinity_base + mc->scif_id):0 ); + mlog(0, "TX thread (%x) started for MIC %p node_id %d, CPU_affinity(%s)=%d\n", + pthread_self(), mc, mc->scif_id, mcm_affinity ? "SET":"UNSET", + mcm_affinity ? (mcm_affinity_base + mc->scif_id):0 ); while (!finished) { pthread_mutex_lock(&mc->txlock); @@ -3940,8 +4245,9 @@ void mpxy_op_thread(void *mic_client) if(sched_setaffinity( 0, sizeof(mc->op_mask), &mc->op_mask) == -1) mlog(0, "WARNING: could not set CPU Affinity (%s), continuing...\n", strerror(errno)); } - mlog(0, "OP/CM thread (%x) started for MIC %p node_id %d, CPU_affinity(%d)=%d\n", - pthread_self(), mc, mc->scif_id, mcm_affinity, mcm_affinity ? (mcm_affinity_base + mc->scif_id + 1):0); + mlog(0, "OP/CM thread (%x) started for MIC %p node_id %d, CPU_affinity(%s)=%d\n", + pthread_self(), mc, mc->scif_id, mcm_affinity ? "SET":"UNSET", + mcm_affinity ? 
(mcm_affinity_base + mc->scif_id + 1):0 ); /* FD array */ set = mcm_alloc_fd_set(); @@ -4025,17 +4331,75 @@ void mpxy_op_thread(void *mic_client) smd = next; } pthread_mutex_unlock(&md->slock); + } + pthread_mutex_unlock(&mc->oplock); + } + mlog(0, "OP,CM,Event thread exiting\n"); +} +void mpxy_cm_thread(void *mic_client) +{ + mcm_client_t *mc = (mcm_client_t*)mic_client; + struct mcm_ib_dev *md; + struct pollfd set[MCM_IB_MAX*3]; + int i, fds; + char rbuf[2]; + + if (mcm_affinity) { + CPU_ZERO( &mc->cm_mask ); + CPU_SET( mcm_affinity_base + mc->scif_id + 2, &mc->cm_mask ); + if(sched_setaffinity( 0, sizeof(mc->cm_mask), &mc->cm_mask) == -1) + mlog(0, "WARNING: could not set CPU Affinity (%s), continuing...\n", strerror(errno)); + } + mlog(0, "CM thread (%x) started for MIC %p node_id %d, CPU_affinity(%s)=%d\n", + pthread_self(), mc, mc->scif_id, mcm_affinity ? "SET":"UNSET", + mcm_affinity ? (mcm_affinity_base + mc->scif_id + 2):0 ); + + while (!finished) { + fds = 0; + pthread_mutex_lock(&mc->cmlock); + for (i=0;imdev[i]; + if (md->ibctx == NULL) + continue; + set[fds].fd = mc->cm_pipe[0]; + set[fds].events = POLLIN; + set[fds].revents = 0; + set[fds+1].fd = md->rch->fd; + set[fds+1].events = POLLIN; + set[fds+1].revents = 0; + set[fds+2].fd = md->ibctx->async_fd; + set[fds+2].events = POLLIN; + set[fds+2].revents = 0; + fds += 3; + } + pthread_mutex_unlock(&mc->cmlock); + + for (i=0;icmlock); + for (i=0;imdev[i]; + if (md->ibctx == NULL) + continue; + + if (mcm_poll(mc->cm_pipe[0], POLLIN) == POLLIN) + read(mc->cm_pipe[0], rbuf, 2); - /* MCM IB events: async device and CM msgs */ if (mcm_poll(md->rch->fd, POLLIN) == POLLIN) mcm_ib_recv(md); if (mcm_poll(md->ibctx->async_fd, POLLIN) == POLLIN) mcm_ib_async_event(md); } - pthread_mutex_unlock(&mc->oplock); + pthread_mutex_unlock(&mc->cmlock); } - mlog(0, "OP,CM,Event thread exiting\n"); + mlog(0, "CM thread exiting\n"); } @@ -4085,6 +4449,7 @@ static void mpxy_server(void) mlog(0, "server canceling threads for MIC node %d\n", mcm_client_list[i].scif_id); pthread_cancel(mcm_client_list[i].tx_thread); pthread_cancel(mcm_client_list[i].op_thread); + pthread_cancel(mcm_client_list[i].cm_thread); } } mlog(0, "all threads canceled, server down\n"); @@ -4165,7 +4530,7 @@ int main(int argc, char **argv) logfile = mpxy_open_log(); - mlog(0, "MIC CCL Proxy - SCIF/IB DAPL RDMA Proxy Service\n"); + mlog(0, "CCL Proxy - SCIF/IB DAPL RDMA Proxy Service\n"); mpxy_log_options(); if (init_scif()) { -- 2.41.0
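Not part of the patch above: a minimal, standalone sketch of the per-device counter pattern this patch introduces for profiling — an enum-indexed uint64_t array hung off the device, an increment macro, and a dump-with-optional-reset helper, mirroring MCNTR() and md_cntr_log() on mcm_ib_dev_t. The names below are illustrative only, not the patch's.

/* Standalone sketch of the counter pattern (illustrative names). */
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

enum { CNTR_CM_REQ_OUT, CNTR_CM_REP_IN, CNTR_CM_DREQ_OUT, CNTR_ALL };

static const char *cntr_names[] = {
	"CM_REQ_OUT", "CM_REP_IN", "CM_DREQ_OUT",
};

struct dev {
	void *cntrs;		/* uint64_t[CNTR_ALL], as with md->cntrs */
};

/* increment one counter, as MCNTR(md, cntr) does */
#define CNTR_INC(dev, c) (((uint64_t *)(dev)->cntrs)[(c)]++)

/* dump one counter or all of them, optionally resetting, as md_cntr_log() does */
static void cntr_log(struct dev *d, int counter, int reset)
{
	int i;

	for (i = 0; i < CNTR_ALL; i++) {
		if (counter != i && counter != CNTR_ALL)
			continue;
		if (!((uint64_t *)d->cntrs)[i])
			continue;	/* skip zero counters */
		printf("%s = %llu\n", cntr_names[i],
		       (unsigned long long)((uint64_t *)d->cntrs)[i]);
		if (reset)
			((uint64_t *)d->cntrs)[i] = 0;
	}
}

int main(void)
{
	struct dev d;

	d.cntrs = calloc(CNTR_ALL, sizeof(uint64_t));
	if (!d.cntrs)
		return 1;

	CNTR_INC(&d, CNTR_CM_REQ_OUT);	/* e.g. after posting a REQ */
	CNTR_INC(&d, CNTR_CM_REP_IN);	/* e.g. after receiving a REP */

	cntr_log(&d, CNTR_ALL, 1);	/* dump all non-zero counters, then reset */
	free(d.cntrs);
	return 0;
}

One small note on the real md_cntr_log(): it prints the 64-bit counters with "%u"; "%llu" (or PRIu64) would avoid truncation on LP64 builds.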
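Also worth spelling out, since it is the behavioral change behind the new disconnect knobs: with the defaults in this patch (mcm_dreq_ms = 1000, mcm_disc_retry = 5), a lost DREP causes the DREQ to be resent on an exponentially growing timeout of mcm_dreq_ms << retries, i.e. after roughly 1, 2, 4, 8, 16 and 32 seconds (ignoring timer polling granularity). Once the sixth send also goes unanswered (retries > mcm_disc_retry), the CM moves to MCM_DISCONNECTED and DAT_CONNECTION_EVENT_DISCONNECTED is delivered, so an unreachable peer is given up on after roughly 63 seconds instead of the previous rtu_time-based schedule that retried only twice.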