From: Arlin Davis
Date: Fri, 15 Mar 2013 21:27:17 +0000 (-0700)
Subject: mpxyd: add inline support, eager completion, improve proxy resource management
X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=a77d135161ece7369809e3161edfcad61eb9d8ec;p=~ardavis%2Fdapl.git

mpxyd: add inline support, eager completion, improve proxy resource management

Add inline support for MIX and IB dma channels.

Add configurable eager completion: signal writes or sends as soon as
scif_readfrom completes and all data is local to the proxy, instead of
waiting for the IB signal. The user's data buffer on the MIC is then
available for reuse.

Combine sends and writes into the mix_post_send command to provide
ordering guarantees between inline and dma data. Allows direct posting
from the OP thread if at the head of the queue.

Add new counters for inline and signaled IO.

Extend m_wr to include flags for controlling eager completions and
proxy buffer and work request management.

The CQ event FD is now non-blocking and processed via the TX thread
instead of the OP thread, allowing more than one event to be polled at
a time.

Signed-off-by: Arlin Davis
---
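For readers new to the proxy model, the eager-completion choice this patch
adds boils down to one predicate. A minimal stand-alone sketch (struct
proxy_wr and wr_completed are illustrative names, not the mpxyd API):

    #include <stdint.h>

    /* Once scif_readfrom() has pulled the MIC user buffer into the proxy
     * buffer, the client buffer is reusable, so the DTO completion can be
     * raised at SCIF-read time; otherwise it must wait for the IB signal. */
    struct proxy_wr { uint64_t org_id; int scif_done; int ib_done; };

    static int eager_completion = 1;  /* mirrors the mcm_eager_completion option */

    static int wr_completed(const struct proxy_wr *wr)
    {
            return eager_completion ? wr->scif_done : wr->ib_done;
    }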
diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c
index 296d0e8..fb8d503 100644
--- a/dapl/svc/mpxyd.c
+++ b/dapl/svc/mpxyd.c
@@ -84,11 +84,16 @@ static int mcm_profile = 0;
 
 /* scif-rdma cmd and data channel parameters */
 static int mix_align = 64;
 static int mix_buffer_mb = 128;
-static int mix_buffer_sg = 1048576;
+static int mix_buffer_sg = 131072;
+static int mix_buffer_sg_cnt = 100;
 static int mix_cmd_depth = 50;
 static int mix_cmd_size = 256;
 static int mix_shared_buffer = 1;
 static int mix_max_msg_mb = 64;
+static int mix_inline_threshold = 256;
+static int mix_eager_completion = 0;
+static int mcm_ib_inline = 128;
+static int mcm_ib_signal_rate = 20;
 
 /* cm parameters */
 static int mcm_depth = 500;
@@ -163,6 +168,23 @@ typedef struct mcm_ib_dev {
 
 } mcm_ib_dev_t;
 
+enum mcm_send_flags {
+	M_SEND_POSTED       = 1 << 0, /* m_wr already posted */
+	M_SEND_CN_SIG       = 1 << 1, /* m_wr consumer signaled, IB completion */
+	M_SEND_CN_EAGER_SIG = 1 << 2, /* m_wr consumer eager signaled, SCIF read completion */
+	M_SEND_MP_SIG       = 1 << 3, /* m_wr mpxyd signaled, segmentation, manage proxy buf/wr resources */
+};
+
+typedef struct mcm_wr {
+	struct ibv_send_wr wr;
+	struct ibv_sge sg[DAT_MIX_SGE_MAX];
+	uint64_t org_id;
+	uint64_t context;
+	uint32_t m_idx;
+	uint32_t w_idx;
+	int flags;
+} mcm_wr_t;
+
 /* DAPL MCM QP object, id in entry, buffer pool per QP for now, TODO share a pool at SMD level? */
 typedef struct mcm_qp {
 	LLIST_ENTRY entry;
@@ -172,6 +194,7 @@ typedef struct mcm_qp {
 	pthread_mutex_t lock; /* QP lock, pool and cookies */
 	dat_mix_qp_attr_t qp_t;
 	dat_mix_qp_attr_t qp_r;
+	struct mcm_cq *m_cq; /* CQ for sQP */
 	char *m_buf; /* MIC proxy buffer, SCIF and IB */
 	struct ibv_mr *m_mr; /* IB registration */
 	off_t m_off; /* SCIF registration */
@@ -185,7 +208,11 @@ typedef struct mcm_qp {
 	int wr_tl; /* work request pool tail */
 	int wr_end; /* work request pool end */
 	int wr_len; /* work request pool size */
+	int wr_sz; /* work request entry size, 64 byte aligned */
 	int ref_cnt;
+	int post_cnt; /* segmenting requires completion management to avoid WR depletion */
+	int post_sig_cnt;
+	int comp_cnt;
 	uint64_t rf_min; /* scif_readfrom profiling */
 	uint64_t rf_max;
 	uint64_t rf_avg;
@@ -204,6 +231,7 @@ typedef struct mcm_cq {
 	uint32_t cq_len;
 	uint32_t cq_id; /* MIC client */
 	uint64_t cq_ctx; /* MIC client */
+	uint64_t prev_id;
 	int ref_cnt;
 } mcm_cq_t;
@@ -257,7 +285,9 @@ typedef struct mcm_scif_dev {
 	pthread_mutex_t qplock; /* qp lock */
 	pthread_mutex_t cqlock; /* cq lock */
 	pthread_mutex_t mrlock; /* mr lock */
-	int ref_cnt; /* references */
+	int destroy; /* destroying device, all resources */
+	int ref_cnt; /* child references */
+	int th_ref_cnt; /* work thread references */
 	struct mcm_ib_dev *md; /* mcm_ib_dev, parent */
 	uint16_t cm_id; /* port ID MIC client, md->ports */
 	uint64_t *ports; /* EP port space MIC client */
@@ -311,8 +341,16 @@ static char *mcm_cntr_names[] = {
 	"MCM_CQ_REARM",
 	"MCM_CQ_EVENT",
 	"MCM_QP_CREATE",
+	"MCM_MX_SEND",
+	"MCM_MX_SEND_INLINE",
+	"MCM_MX_WRITE",
+	"MCM_MX_WRITE_SEG",
+	"MCM_MX_WRITE_INLINE",
 	"MCM_QP_SEND",
+	"MCM_QP_SEND_INLINE",
 	"MCM_QP_WRITE",
+	"MCM_QP_WRITE_SEG",
+	"MCM_QP_WRITE_INLINE",
 	"MCM_QP_READ",
 	"MCM_QP_FREE",
 	"MCM_QP_EVENT",
@@ -399,9 +437,17 @@ typedef enum mcm_counters
 	MCM_CQ_POLL,
 	MCM_CQ_REARM,
 	MCM_CQ_EVENT,
+	MCM_MX_SEND,
+	MCM_MX_SEND_INLINE,
+	MCM_MX_WRITE,
+	MCM_MX_WRITE_SEG,
+	MCM_MX_WRITE_INLINE,
 	MCM_QP_CREATE,
 	MCM_QP_SEND,
+	MCM_QP_SEND_INLINE,
 	MCM_QP_WRITE,
+	MCM_QP_WRITE_SEG,
+	MCM_QP_WRITE_INLINE,
 	MCM_QP_READ,
 	MCM_QP_FREE,
 	MCM_QP_EVENT,
@@ -491,6 +537,17 @@ static void m_cm_free(struct mcm_cm *m_cm);
 void mpxy_op_thread(void *mic_client);
 void mpxy_tx_thread(void *mic_client);
 void mpxy_cm_thread(void *mic_client);
+void mcm_cq_event(struct mcm_cq *m_cq);
+static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data, int *events);
+
+static inline void sleep_usec(int usec)
+{
+	struct timespec sleep, remain;
+
+	sleep.tv_sec = 0;
+	sleep.tv_nsec = usec * 1000;
+	nanosleep(&sleep, &remain);
+}
 
 static inline uint32_t mcm_ts_us(void)
 {
@@ -662,6 +719,18 @@ static int mcm_select(struct mcm_fd_set *set, int time_ms)
 	return ret;
 }
 
+static int mcm_config_fd(int fd)
+{
+	int opts;
+
+	opts = fcntl(fd, F_GETFL);
+	if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+		mlog(1, "fcntl on fd %d ERR %d %s\n", fd, opts, strerror(errno));
+		return errno;
+	}
+	return 0;
+}
+
 /* MCM 16-bit port space */
 static uint16_t mcm_get_port(uint64_t *p_port, uint16_t port, uint64_t ctx)
 {
@@ -745,6 +814,8 @@ static void mpxy_set_options( int debug_mode )
 				mix_buffer_sg = atoi(value);
 			else if (!strcasecmp("buffer_alignment", opt))
 				mix_align = atoi(value);
+			else if (!strcasecmp("buffer_inline_threshold", opt))
+				mix_inline_threshold = atoi(value);
 			else if (!strcasecmp("mcm_depth", opt))
 				mcm_depth = atoi(value);
 			else if (!strcasecmp("scif_port_id", opt))
@@ -763,8 +834,12 @@ static void mpxy_set_options( int debug_mode )
 				mcm_affinity = atoi(value);
 			else if (!strcasecmp("mcm_affinity_base", opt))
 				mcm_affinity_base = atoi(value);
+			else if (!strcasecmp("mcm_ib_inline", opt))
+				mcm_ib_inline = atoi(value);
 			else if (!strcasecmp("mcm_perf_profile", opt))
 				mcm_profile = atoi(value);
+			else if (!strcasecmp("mcm_eager_completion", opt))
+				mix_eager_completion = atoi(value);
 		}
 		fclose(f);
@@ -788,6 +863,9 @@ static void mpxy_log_options(void)
 	mlog(0, "RDMA buffer pool size %d MB\n", mix_buffer_mb);
 	mlog(0, "RDMA buffer segment size %d\n", mix_buffer_sg);
 	mlog(0, "RDMA buffer alignment %d\n", mix_align);
+	mlog(0, "RDMA SCIF inline threshold %d\n", mix_inline_threshold);
+	mlog(0, "RDMA IB inline threshold %d\n", mcm_ib_inline);
+	mlog(0, "RDMA eager completion %d\n", mix_eager_completion);
 	mlog(0, "Maximum message size %d MB\n", mix_max_msg_mb);
 	mlog(0, "CM msg queue depth %d\n", mcm_depth);
 	mlog(0, "CM msg completion signal rate %d\n", mcm_signal);
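Going by the option names parsed in mpxy_set_options() above, a
configuration fragment exercising the new knobs could look like this
(values are illustrative only; the format is the daemon's existing
"option value" pairs):

    buffer_inline_threshold 256
    mcm_ib_inline           128
    mcm_eager_completion    1
    mcm_perf_profile        0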
@@ -1589,13 +1667,13 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e
 		smd->m_shared_len = mix_buffer_mb * (1024 * 1024);
 	else
 		smd->m_shared_len = 0;
-	smd->m_len = smd->m_shared_len + 8 * (1024 * 1024);
+	smd->m_len = smd->m_shared_len + (8 * (1024 * 1024));
 	ret = posix_memalign((void **)&smd->m_buf, 4096, smd->m_len);
 	if (ret) {
 		mlog(0, "failed to allocate smd m_buf, m_len=%d, ERR: %s\n", smd->m_len, strerror(errno));
 		goto err;
 	}
-	mlog(1, " Allocate/Register RDMA Proxy buffer %p, ln=%d\n", smd->m_buf, smd->m_len);
+	mlog(0, " Allocate/Register RDMA Proxy buffer %p-%p, ln=%d\n", smd->m_buf, (char*)smd->m_buf + smd->m_len, smd->m_len);
 
 	smd->m_offset = scif_register(tx_ep, smd->m_buf, smd->m_len, (off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
@@ -1603,7 +1681,7 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e
 		mlog(0, " scif_register addr=%p,%d failed %s\n", smd->m_buf, smd->m_len, strerror(errno));
 		goto err;
 	}
-	mlog(1, " SCIF addr=%p, offset=0x%llx, len %d\n", smd->m_buf, smd->m_offset, smd->m_len);
+	mlog(0, " SCIF addr=%p, offset=0x%llx, len %d\n", smd->m_buf, smd->m_offset, smd->m_len);
 
 	smd->m_mr = ibv_reg_mr(smd->md->pd, smd->m_buf, smd->m_len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
@@ -1611,7 +1689,7 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e
 		mlog(0, " IB addr=%p,%d failed %s\n", smd->m_buf, smd->m_len, strerror(errno));
 		goto err;
 	}
-	mlog(1, " IB registered addr=%p,%d, mr_addr=%p handle=0x%x, lkey=0x%x rkey=0x%x \n",
+	mlog(0, " IB registered addr=%p,%d, mr_addr=%p handle=0x%x, lkey=0x%x rkey=0x%x \n",
 		smd->m_buf, smd->m_len, smd->m_mr->addr, smd->m_mr->handle,
 		smd->m_mr->lkey, smd->m_mr->rkey);
@@ -1770,7 +1848,7 @@ err:
 /* close MCM device, MIC client, md->slock held */
 static void mix_close_device(mcm_ib_dev_t *md, mcm_scif_dev_t *smd)
 {
-	mlog(1, " md %p smd %p\n", md, smd);
+	mlog(0, " md %p smd %p\n", md, smd);
 
 	/* close and remove scif MIX client, leave parent mcm_ib_dev open */
 	if (smd->scif_op_ep) {
@@ -1781,6 +1859,19 @@ static void mix_close_device(mcm_ib_dev_t *md, mcm_scif_dev_t *smd)
 		scif_close(smd->scif_tx_ep);
 		smd->scif_tx_ep = 0;
 	}
+	if (smd->scif_ev_ep) {
+		scif_close(smd->scif_ev_ep);
+		smd->scif_ev_ep = 0;
+	}
+
+	smd->destroy = 1;
+
+	while (smd->th_ref_cnt) {
+		mlog(0, " waiting for SMD %p ref_cnt (%d) = 0\n", smd, smd->th_ref_cnt);
+		pthread_mutex_unlock(&md->slock);
+		sleep_usec(1000);
+		pthread_mutex_lock(&md->slock);
+	}
 
 	mcm_destroy_smd(smd);
 	mlog(1, " freed smd %p\n", md, smd);
@@ -1963,14 +2054,14 @@ mcm_cq_t *mix_get_cq(mcm_scif_dev_t *smd, uint32_t tid)
 }
 
 /* locate QP object */
-mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid)
+mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid, uint32_t qpn)
 {
 	mcm_qp_t *qp = NULL;
 
 	pthread_mutex_lock(&smd->qplock);
 	qp = get_head_entry(&smd->qplist);
 	while (qp) {
-		if (qp->entry.tid == tid)
+		if ((qp->entry.tid == tid) || (qp->ib_qp->qp_num == qpn))
 			break;
 		qp = get_next_entry(&qp->entry, &smd->qplist);
 	}
@@ -2023,7 +2114,7 @@ static int create_mbuf_pool(struct mcm_qp *m_qp)
 		goto err;
 	}
 
-	mlog(1, " RDMA Proxy buffer %p, len %d\n", m_qp->m_buf, m_qp->m_len);
+	mlog(0, " RDMA Proxy buffer %p, len %d\n", m_qp->m_buf, m_qp->m_len);
 
 	m_qp->m_off = scif_register(m_qp->smd->scif_tx_ep, m_qp->m_buf, m_qp->m_len,
 				    (off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
@@ -2039,7 +2130,7 @@ static int create_mbuf_pool(struct mcm_qp *m_qp)
 		mlog(0, " IB addr=%p,%d failed %s\n", m_qp->m_buf, m_qp->m_len, strerror(errno));
 		goto err;
 	}
-	mlog(1, " IB_mr addr=%p,%d, mr_addr %p handle 0x%x lkey 0x%x rkey 0x%x \n",
+	mlog(0, " IB_mr addr=%p,%d, mr_addr %p handle 0x%x lkey 0x%x rkey 0x%x \n",
 		m_qp->m_buf, m_qp->m_len, m_qp->m_mr->addr, m_qp->m_mr->handle,
 		m_qp->m_mr->lkey, m_qp->m_mr->rkey);
 	return 0;
@@ -2062,15 +2153,15 @@ static int create_wrbuf_pool(struct mcm_qp *m_qp, int entries)
 {
 	/* RDMA proxy pool, register with SCIF and IB, set pool and segm size with parameters */
 	m_qp->wr_end = entries;
-	m_qp->wr_len = DAT_MCM_WR * entries; /* 192 bytes to align signaling, ibv_qp + 7 SGE's */
+	m_qp->wr_sz = ALIGN_64(sizeof(struct mcm_wr));
+	m_qp->wr_len = m_qp->wr_sz * entries; /* 64 byte aligned for signal_fence */
 	if (posix_memalign((void **)&m_qp->wr_buf, 4096, ALIGN_PAGE(m_qp->wr_len))) {
 		mlog(0, "failed to allocate wr_buf, m_qp=%p, wr_len=%d, entries=%d\n", m_qp, m_qp->wr_len, entries);
 		goto err;
 	}
-	memset(m_qp->wr_buf, 0, ALIGN_PAGE(m_qp->wr_len));
 
-	mlog(1, " WR buf pool %p, LEN req=%d, act=%d\n", m_qp->wr_buf, m_qp->wr_len, ALIGN_PAGE(m_qp->wr_len) );
+	mlog(0, " WR buf pool %p, LEN req=%d, act=%d\n", m_qp->wr_buf, m_qp->wr_len, ALIGN_PAGE(m_qp->wr_len) );
 
 	m_qp->wr_off = scif_register(m_qp->smd->scif_tx_ep, m_qp->wr_buf, ALIGN_PAGE(m_qp->wr_len),
 				     (off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
@@ -2079,7 +2170,7 @@ static int create_wrbuf_pool(struct mcm_qp *m_qp, int entries)
 			m_qp->wr_buf, ALIGN_PAGE(m_qp->wr_len), strerror(errno));
 		goto err;
 	}
-	mlog(1, " SCIF_mr for wr_buf addr %p, off 0x%llx, len %d, entries %d\n",
+	mlog(0, " SCIF_mr for wr_buf addr %p, off 0x%llx, len %d, entries %d\n",
 		m_qp->wr_buf, m_qp->wr_off, ALIGN_PAGE(m_qp->wr_len), entries);
 	return 0;
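The fixed-stride work-request ring that create_wrbuf_pool() sets up is
indexed the same way throughout the patch: slot address = base + entry
size * index, with the entry size rounded up to a 64-byte multiple so
SCIF fence signals land on aligned slots. A compact, self-contained
sketch (wr_entry and this ALIGN_64 are simplified stand-ins for mcm_wr
and the daemon's macro):

    #include <stdlib.h>

    #define ALIGN_64(x) (((x) + 63) & ~63)

    struct wr_entry { unsigned long id; char payload[40]; };

    static struct wr_entry *wr_slot(char *wr_buf, size_t wr_sz, int idx)
    {
            return (struct wr_entry *)(wr_buf + wr_sz * idx); /* fixed stride */
    }

    int wr_pool_demo(int entries)
    {
            size_t wr_sz = ALIGN_64(sizeof(struct wr_entry)); /* 64-byte multiple */
            char *wr_buf = calloc(entries, wr_sz);            /* zeroed ring */

            if (!wr_buf)
                    return -1;
            wr_slot(wr_buf, wr_sz, entries - 1)->id = 1;      /* touch last slot */
            free(wr_buf);
            return 0;
    }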
@@ -2148,6 +2239,9 @@ static int m_cq_create(mcm_scif_dev_t *smd, int cq_len, struct mcm_cq **m_cq_out
 	if (!m_cq->ib_ch)
 		goto err;
 
+	if (mcm_config_fd(m_cq->ib_ch->fd))
+		goto err;
+
 	m_cq->ib_cq = ibv_create_cq(smd->md->ibctx, cq_len, m_cq, m_cq->ib_ch, 0);
 	if (!m_cq->ib_cq)
 		goto err;
@@ -2183,7 +2277,7 @@ static int mix_cq_create(mcm_scif_dev_t *smd, dat_mix_cq_t *pmsg)
 		mlog(0, " ERR: ret %d, exp %d\n", ret, len);
 		return ret;
 	}
-	mlog(1, " MIX_CQ_CREATE: cq_len = %d\n", pmsg->cq_len);
+	mlog(0, " MIX_CQ_CREATE: cq_len = %d\n", pmsg->cq_len);
 
 	if (m_cq_create(smd, pmsg->cq_len, &new_mcq))
 		goto err;
@@ -2284,7 +2378,7 @@ static int mix_qp_destroy(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg)
 	mlog(1, " MIX_QP_DESTROY: QP_t - id 0x%x\n", pmsg->req_id );
 
 	/* Find the QP */
-	m_qp = mix_get_qp(smd, pmsg->req_id);
+	m_qp = mix_get_qp(smd, pmsg->req_id, 0);
 	if (!m_qp) {
 		mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->req_id);
 		goto err;
@@ -2315,7 +2409,6 @@ resp:
 
 static int mix_qp_modify(mcm_scif_dev_t *smd, dat_mix_qp_t *pmsg)
 {
-	/* TODO */
 	return 0;
 }
@@ -2348,6 +2441,7 @@ static int m_qp_create(mcm_scif_dev_t *smd,
 		mlog(0, " ERR: mix_get_cq, id %d, not found\n", scq_id);
 		goto err;
 	}
+	m_qp->m_cq = m_cq;
 	attr->recv_cq = m_cq->ib_cq;
 	attr->send_cq = m_cq->ib_cq;
@@ -2422,13 +2516,13 @@ static int mix_qp_create(mcm_scif_dev_t *smd, dat_mix_qp_t *pmsg)
 	memset((void *)&qp_create, 0, sizeof(qp_create));
 	qp_create.cap.max_recv_wr = pmsg->qp_t.max_recv_wr;
 	qp_create.cap.max_recv_sge = pmsg->qp_t.max_recv_sge;
-	qp_create.cap.max_send_wr = pmsg->qp_t.max_send_wr * 8; /* max of 8 segments per wr */
+	qp_create.cap.max_send_wr = min(pmsg->qp_t.max_send_wr + mix_buffer_sg_cnt, 16000);
 	qp_create.cap.max_send_sge = pmsg->qp_t.max_send_sge;
-	qp_create.cap.max_inline_data = 0; /* better bandwidth without inline */
+	qp_create.cap.max_inline_data = mcm_ib_inline;
 	qp_create.qp_type = IBV_QPT_RC;
 
-	mlog(1, " QP_t - max_wr %d adjusted for segmentation, inline == 0\n",
-		qp_create.cap.max_send_wr);
+	mlog(0, " QP_t - max_wr %d adjusted up to %d for segmentation \n",
+		pmsg->qp_t.max_send_wr, qp_create.cap.max_send_wr);
 
 	pmsg->hdr.status = m_qp_create(smd, &qp_create, pmsg->qp_t.scq_id, &new_mqp);
 	if (pmsg->hdr.status)
@@ -2445,12 +2539,13 @@ static int mix_qp_create(mcm_scif_dev_t *smd, dat_mix_qp_t *pmsg)
 	pmsg->m_seg = new_mqp->m_seg;
 	pmsg->wr_off = new_mqp->wr_off;
 	pmsg->wr_len = new_mqp->wr_end;
+	pmsg->m_inline = mix_inline_threshold;
 
-	mlog(1, " QP_t - qpn %x, q_id %d, q_ctx %p sq %d,%d rq %d,%d\n",
+	mlog(1, " QP_t - qpn %x, q_id %d, q_ctx %p sq %d,%d rq %d,%d, il %d\n",
 		pmsg->qp_t.qp_num, pmsg->qp_t.qp_id, new_mqp,
 		pmsg->qp_t.max_send_wr, pmsg->qp_t.max_send_sge,
 		pmsg->qp_t.max_recv_wr,
-		pmsg->qp_t.max_recv_sge);
+		pmsg->qp_t.max_recv_sge, pmsg->m_inline);
 
 resp:
 	/* send back response */
@@ -2462,7 +2557,7 @@ resp:
 static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc)
 {
 	dat_mix_dto_comp_t msg;
-	int len, i;
+	int i;
 
 	/* send DTO events to MIC client */
 	msg.hdr.ver = DAT_MIX_VER;
@@ -2470,19 +2565,26 @@ static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc)
 	msg.hdr.flags = MIX_OP_REQ;
 	msg.cq_id = m_cq->cq_id;
 	msg.cq_ctx = m_cq->cq_ctx;
+	msg.wc_cnt = nc;
 
 	for (i=0; i < nc; i++) {
-		msg.wc_cnt = 1; /* todo, batch in groups of MIX_WC_MAX */
 		memcpy(&msg.wc, &wc[i], sizeof(*wc));
-		len = sizeof(dat_mix_dto_comp_t);
-		if (scif_send_msg(m_cq->smd->scif_ev_ep, (void*)&msg, len))
-			return;
-		mlog(1, " MIX_DTO_EVENT (ep=%d,sz=%d): cq %p id %d ctx %p stat %d op %d ln %d wr_id %p wc's %d verr 0x%x\n",
-			m_cq->smd->scif_op_ep, len, m_cq, msg.cq_id, msg.cq_ctx,
-			msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
-			msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+		if (msg.wc[i].status != IBV_WC_SUCCESS) {
+			mlog(0, " ERROR (ep=%d): cq %p id %d ctx %p stat %d"
+				" op %d wr_id %p wc's %d verr 0x%x\n",
+				m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx,
+				msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
+				msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+		} else {
+			mlog(1, " SUCCESS (ep=%d): cq %p id %d ctx %p stat %d"
+				" op %d ln %d wr_id %p wc's %d verr 0x%x\n",
+				m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx,
+				msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
+				msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+		}
 	}
+	scif_send_msg(m_cq->smd->scif_ev_ep, (void*)&msg, sizeof(msg));
 }
 
 static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event)
@@ -2529,7 +2631,7 @@ static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
 		pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id);
 
 	/* Find the QP for linking */
-	m_qp = mix_get_qp(smd, pmsg->qp_id);
+	m_qp = mix_get_qp(smd, pmsg->qp_id, 0);
 	if (!m_qp) {
 		mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
 		goto err;
@@ -2812,7 +2914,7 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
 	memcpy(&m_cm->msg, &pmsg->msg, sizeof(dat_mcm_msg_t)); /* do I need this copy?? */
 
 	/* Attach the QP for this CR */
-	m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id);
+	m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id, 0);
 	if (!m_cm->m_qp) {
 		mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
 		return -1;
@@ -2912,191 +3014,195 @@ static int mix_cm_disc_in(mcm_cm_t *m_cm)
 	return 0;
 }
 
-/* Post SEND message request, operation channel */
-static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data, int *events)
 {
-	int len, ret;
 	struct mcm_qp *m_qp;
+	struct mcm_wr *m_wr;
 	struct ibv_send_wr *bad_wr;
-	struct ibv_sge sge;
-
-	/* hdr already read, get operation data */
-	len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
-	ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
-	if (ret != len) {
-		mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
-		return -1;
-	}
-	mlog(1, " enter: q_id %d, q_ctx %p, len %d, wr_id %p, sge %d, op %x flgs %x \n",
-		pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, pmsg->wr.wr_id,
-		pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags);
-
-	m_qp = (struct mcm_qp*)pmsg->qp_ctx; /* trust me for now, TODO use id lookup */
-
-	mlog(1, " qp_num %x \n", m_qp->ib_qp->qp_num);
-
-	/* TODO, mix with rdma_writes and preserve order */
-	len = pmsg->len;
-	ret = scif_recv(smd->scif_op_ep, smd->m_buf + smd->m_shared_len, len, SCIF_RECV_BLOCK);
-	if (ret != len) {
-		mlog(0, " ERR: scif_recv inline DATA, ret %d, exp %d\n", ret, len);
-		return -1;
-	}
-	mlog(1, " scif_recv inline DATA, len = %d, exp %d \n",ret,len);
-
-	pmsg->wr.sg_list = &sge;
-	pmsg->wr.num_sge = 1;
-	sge.addr = (uint64_t)smd->m_mr->addr + smd->m_shared_len;
-	sge.lkey = smd->m_mr->lkey;
-	sge.length = len;
-	ret = ibv_post_send(m_qp->ib_qp, &pmsg->wr, &bad_wr);
-	if (ret) {
-		struct ibv_wc wc;
-		mlog(0, " ERR ibv_post_send: %s - wr_id %p\n", strerror(errno), pmsg->wr.wr_id);
-
-		wc.wr_id = pmsg->wr.wr_id;
-		wc.byte_len = 0;
-		wc.status = IBV_WC_GENERAL_ERR;
-		wc.opcode = IBV_WC_SEND;
-		wc.vendor_err = ret;
-		mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
-	}
-	MCNTR(smd->md, MCM_QP_SEND);
-
-	mlog(1, " exit: q_id %d, q_ctx %p, len %d\n",
-		pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len);
-
-	return 0;
-}
-
-static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data)
-{
-	struct mcm_qp *m_qp;
-	struct ibv_send_wr *m_wr, *bad_wr;
-	int ret,posted, wr_idx, wr_max, poll_cnt;
+	int ret, posted, done, wr_idx, wr_max, poll_cnt, cn_signal;
 
 	pthread_mutex_lock(&smd->qplock);
 	m_qp = get_head_entry(&smd->qplist);
 	while (m_qp) {
-		wr_max = 1; /* mix_write and ib_write, equal time between op and tx threads */
+		done = 0;
+		wr_max = mcm_ib_signal_rate * 2;
 		wr_idx = m_qp->wr_tl;
-		while (wr_idx != m_qp->wr_hd && wr_max--) { /* change to if, one at a time */
-			posted = 0; poll_cnt = 100;
-			m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * wr_idx));
-			while ((m_wr->wr_id != m_wr->wr.atomic.swap) && (--poll_cnt));
-			mlog(1, " qp %p wr_id %#016Lx poll_cnt %d\n", m_qp, m_wr->wr.atomic.swap, poll_cnt);
+		/* check for credits on this sQP */
+		if (((m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate) >= (m_qp->wr_end - (mcm_ib_signal_rate*2))) {
+			mlog(0, " wr_len %d - pending %d = low credits %d, rate=%d waiting.. \n",
+				m_qp->wr_end, (m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate,
+				m_qp->wr_end - ((m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate),
+				mcm_ib_signal_rate);
+			*data += (m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end;
+			break;
+		}
 
-		if (m_wr->wr_id == m_wr->wr.atomic.swap) {
-			char *sbuf = (char*)m_wr->sg_list->addr;
-#ifdef MCM_PROFILE
-			if (mcm_profile) {
-				uint32_t diff = ((mcm_ts_us() + 1000000) - m_wr->xrc_remote_srq_num) % 1000000;
-				uint32_t ops = (uint32_t)m_wr->wr.atomic.rkey;
-
-				if ((diff/ops) > 100000)
-					mlog(0, "tx_thread: slow readfrom -> m_wr %p wr_id %#016Lx ln=%d, pending %d, %u usecs (%u-%u), CPU_id=%d\n",
-						m_wr, m_wr->wr_id, m_wr->sg_list->length, ops, diff/ops, mcm_ts_us(), m_wr->xrc_remote_srq_num,
-						sched_getcpu());
-
-				m_wr->xrc_remote_srq_num = 0;
-				m_wr->wr.atomic.rkey = 0;
-
-				if (!m_qp->rf_min || diff/ops < m_qp->rf_min)
-					m_qp->rf_min = diff/ops;
-				if (!m_qp->rf_max || diff/ops > m_qp->rf_max)
-					m_qp->rf_max = diff/ops;
-				if (!m_qp->rf_avg)
-					m_qp->rf_avg = diff/ops;
-				else
-					m_qp->rf_avg = ((diff/ops + m_qp->rf_avg) / 2);
-			}
-#endif
+		while (wr_idx != m_qp->wr_hd && wr_max--) {
+			cn_signal = 0; posted = 0; poll_cnt = 100;
+			m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * wr_idx));
+
+			/* inline, OP thread posted */
+			if (m_wr->flags & M_SEND_POSTED) {
+				mlog(1, " POSTED: [%d] qp %p hd %d tl %d idx %d wr %p wr_id %p, addr %p sz %d sflg 0x%x mflg 0x%x\n",
+					done, m_qp, m_qp->wr_hd, m_qp->wr_tl, wr_idx, m_wr,
+					m_wr->org_id, m_wr->wr.sg_list->addr, m_wr->sg->length,
+					m_wr->wr.send_flags, m_wr->flags);
+				done++;
+				if (++wr_idx == m_qp->wr_end)
+					wr_idx = 0;
+				continue;
+			}
+			while ((m_wr->wr.wr_id != m_wr->org_id) && (--poll_cnt));
+			mlog(1, " qp %p hd %d tl %d idx %d wr %p wr_id %#016Lx poll_cnt %d\n",
+				m_qp, m_qp->wr_hd, m_qp->wr_tl, wr_idx, m_wr, m_wr->org_id, poll_cnt);
+
+			if (m_wr->wr.wr_id == m_wr->org_id) {
+				char *sbuf = (char*)m_wr->wr.sg_list->addr;
+
+				mlog(1, " m_wr %p ready for ibv_post addr=%p ln=%d, lkey=%x\n",
+					m_wr, sbuf, m_wr->sg->length, m_wr->sg->lkey);
+				mlog(1, " wr_id %#016Lx next %p sglist %p sge %d op %d flgs"
+					" %d idata 0x%x raddr %p rkey %x \n",
+					m_wr->wr.wr_id, m_wr->wr.next, m_wr->wr.sg_list, m_wr->wr.num_sge, m_wr->wr.opcode,
+					m_wr->wr.send_flags, m_wr->wr.imm_data, m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey);
+
+				/* signaling and eager completion */
+				if (m_wr->wr.send_flags & IBV_SEND_SIGNALED)
+					cn_signal = 1;
+
+				if (!((m_qp->post_cnt+1) % mcm_ib_signal_rate)) {
+					m_wr->wr.send_flags |= IBV_SEND_SIGNALED;
+					m_wr->flags |= M_SEND_MP_SIG;
+				}
+				else if (mix_eager_completion)
+					m_wr->wr.send_flags &= ~IBV_SEND_SIGNALED;
+
+				if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+					m_qp->post_sig_cnt++;
+					if (cn_signal && !mix_eager_completion)
+						m_wr->flags |= M_SEND_CN_SIG;
+					if (mix_shared_buffer)
+						m_wr->m_idx = (sbuf + (m_wr->wr.sg_list->length - 1)) - smd->m_buf;
+					else
+						m_wr->m_idx = (sbuf + (m_wr->wr.sg_list->length - 1)) - m_qp->m_buf;
+
+					mlog(1, " signaled, qp %p wr %p wr_id %p flgs 0x%x,"
+						" pcnt %d sg_rate %d sqe_cnt %d, hd %d tl %d sz %d\n",
+						m_qp, m_wr, m_wr->wr.wr_id, m_wr->wr.send_flags,
+						m_qp->post_cnt, mcm_ib_signal_rate,
+						m_qp->wr_end, m_qp->wr_hd, m_qp->wr_tl,
+						m_wr->wr.sg_list->length);
+				}
-			mlog(1, " tx_thread: m_wr %p wr_id %#016Lx data ready for IB write, 1st byte 0x%x last byte 0x%x, ln=%d\n",
-				m_wr, m_wr->wr_id, sbuf[0], sbuf[m_wr->sg_list->length-1], m_wr->sg_list->length);
-			mlog(1, " tx_thread: wr_id %#016Lx next %p sglist %p sge %d op %d flgs %d idata 0x%x raddr %p rkey %x \n",
-				m_wr->wr_id, m_wr->next, m_wr->sg_list, m_wr->num_sge, m_wr->opcode,
-				m_wr->send_flags, m_wr->imm_data, m_wr->wr.rdma.remote_addr, m_wr->wr.rdma.rkey);
-
-			ret = ibv_post_send(m_qp->ib_qp, m_wr, &bad_wr);
-			if (ret) {
+				m_wr->wr.wr_id = (uint64_t)m_wr;
+				ret = ibv_post_send(m_qp->ib_qp, &m_wr->wr, &bad_wr);
+				if (ret || (cn_signal && mix_eager_completion)) {
 					struct ibv_wc wc;
-				mlog(0, " ERR ibv_post_write: %s - wr_id %p\n",
-					strerror(errno), m_wr->wr_id);
-
-				wc.wr_id = m_wr->wr_id;
-				wc.byte_len = 0;
-				wc.status = IBV_WC_GENERAL_ERR;
-				wc.opcode = IBV_WC_RDMA_WRITE;
+					mlog(1, " dto_event: sig %d ret %d, %s - m_wr %p, wr_id %p\n",
+						cn_signal, ret, strerror(errno),
+						m_wr->wr.wr_id, m_wr->org_id);
+
+					wc.wr_id = m_wr->org_id;
+					wc.byte_len = m_wr->sg->length;
+					wc.status = ret ? IBV_WC_GENERAL_ERR : IBV_WC_SUCCESS;
+					wc.opcode = m_wr->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
 					wc.vendor_err = ret;
 					mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
 				}
-			MCNTR(smd->md, MCM_QP_WRITE);
-			mlog(1, " - qp %p wr %p wr_id %#016Lx posted tl=%d hd=%d\n",
-				m_qp, m_wr, m_wr->wr_id, m_qp->wr_tl, m_qp->wr_hd);
+				m_qp->post_cnt++;
+				m_wr->flags |= M_SEND_POSTED;
+
+				if (m_wr->wr.opcode == IBV_WR_SEND) {
+					if (m_wr->sg->length <= mix_inline_threshold)
+						MCNTR(smd->md, MCM_QP_SEND_INLINE);
+					else
+						MCNTR(smd->md, MCM_QP_SEND);
+				} else {
+					if (m_wr->sg->length <= mix_inline_threshold)
+						MCNTR(smd->md, MCM_QP_WRITE_INLINE);
+					else
+						MCNTR(smd->md, MCM_QP_WRITE_SEG);
+				}
+				mlog(1, " qp %p wr %p wr_id %#016Lx posted tl=%d"
+					" hd=%d idx=%d pst=%d,%d cmp %d\n",
+					m_qp, m_wr, m_wr->org_id, m_qp->wr_tl,
+					m_qp->wr_hd, wr_idx, m_qp->post_cnt,
+					m_qp->post_sig_cnt, m_qp->comp_cnt);
 
-				m_wr->wr_id = 0;
 				posted++;
-				if (++m_qp->wr_tl == m_qp->wr_end)
-					m_qp->wr_tl = 0;
 			}
 			if (!posted) {
-				mlog(1, " - qp %p wr_id %#016Lx still not ready\n", m_qp, m_wr->wr.atomic.swap);
+				mlog(1, " qp %p wr %p wr_id %#016Lx still not ready\n", m_qp, m_wr, m_wr->org_id);
 				break;
 			}
 			if (++wr_idx == m_qp->wr_end) /* posted WR, move to next */
 				wr_idx = 0;
+
+			if (smd->destroy) {
+				mlog(0, " SMD destroy - QP %p hd %d tl %d pst %d,%d cmp %d, pending data %d, events %d\n",
+					m_qp, m_qp->wr_hd, m_qp->wr_tl,m_qp->post_cnt,
+					m_qp->post_sig_cnt, m_qp->comp_cnt, data, events);
+				mlog(0, " wr %p wr_id %p org_id %p sglist %p sge %d ln %d op %d flgs"
+					" %x idata 0x%x raddr %p rkey %x m_flgs %x\n",
+					m_wr, m_wr->wr.wr_id, m_wr->org_id, m_wr->wr.sg_list,
+					m_wr->wr.num_sge, m_wr->sg->length, m_wr->wr.opcode,
+					m_wr->wr.send_flags, m_wr->wr.imm_data,
+					m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey, m_wr->flags);
+
+			}
 		}
+		*data += ((m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end) - done;
+		*events += m_qp->post_sig_cnt - m_qp->comp_cnt;
+
+		if (smd->destroy) {
+			mlog(0, " SMD destroy - QP %p hd %d tl %d pst %d,%d cmp %d, pending data %d, events %d\n",
+				m_qp, m_qp->wr_hd, m_qp->wr_tl,m_qp->post_cnt,
+				m_qp->post_sig_cnt, m_qp->comp_cnt, data, events);
		}
-		*data += (m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end;
 		m_qp = get_next_entry(&m_qp->entry, &smd->qplist);
 	}
-	mlog(1, " tx_tread: pending data = %d on device smd %p\n", *data, smd);
 	pthread_mutex_unlock(&smd->qplock);
 }
 
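The credit check at the top of m_post_pending_wr() keeps the send queue
from being exhausted while still signaling only every Nth post. The
arithmetic, distilled into two helpers (a sketch using the same counter
names as the patch, not the daemon's own functions):

    /* Each signaled WR retires up to signal_rate posts at once, so the
     * slots potentially still in flight are (signaled - completed) * rate;
     * stop posting when that approaches the ring size. */
    static int have_credits(int post_sig_cnt, int comp_cnt, int wr_end,
                            int signal_rate)
    {
            int pending = (post_sig_cnt - comp_cnt) * signal_rate;

            return pending < (wr_end - (signal_rate * 2));
    }

    /* Request an IB completion on every Nth post only. */
    static int want_mp_signal(int post_cnt, int signal_rate)
    {
            return ((post_cnt + 1) % signal_rate) == 0;
    }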
-/* Post SEND message request, operation channel */
-static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+/* initiate proxy data transfer, operation channel */
+static int m_proxy_data(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg, struct mcm_qp *m_qp)
 {
-	int len, ret, i;
+	int len, ret, i, retries;
 	off_t l_off, r_off;
 	uint64_t total_offset;
 	int l_start, l_end, l_len, cacheln_off, seg_len;
-	struct mcm_qp *m_qp;
-	struct ibv_send_wr *m_wr;
+	struct mcm_wr *m_wr;
 	struct ibv_sge *m_sge;
 
-	/* hdr already read, get mix_sent_t data */
-	len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
-	ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
-	if (ret != len) {
-		mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
-		return -1;
-	}
 	mlog(1, " q_id %d, q_ctx %p, len %d, wr_id %p, sge %d, op %x flgs %x wr_idx %d\n",
 		pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, pmsg->wr.wr_id,
 		pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags, pmsg->sge[0].lkey);
 
-	m_qp = (struct mcm_qp*)pmsg->qp_ctx;
 	total_offset = 0;
 
+	if (pmsg->wr.opcode == IBV_WR_SEND)
+		MCNTR(smd->md, MCM_MX_SEND);
+	else
+		MCNTR(smd->md, MCM_MX_WRITE);
+
 	/* lock, tx thread can be posting */
 	pthread_mutex_lock(&smd->qplock);
-	m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * m_qp->wr_hd));
-	m_sge = (struct ibv_sge *)((char*)m_wr + 80);
+	m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+	m_sge = m_wr->sg;
 
 	mlog(1, " m_wr %p m_sge %p \n", m_wr, m_sge);
 
-	memcpy(&m_wr->num_sge, &pmsg->wr.num_sge, 40);
-	m_wr->wr.atomic.swap = pmsg->wr.wr_id;
-	m_wr->sg_list = m_sge;
-	m_wr->next = 0;
-#ifdef MCM_PROFILE
-	m_wr->xrc_remote_srq_num = 0;
-#endif
-	m_wr->wr.rdma.remote_addr += total_offset;
-	m_wr->num_sge = 0;
+	memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40);
+	m_wr->org_id = pmsg->wr.wr_id;
+	m_wr->wr.sg_list = m_sge;
+	m_wr->wr.next = 0;
+	m_wr->wr.wr.rdma.remote_addr += total_offset;
+	m_wr->wr.num_sge = 0;
+	m_wr->wr.wr_id = 0;
+	m_wr->w_idx = m_qp->wr_hd;
+	m_wr->flags = 0;
+	m_wr->context = (uint64_t)m_qp;
 
 	for (i=0;i<pmsg->wr.num_sge;i++) {
@@ -3107,10 +3213,15 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 		r_off = ALIGN_DOWN_64(r_off);
 
 		while (l_len) {
-			m_wr->num_sge ++;
+			m_wr->wr.num_sge++;
+			retries = 1;
+
+			/* Send or last available WR, send all */
+			if (pmsg->wr.opcode == IBV_WR_SEND)
+				seg_len = l_len;
+			else
+				seg_len = (l_len > m_qp->m_seg)? m_qp->m_seg : l_len;
-
 			if (mix_shared_buffer) {
 				l_start = ALIGN_64(smd->m_hd);
 				if ((l_start + seg_len) > smd->m_shared_len)
 					l_start = 0;
 				l_end = l_start + seg_len;
 
 				if (l_start < smd->m_tl && l_end > smd->m_tl) {
-					mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d\n",
-						smd->m_hd, smd->m_tl, seg_len);
-					return -1; /* todo queue up, don't fail */
+					mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d, retries %d\n",
+						smd->m_hd, smd->m_tl, seg_len, retries);
+					ret = -1;
+					goto bail;
 				}
-
 				l_off = smd->m_offset + l_start;
 			}
 			else {
@@ -3134,37 +3245,29 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 				if (l_start < m_qp->m_tl && l_end > m_qp->m_tl) {
 					mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d\n",
 						m_qp->m_hd, m_qp->m_tl, seg_len);
-					return -1; /* todo queue up, don't fail */
+					ret = -1;
+					goto bail;
 				}
-
 				l_off = m_qp->m_off + l_start;
 			}
 
 			mlog(1, " SCIF_readfrom[%d] l_off %p, r_off %p, l_start 0x%x l_end 0x%x seg_len %d, len %d cacheln_off %d \n",
 				i, l_off, r_off, l_start, l_end, seg_len, len, cacheln_off);
-#ifdef MCM_PROFILE
-			if (mcm_profile) {
-				m_wr->xrc_remote_srq_num = (uint32_t)mcm_ts_us();
-				m_wr->wr.atomic.rkey = (uint32_t)((m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end)+1;
-				mlog(1, " op_thread: SCIF_readfrom on PERF CPU_id %d, ts=%d us, pending %d\n",
-					sched_getcpu(), m_wr->xrc_remote_srq_num, m_wr->wr.atomic.rkey);
-			}
-#endif
+
 			ret = scif_readfrom(smd->scif_tx_ep, l_off, seg_len, r_off, 0);
 			if (ret) {
 				mlog(0, " ERR: scif_readfrom, ret %d\n", ret);
-				return -1;
+				goto bail;
 			}
 			MCNTR(smd->md, MCM_SCIF_READ_FROM);
 
 			if (mix_shared_buffer) {
 				smd->m_hd = l_end;
-				m_sge->addr = (uint64_t)(smd->m_buf + l_start + cacheln_off);
+				m_sge->addr = (uint64_t)(smd->m_buf + l_start + cacheln_off);
 				m_sge->lkey = smd->m_mr->lkey;
-			}
-			else {
+			} else {
 				m_qp->m_hd = l_end;
-				m_sge->addr = (uint64_t)(m_qp->m_buf + l_start + cacheln_off);
+				m_sge->addr = (uint64_t)(m_qp->m_buf + l_start + cacheln_off);
 				m_sge->lkey = m_qp->m_mr->lkey;
 			}
 			m_sge->length = seg_len - cacheln_off;
@@ -3182,8 +3285,8 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 			m_sge++;
 
 			/* if enough for this WR, then set up DMA signal, and move to next WR */
-			if (seg_len == m_qp->m_seg || i == pmsg->wr.num_sge - 1) {
-				l_off = m_qp->wr_off + (m_qp->wr_hd * DAT_MCM_WR);
+			if (seg_len == m_qp->m_seg || i == pmsg->wr.num_sge - 1) {
+				l_off = m_qp->wr_off + (m_qp->wr_hd * m_qp->wr_sz);
 
 				mlog(1, " SCIF_fence_signal[%d] l_off %p, wr_id %p, new wr_hd 0x%x wr_len %d\n",
 					i, l_off, pmsg->wr.wr_id, m_qp->wr_hd, m_qp->wr_len);
@@ -3192,54 +3295,64 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 					SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL);
 				if (ret) {
 					mlog(0," ERR: scif_fence_signal, ret %d \n", ret);
-					return -1;
+					goto bail;
 				}
 				MCNTR(smd->md, MCM_SCIF_SIGNAL);
+				MCNTR(smd->md, MCM_MX_WRITE_SEG);
 
-				/* remove special flags unless it's the last segment */
+				/* remove IMM unless it's the last segment */
 				/* NON-COMPLIANT: IMM segmented causes receiver RDMA length will be wrong */
 				if (l_len || i != pmsg->wr.num_sge -1) {
-					if (m_wr->opcode == IBV_WR_RDMA_WRITE_WITH_IMM)
-						m_wr->opcode = IBV_WR_RDMA_WRITE;
-					m_wr->send_flags &= IBV_SEND_INLINE;
+					if (m_wr->wr.opcode == IBV_WR_RDMA_WRITE_WITH_IMM)
+						m_wr->wr.opcode = IBV_WR_RDMA_WRITE;
+					m_wr->wr.send_flags = 0;
 				}
+				if (pmsg->len <= mcm_ib_inline)
+					m_wr->wr.send_flags |= IBV_SEND_INLINE;
+
 				/* took WR slot, move head */
 				if (++m_qp->wr_hd == m_qp->wr_end)
 					m_qp->wr_hd = 0;
-				if (m_qp->wr_hd == m_qp->wr_tl)
-					mlog(0, "ERR: proxy send queue overflow in m_qp %p, queue size: %d\n", m_qp, m_qp->wr_end);
+				if (m_qp->wr_hd == m_qp->wr_tl) {
+					mlog(0, " ERR: post_write stalled waiting for proxy WR, hd 0x%x, tl 0x%x, len %d, retries %d\n",
+						m_qp->wr_hd, m_qp->wr_tl, seg_len, retries);
+					pthread_mutex_unlock(&smd->qplock);
+					ret = -1;
+					goto bail;
+				}
 
 				pthread_mutex_unlock(&smd->qplock);
 				ret = write(smd->md->mc->tx_pipe[1], "w", sizeof("w")); /* signal tx_thread */
 				pthread_mutex_lock(&smd->qplock);
 
 				/* prepare the next WR */
-				m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * m_qp->wr_hd));
-				m_sge = (struct ibv_sge *)((char*)m_wr + 80);
-
-				mlog(1, " next m_wr %p m_sge %p \n", m_wr, m_sge);
-
-				memcpy(&m_wr->num_sge, &pmsg->wr.num_sge, 40);
-				m_wr->wr.atomic.swap = pmsg->wr.wr_id;
-				m_wr->sg_list = m_sge;
-				m_wr->next = 0;
-#ifdef MCM_PROFILE
-				m_wr->xrc_remote_srq_num = 0;
-#endif
-				m_wr->wr.rdma.remote_addr += total_offset;
-				m_wr->num_sge = 0;
+				m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+				m_sge = m_wr->sg;
+
+				mlog(1, " next m_wr %p m_sge %p \n", m_wr, m_wr->sg);
+
+				memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40);
+				m_wr->org_id = pmsg->wr.wr_id;
+				m_wr->wr.sg_list = m_wr->sg;
+				m_wr->wr.next = 0;
+				m_wr->wr.wr.rdma.remote_addr += total_offset;
+				m_wr->wr.num_sge = 0;
+				m_wr->wr.wr_id = 0;
+				m_wr->w_idx = m_qp->wr_hd;
+				m_wr->flags = 0;
+				m_wr->context = (uint64_t)m_qp;
 			}
 		}
 	}
 	pthread_mutex_unlock(&smd->qplock);
 
-	if (m_wr->num_sge) {
+	if (m_wr->wr.num_sge) {
 		pthread_mutex_lock(&smd->qplock);
 		mlog(0, " Warning: corner case, the last sge has length 0\n");
 
-		l_off = m_qp->wr_off + (m_qp->wr_hd * DAT_MCM_WR);
+		l_off = m_qp->wr_off + (m_qp->wr_hd * m_qp->wr_sz);
 
 		mlog(1, " SCIF_fence_signal[%d] l_off %p, wr_id %p, new wr_hd 0x%x wr_len %d\n",
 			i, l_off, pmsg->wr.wr_id, m_qp->wr_hd, m_qp->wr_len);
@@ -3248,9 +3361,10 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 			SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL);
 		if (ret) {
 			mlog(0," ERR: scif_fence_signal, ret %d \n", ret);
-			return -1;
+			goto bail;
 		}
 		MCNTR(smd->md, MCM_SCIF_SIGNAL);
+		MCNTR(smd->md, MCM_MX_WRITE_SEG);
 
 		/* took WR slot, move head */
 		if (++m_qp->wr_hd == m_qp->wr_end)
@@ -3262,10 +3376,196 @@ static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
 		ret = write(smd->md->mc->tx_pipe[1], "w", sizeof("w")); /* signal tx_thread */
 		pthread_mutex_unlock(&smd->qplock);
 	}
+	ret = 0;
+bail:
+	if (ret) {
+		struct ibv_wc wc;
+
+		wc.wr_id = pmsg->wr.wr_id;
+		wc.byte_len = 0;
+		wc.status = IBV_WC_GENERAL_ERR;
+		wc.opcode = pmsg->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
+		wc.vendor_err = ret;
+		mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
+	}
 	mlog(1, " exit: q_id %d, q_ctx %p, len %d, wr_hd = %d\n",
 		pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, m_qp->wr_hd);
-	return 0;
+
+	return ret;
+}
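Both m_proxy_data() above and the inline path in mix_post_send() below
carve proxy-buffer space with the same head/tail ring rule. A minimal
sketch of that rule (struct ring and ring_reserve are illustrative
names, not the mpxyd API):

    #include <stdint.h>

    #define ALIGN_64(x) (((x) + 63) & ~63)

    struct ring { uint32_t hd, tl, len; };

    /* Reserve 'need' bytes at the head, wrapping to offset 0 when the end
     * of the buffer is reached; fail if the reservation would overwrite
     * unconsumed data at the tail (caller stalls or errors out). */
    static int64_t ring_reserve(struct ring *r, uint32_t need)
    {
            uint32_t start = ALIGN_64(r->hd);
            uint32_t end;

            if (start + need > r->len)
                    start = 0;              /* wrap to buffer start */
            end = start + need;
            if (start < r->tl && end > r->tl)
                    return -1;              /* would run over the tail */
            r->hd = end;                    /* advance head */
            return start;
    }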
+
+/* Post SEND message request, IB send or rdma write, operation channel */
+static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+{
+	int len, ret, l_start, l_end;
+	struct mcm_qp *m_qp;
+	struct mcm_wr *m_wr, *m_wr_prev;
+
+	/* hdr already read, get operation data */
+	ret = -1;
+	len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
+	ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
+		return -1;
+	}
+	m_qp = (struct mcm_qp*)pmsg->qp_ctx;
+
+	mlog(1, " q_id %d, q_num %x ln %d, wr_id %p, sge %d, op %x flgs %x pst %d,%d cmp %d, inl %d, %s\n",
+		pmsg->qp_id, m_qp->ib_qp->qp_num, pmsg->len, pmsg->wr.wr_id,
+		pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags, m_qp->post_cnt,
+		m_qp->post_sig_cnt, m_qp->comp_cnt, pmsg->hdr.flags & MIX_OP_INLINE ? 1:0,
+		pmsg->wr.opcode == IBV_WR_SEND ? "SND":"WR");
+
+	if (!(pmsg->hdr.flags & MIX_OP_INLINE))
+		return (m_proxy_data(smd, pmsg, m_qp));
+
+	if (pmsg->wr.opcode == IBV_WR_SEND)
+		MCNTR(smd->md, MCM_MX_SEND_INLINE);
+	else
+		MCNTR(smd->md, MCM_MX_WRITE_INLINE);
+
+	pthread_mutex_lock(&smd->qplock);
+	m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+	if (m_qp->wr_hd == 0)
+		m_wr_prev = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * (m_qp->wr_end - 1)));
+	else
+		m_wr_prev = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * (m_qp->wr_hd - 1)));
+
+	len = pmsg->len;
+
+	mlog(1, " inline, m_wr %p m_wr_prev %p m_sge %p len %d hd %d tl %d\n",
+		m_wr, m_wr_prev, m_wr->sg, len, m_qp->wr_hd, m_qp->wr_tl);
+
+	/* IB_WR */
+	memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40);
+	m_wr->wr.sg_list = m_wr->sg;
+	m_wr->wr.next = 0;
+	m_wr->wr.num_sge = len ? 1:0;
+
+	/* M_WR */
+	m_wr->org_id = pmsg->wr.wr_id;
+	m_wr->w_idx = m_qp->wr_hd;
+	m_wr->flags = 0;
+	m_wr->context = (uint64_t)m_qp;
+
+	if (mix_shared_buffer) {
+		l_start = ALIGN_64(smd->m_hd);
+		if ((l_start + len) > smd->m_shared_len)
+			l_start = 0;
+		l_end = l_start + len;
+
+		if (l_start < smd->m_tl && l_end > smd->m_tl) {
+			mlog(0, " ERR: send inline stalled, no bufs, hd %d tl %d ln %d\n",
+				smd->m_hd, smd->m_tl, len);
+			goto bail; /* todo queue up, don't fail */
+		}
+		m_wr->sg->addr = (uint64_t)(smd->m_buf + l_start);
+		m_wr->sg->lkey = smd->m_mr->lkey;
+		m_wr->sg->length = len;
+		smd->m_hd = m_wr->m_idx = l_end; /* move proxy buffer hd */
+	}
+	else {
+		l_start = ALIGN_64(m_qp->m_hd);
+		if ((l_start + len) > m_qp->m_len)
+			l_start = 0;
+		l_end = l_start + len;
+
+		if (l_start < m_qp->m_tl && l_end > m_qp->m_tl) {
+			mlog(0, " ERR: send inline stalled, no bufs, hd %d tl %d ln %d\n",
+				m_qp->m_hd, m_qp->m_tl, len);
+			goto bail; /* todo queue up, don't fail */
+		}
+		m_wr->sg->addr = (uint64_t)(m_qp->m_buf + l_start);
+		m_wr->sg->lkey = m_qp->m_mr->lkey;
+		m_wr->sg->length = len;
+		m_qp->m_hd = m_wr->m_idx = l_end; /* move proxy buffer hd */
+	}
+
+	if (len) {
+		/* copy data into proxy buffer, signal TX thread via wr_id */
+		ret = scif_recv(smd->scif_op_ep, (void*)m_wr->sg->addr, len, SCIF_RECV_BLOCK);
+		if (ret != len) {
+			mlog(0, " ERR: scif_recv inline DATA, ret %d, exp %d\n", ret, len);
+			goto bail;
+		}
+	}
+
+	if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+		if (mix_eager_completion)
+			m_wr->flags |= M_SEND_CN_EAGER_SIG;
+		else
+			m_wr->flags |= M_SEND_CN_SIG;
+	}
+
+	if (len <= mcm_ib_inline)
+		m_wr->wr.send_flags |= IBV_SEND_INLINE;
+
+	/* order ok, nothing pending, post here */
+	if (m_qp->wr_hd == m_qp->wr_tl ||
+	    m_wr_prev->flags & M_SEND_POSTED) {
+		struct ibv_send_wr *bad_wr;
+
+		/* segmentation separate from consumer signaling */
+		if (!((m_qp->post_cnt+1) % mcm_ib_signal_rate)) {
+			m_wr->flags |= M_SEND_MP_SIG;
+			m_wr->wr.send_flags |= IBV_SEND_SIGNALED;
+		}
+		else if (mix_eager_completion)
+			m_wr->wr.send_flags &= ~IBV_SEND_SIGNALED;
+
+		m_wr->flags |= M_SEND_POSTED;
+		m_wr->wr.wr_id = (uint64_t)m_wr;
+
+		ret = ibv_post_send(m_qp->ib_qp, &m_wr->wr, &bad_wr);
+		if (!ret) {
+			m_qp->post_cnt++;
+			if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+				m_qp->post_sig_cnt++;
+				mlog(1, " INLINE signaled, qp %p wr %p wr_id %p cn_flgs %x wr_flgs %x,"
+					" pcnt %d sg_rate %d sqe_cnt %d, hd %d tl %d sz %d\n",
+					m_qp, m_wr, m_wr->wr.wr_id, pmsg->wr.send_flags,
+					m_wr->wr.send_flags,
+					m_qp->post_cnt, mcm_ib_signal_rate,
+					m_qp->wr_end, m_qp->wr_hd, m_qp->wr_tl, len);
+			}
+			if (m_wr->wr.opcode == IBV_WR_SEND)
+				MCNTR(smd->md, MCM_QP_SEND_INLINE);
+			else
+				MCNTR(smd->md, MCM_QP_WRITE_INLINE);
+		}
+	} else
+		ret = 0;
+
+	/* took WR slot, move hd */
+	if (++m_qp->wr_hd == m_qp->wr_end)
+		m_qp->wr_hd = 0;
+
+	/* signal TX, CQ thread, this WR data ready */
+	m_wr->wr.wr_id = pmsg->wr.wr_id;
+	if (!(m_wr->flags & M_SEND_POSTED) ||
+	    ((m_wr->flags & M_SEND_POSTED) &&
+	     (m_wr->wr.send_flags & IBV_SEND_SIGNALED)))
+		write(smd->md->mc->tx_pipe[1], "w", sizeof("w"));
+
+bail:
+	pthread_mutex_unlock(&smd->qplock);
+
+	if (ret ||
+	    ((m_wr->flags & M_SEND_POSTED) &&
+	     (m_wr->flags & M_SEND_CN_EAGER_SIG))) {
+		struct ibv_wc wc;
+
+		wc.wr_id = pmsg->wr.wr_id;
+		wc.byte_len = len;
+		wc.status = ret ? IBV_WC_GENERAL_ERR:IBV_WC_SUCCESS;
+		wc.opcode = pmsg->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
+		wc.vendor_err = ret;
+		mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
+	}
+
+	return ret;
 }
 
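The inline fast path above hinges on one ordering rule: the OP thread
may post directly only when its WR is effectively at the head of the
queue; otherwise the TX thread posts it in turn, so inline data can
never pass DMA data still in flight. Reduced to a predicate (a sketch;
M_SEND_POSTED mirrors the flag defined earlier in this patch):

    enum { M_SEND_POSTED = 1 << 0 };  /* as in enum mcm_send_flags */

    static int can_post_direct(int wr_hd, int wr_tl, int prev_flags)
    {
            /* empty ring, or the previous slot was already posted */
            return (wr_hd == wr_tl) || (prev_flags & M_SEND_POSTED);
    }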
 /* receive data on connected SCIF endpoints, operation and unsolicited channels, */
@@ -3310,9 +3610,6 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep)
 	case MIX_SEND:
 		ret = mix_post_send(smd, (dat_mix_send_t *)phdr);
 		break;
-	case MIX_WRITE:
-		ret = mix_post_write(smd, (dat_mix_send_t *)phdr);
-		break;
 	case MIX_LISTEN:
 		ret = mix_listen(smd, (dat_mix_listen_t *)phdr);
 		break;
@@ -3549,7 +3846,7 @@ static void mcm_cm_disc(mcm_cm_t *cm)
 	int finalize = 1;
 
 	pthread_mutex_lock(&cm->lock);
-	mlog(1," enter: state = %s \n", mcm_state_str(cm->state));
+	mlog(0," enter: state = %s \n", mcm_state_str(cm->state));
 
 	switch (cm->state) {
 	case MCM_CONNECTED: /* CONSUMER: move to err state to flush */
@@ -3560,7 +3857,7 @@ static void mcm_cm_disc(mcm_cm_t *cm)
 		cm->msg.op = htons(MCM_DREQ);
 		cm->retries = 0;
 		finalize = 0; /* wait for DREP */
-		mlog(1, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n",
+		mlog(0, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n",
 			cm->retries+1, cm->entry.tid,
 			htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn),
 			htons(cm->msg.sport),htons(cm->msg.daddr.lid),
@@ -3598,7 +3895,7 @@ static void mcm_cm_disc(mcm_cm_t *cm)
 		/* DREQ received, send DREP and schedule event, finalize */
 		cm->msg.op = htons(MCM_DREP);
 		cm->state = MCM_DISCONNECTED;
-		mlog(1, " DREQ_in: cm_id %d send DREP %x %x %x -> %x %x %x\n",
+		mlog(0, " DREQ_in: cm_id %d send DREP %x %x %x -> %x %x %x\n",
 			cm->entry.tid,
 			htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn),
 			htons(cm->msg.sport),htons(cm->msg.daddr.lid),
 			htonl(cm->msg.dqpn), htons(cm->msg.dport));
@@ -4145,32 +4442,94 @@ static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer)
 void mcm_cq_event(struct mcm_cq *m_cq)
 {
 	struct ibv_cq *ib_cq;
+	struct mcm_qp *m_qp;
+	struct mcm_wr *m_wr;
 	void *cq_ctx;
-	int ret, notify = 0;
-	struct ibv_wc wc[10];
+	int i, ret, num, notify = 0;
+	struct ibv_wc wc[DAT_MIX_WC_MAX];
+	struct ibv_wc wc_ev[DAT_MIX_WC_MAX];
 
-	mlog(1," m_cq(%p) \n", m_cq);
+	mlog(2," m_cq(%p) \n", m_cq);
 
 	ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx);
-	mlog(1," m_cq %p == cq_ctx %p \n", m_cq, cq_ctx);
-	if (ret == 0) {
+	if (ret == 0)
 		ibv_ack_cq_events(m_cq->ib_cq, 1);
-	}
+
retry:
-	ret = ibv_poll_cq(m_cq->ib_cq, 10, wc);
-	mlog(1," completions = %d \n", ret);
+	ret = ibv_poll_cq(m_cq->ib_cq, DAT_MIX_WC_MAX, wc);
 	if (ret <= 0) {
 		if (!ret && !notify) {
 			ibv_req_notify_cq(m_cq->ib_cq, 0);
 			notify = 1;
 			goto retry;
 		}
+		mlog(2," m_cq(%p) empty, armed... \n", m_cq);
 		return;
 	} else
 		notify = 0;
 
-	/* NOTE: if WR was segmented update length, no context to handle so let mcm client handle */
-	mix_dto_event(m_cq, wc, ret);
+	num = 0;
+	for (i=0; i<ret; i++) {
+		m_wr = (struct mcm_wr *)wc[i].wr_id;
+		m_qp = (struct mcm_qp *)m_wr->context;
+
+		mlog(1," wr_id[%d of %d] m_wr %p m_qp %p\n", i, ret, m_wr, m_qp);
+
+		m_qp->comp_cnt++;
+
+		if (wc[i].status == IBV_WC_SUCCESS) {
+			if (m_wr->flags & M_SEND_CN_SIG) {
+				wc_ev[num].wr_id = m_wr->org_id;
+				wc_ev[num].status = IBV_WC_SUCCESS;
+				wc_ev[num].byte_len = wc[i].byte_len;
+				num++;
+			}
+		}
+		else {
+			/* segmentation, only report first error */
+			if (m_cq->prev_id != m_wr->org_id) {
+				char *sbuf = (char*)m_wr->sg->addr;
+
+				mlog(0," DTO ERR: st %d, vn %x pst %d cmp %d qstate 0x%x\n",
+					wc[i].status, wc[i].vendor_err, m_qp->post_cnt,
+					m_qp->comp_cnt, m_qp->ib_qp->state);
+				mlog(0, " DTO ERR: m_wr %p laddr %p=0x%x - %p=0x%x, len=%d, lkey=%x\n",
+					m_wr, sbuf, sbuf[0], &sbuf[m_wr->sg->length],
+					sbuf[m_wr->sg->length], m_wr->sg->length, m_wr->sg->lkey);
+				mlog(0, " DTO ERR: wr_id %#016Lx next %p sglist %p sge %d op %d flgs"
+					" %d idata 0x%x raddr %p rkey %x \n",
+					m_wr->org_id, m_wr->wr.next, m_wr->sg, m_wr->wr.num_sge,
+					m_wr->wr.opcode, m_wr->wr.send_flags, m_wr->wr.imm_data,
+					m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey);
+
+				m_cq->prev_id = m_wr->org_id;
+				wc_ev[num].wr_id = m_wr->org_id;
+				wc_ev[num].status = wc[i].status;
+				num++;
+			}
+		}
+		if (mix_shared_buffer) {
+			mlog(1," smd->mbuf_tl %p to %p, wr_tl %d to %d, pst %d,%d cmp %d\n",
+				m_qp->smd->m_tl, m_wr->m_idx, m_qp->wr_tl,
+				m_wr->w_idx, m_qp->post_cnt, m_qp->post_sig_cnt,
+				m_qp->comp_cnt);
+			pthread_mutex_lock(&m_qp->smd->qplock);
+			m_qp->smd->m_tl = m_wr->m_idx;
+			m_qp->wr_tl = m_wr->w_idx;
+			pthread_mutex_unlock(&m_qp->smd->qplock);
+		}
+		else {
+			mlog(1," qp->mbuf_tl %p to %p, wr_tl %p to %d\n",
+				m_qp->m_tl, m_wr->m_idx);
+			pthread_mutex_lock(&m_qp->smd->qplock);
+			m_qp->m_tl = m_wr->m_idx;
+			m_qp->wr_tl = m_wr->w_idx;
+			pthread_mutex_unlock(&m_qp->smd->qplock);
+		}
+	}
+	if (num)
+		mix_dto_event(m_cq, wc_ev, num);
+
+	goto retry;
 }
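With mcm_config_fd() having made the completion-channel FD non-blocking,
mcm_cq_event() can be driven from the TX thread and drain everything
queued instead of handling one event per poll() wakeup. The drain and
re-arm pattern, in minimal form (error handling and dispatch elided;
verbs calls as in libibverbs):

    #include <infiniband/verbs.h>

    static void drain_cq(struct ibv_comp_channel *ch, struct ibv_cq *cq)
    {
            struct ibv_cq *ev_cq;
            void *ev_ctx;
            struct ibv_wc wc[16];
            int n, armed = 0;

            /* non-blocking channel: returns -1 immediately if nothing queued */
            if (!ibv_get_cq_event(ch, &ev_cq, &ev_ctx))
                    ibv_ack_cq_events(ev_cq, 1);

            while ((n = ibv_poll_cq(cq, 16, wc)) != 0 || !armed) {
                    if (n < 0)
                            return;         /* poll error */
                    if (n == 0) {           /* empty: arm, then re-check */
                            ibv_req_notify_cq(cq, 0);
                            armed = 1;
                            continue;
                    }
                    armed = 0;
                    /* ... dispatch the n completions in wc[] ... */
            }
    }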
@@ -4187,8 +4546,9 @@ void mpxy_tx_thread(void *mic_client)
 	mcm_client_t *mc = (mcm_client_t*)mic_client;
 	struct mcm_ib_dev *md;
 	struct mcm_scif_dev *smd;
+	struct mcm_cq *m_cq;
 	struct pollfd set;
-	int i, data =0;
+	int i, data, events;
 	char rbuf[2];
 
 	if (mcm_affinity) {
@@ -4203,7 +4563,7 @@ void mpxy_tx_thread(void *mic_client)
 	while (!finished) {
 		pthread_mutex_lock(&mc->txlock);
-		data = 0;
+		data = 0, events = 0;
 		for (i=0;i<MCM_IB_MAX;i++) {
 			md = &mc->mdev[i];
 			if (md->ibctx == NULL)
@@ -4212,27 +4572,37 @@ void mpxy_tx_thread(void *mic_client)
 			/* all active DAPL MCM clients on this IB device */
 			pthread_mutex_lock(&md->slock);
 			smd = get_head_entry(&md->smd_list);
-			while (smd) {
-				smd->ref_cnt++;
+			while (smd && !smd->destroy) {
+				smd->th_ref_cnt++;
 				pthread_mutex_unlock(&md->slock);
-				m_post_pending_wr(smd, &data); /* chk pending WR's */
+
+				pthread_mutex_lock(&smd->cqlock);
+				m_cq = get_head_entry(&smd->cqlist);
+				while (m_cq) {
+					mcm_cq_event(m_cq); /* chk DTO completion */
+					m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
+				}
+				pthread_mutex_unlock(&smd->cqlock);
+
+				m_post_pending_wr(smd, &data, &events); /* chk pending WR's */
+
 				pthread_mutex_lock(&md->slock);
-				smd->ref_cnt--;
+				smd->th_ref_cnt--;
 				smd = get_next_entry(&smd->entry, &md->smd_list);
 			}
 			pthread_mutex_unlock(&md->slock);
 		}
 		pthread_mutex_unlock(&mc->txlock);
 
-		if (!data) { /* don't sleep if we have pending writes, on any device */
+		if (!data && !events) { /* don't sleep if we have pending writes or completions */
 			set.fd = mc->tx_pipe[0];
 			set.events = POLLIN;
 			set.revents = 0;
-			mlog(1, "TX thread waiting, no data\n");
+			mlog(1, " waiting, no data\n");
 			poll(&set, 1, -1); /* no more pending, next scif_readfrom will signal */
-			mlog(1, "TX thread signaled\n");
+			mlog(1, " signaled\n");
 
-			if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
+			while (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
 				read(mc->tx_pipe[0], rbuf, 2);
 		}
 	}
@@ -4245,7 +4615,6 @@ void mpxy_op_thread(void *mic_client)
 	struct mcm_fd_set *set;
 	struct mcm_ib_dev *md;
 	struct mcm_scif_dev *smd, *next;
-	struct mcm_cq *m_cq;
 	char rbuf[2];
 	int i, ret, time_ms;
@@ -4279,20 +4648,13 @@ void mpxy_op_thread(void *mic_client)
 			mcm_fd_set(md->ibctx->async_fd, set, POLLIN);
 			mcm_fd_set(md->rch->fd, set, POLLIN);
 
-			/* all active SCIF MIC clients */
+			/* all active SCIF MIC clients, OP channels */
 			pthread_mutex_lock(&md->slock);
 			smd = get_head_entry(&md->smd_list);
 			while (smd) {
-				pthread_mutex_lock(&smd->cqlock);
-				m_cq = get_head_entry(&smd->cqlist);
-				while (m_cq) {
-					mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN);
-					m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
-				}
-				pthread_mutex_unlock(&smd->cqlock);
 				mcm_fd_set(smd->scif_op_ep, set, POLLIN);
 				mcm_check_timers(smd, &time_ms);
-				smd = get_next_entry(&smd->entry, &md->smd_list);;
+				smd = get_next_entry(&smd->entry, &md->smd_list);
 			}
 			pthread_mutex_unlock(&md->slock);
 		}
@@ -4312,27 +4674,17 @@ void mpxy_op_thread(void *mic_client)
 			/* SCIF MIC client ops and conn messages */
 			pthread_mutex_lock(&md->slock);
 			smd = get_head_entry(&md->smd_list);
-			while (smd) {
-				smd->ref_cnt++;
+			while (smd && !smd->destroy) {
+				smd->th_ref_cnt++;
 				next = get_next_entry(&smd->entry, &md->smd_list);
 				pthread_mutex_unlock(&md->slock);
 
-				pthread_mutex_lock(&smd->cqlock);
-				m_cq = get_head_entry(&smd->cqlist);
-				while (m_cq) {
-					ret = mcm_poll(m_cq->ib_ch->fd, POLLIN);
-					if (ret == POLLIN)
-						mcm_cq_event(m_cq); /* completions */
-					m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
-				}
-				pthread_mutex_unlock(&smd->cqlock);
-
 				ret = mcm_poll(smd->scif_op_ep, POLLIN); /* operations */
 				if (ret == POLLIN)
 					ret = mix_scif_recv(smd, smd->scif_op_ep);
 
 				pthread_mutex_lock(&md->slock);
-				smd->ref_cnt--;
+				smd->th_ref_cnt--;
 				next = get_next_entry(&smd->entry, &md->smd_list);
 
 				if (ret == POLLERR)
@@ -4344,6 +4696,7 @@ void mpxy_op_thread(void *mic_client)
 		}
 		pthread_mutex_unlock(&mc->oplock);
 	}
+	free(set);
 	mlog(0, "OP,CM,Event thread exiting\n");
 }
 void mpxy_cm_thread(void *mic_client)
@@ -4371,6 +4724,7 @@ void mpxy_cm_thread(void *mic_client)
 		md = &mc->mdev[i];
 		if (md->ibctx == NULL)
 			continue;
+
 		set[fds].fd = mc->cm_pipe[0];
 		set[fds].events = POLLIN;
 		set[fds].revents = 0;