From 905ed84ab3e67c8478873988344c052c8ef1e97c Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Wed, 29 May 2013 16:00:32 -0700 Subject: [PATCH] mpxyd: CM optimizations for MIC clients, improved checking on inbound CM messages allow CM operations to be received on OP or EV channels from MIC clients and provide each SMD channel with aligned message buffer for scif_recv processing. add checking for NO match at MD level after checking all SMD children for inbound CM message match and add dump_cm_lists function for debug. add check for inline message threshold, DAT_MIX_INLINE_MAX Signed-off-by: Arlin Davis --- dapl/svc/mpxyd.c | 202 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 139 insertions(+), 63 deletions(-) diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index 754b4b7..643d9ee 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -307,6 +307,7 @@ typedef struct mcm_scif_dev { int m_len; /* buffer size */ int m_seg; /* segment size */ int m_shared_len; /* shared buffer size */ + char *cmd_buf; /* operation command buffer */ } mcm_scif_dev_t; @@ -822,8 +823,11 @@ static void mpxy_set_options( int debug_mode ) mix_buffer_sg = atoi(value); else if (!strcasecmp("buffer_alignment", opt)) mix_align = atoi(value); - else if (!strcasecmp("buffer_inline_threshold", opt)) + else if (!strcasecmp("buffer_inline_threshold", opt)) { mix_inline_threshold = atoi(value); + if (mix_inline_threshold > DAT_MIX_INLINE_MAX) + mix_inline_threshold = DAT_MIX_INLINE_MAX; + } else if (!strcasecmp("mcm_depth", opt)) mcm_depth = atoi(value); else if (!strcasecmp("scif_port_id", opt)) @@ -1651,7 +1655,8 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd) ibv_dereg_mr(smd->m_mr); free (smd->m_buf); - mlog(1, " m_buf unregistered and freed \n"); + free (smd->cmd_buf); + mlog(1, " m_buf, cmd_buf unregistered and freed\n"); if (smd->ref_cnt) { mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt); @@ -1688,7 +1693,16 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e memset(smd, 0, sizeof(*smd)); smd->md = md; - /* small shared SEND message pool, TODO protect this across QP's */ + ret = posix_memalign((void **)&smd->cmd_buf, 64, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX)); + if (ret) { + mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n", + ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret); + goto err; + } + mlog(1, "Allocated smd cmd_buf = %p len %d\n", + smd->cmd_buf, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX)); + + /* small shared SEND message pool */ smd->m_seg = mix_buffer_sg; if (mix_shared_buffer) smd->m_shared_len = mix_buffer_mb * (1024 * 1024); @@ -2664,8 +2678,8 @@ static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event) return (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len)); } -/* Active: new connection request operation, create CM object */ -static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +/* Active: new connection request operation, consumer context, create CM object */ +static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep) { int len, ret; struct mcm_qp *m_qp = NULL; @@ -2673,7 +2687,7 @@ static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -2739,14 +2753,14 @@ resp: } /* disconnect request out */ -static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep) { int len, ret; struct mcm_cm *m_cm; /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -2767,14 +2781,14 @@ static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) } /* Active, reply received, send RTU, unsolicited channel */ -static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep) { int len, ret; struct mcm_cm *m_cm; /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -2935,14 +2949,14 @@ static int mix_cm_rtu_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) } /* PASSIVE, accept connect request from client, cr_reply */ -static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep) { int len, ret; struct mcm_cm *m_cm; /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -3024,14 +3038,14 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) } /* PASSIVE, user reject from MIX client */ -static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep) { int len, ret; struct mcm_cm *m_cm; /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -3651,21 +3665,25 @@ bail: return ret; } +static int last_op; + /* receive data on connected SCIF endpoints, operation and unsolicited channels, */ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) { - char cmd[DAT_MIX_MSG_MAX]; - dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd; + dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)smd->cmd_buf; int ret, len; len = sizeof(*phdr); mlog(1, " -> scif_ep %d len %d ...\n", scif_ep, len); ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK); if ((ret != len) || (phdr->ver != DAT_MIX_VER)) { - mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n", - scif_ep, ret, len, phdr->ver); - return -1; + mlog(0, " ERR: smd %p ep %d ret %d exp %d ver %d op %d flgs %d last_op %s\n", + smd, scif_ep, ret, len, phdr->ver, phdr->op, + phdr->flags, mix_op_str(last_op)); + return POLLERR; /* fatal, close device */ } + last_op = phdr->op; + mlog(1, " <- %d bytes: ver %d, op %d, flags %d scif_ep %d\n", len, phdr->ver, phdr->op, phdr->flags, scif_ep); @@ -3703,26 +3721,26 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) ret = mix_listen_free(smd, phdr); break; case MIX_CM_REQ: - ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr, scif_ep); break; case MIX_CM_REP: case MIX_CM_ACCEPT: - ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr, scif_ep); break; case MIX_CM_REJECT: - ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr, scif_ep); break; case MIX_CM_RTU: case MIX_CM_EST: - ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr, scif_ep); break; case MIX_CM_DISC: - ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr, scif_ep); break; case MIX_CM_DREP: default: - mlog(0, " ERROR!!! unknown MIX operation: %d\n", phdr->op); - return -1; + mlog(0, " ERR: SMD %p unknown msg->op: %d, close device\n", smd, phdr->op); + return POLLERR; /* fatal, close device */ } MCNTR(smd->md, MCM_SCIF_RECV); return ret; @@ -4039,9 +4057,9 @@ static int mcm_cm_rep_out(mcm_cm_t *cm) } if (cm->retries == cm->md->retries) { - mlog(0, " CM_REPLY: RETRIES EXHAUSTED (lid port qpn)" + mlog(0, " CM_REPLY: RETRIES (%d) EXHAUSTED (lid port qpn)" " %x %x %x -> %x %x %x\n", - htons(cm->msg.saddr.lid), + cm->retries, htons(cm->msg.saddr.lid), htons(cm->msg.sport), htonl(cm->msg.saddr.qpn), htons(cm->msg.daddr.lid), @@ -4176,8 +4194,55 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, } } + +static void mcm_dump_cm_lists(mcm_scif_dev_t *smd) +{ + mcm_cm_t *cm = NULL, *next; + LLIST_ENTRY *list; + pthread_mutex_t *lock; + + mlog(0, " SMD %p : \n"); + + /* listen list*/ + list = &smd->llist; + lock = &smd->llock; + pthread_mutex_lock(lock); + next = get_head_entry(list); + while (next) { + cm = next; + next = get_next_entry(&cm->entry, list); + + mlog(0, " CM_LIST %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n", + mcm_state_str(cm->state), + ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id), + ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id)); + } + pthread_mutex_unlock(lock); + + /* conn list */ + list = &smd->clist; + lock = &smd->clock; + pthread_mutex_lock(lock); + next = get_head_entry(list); + while (next) { + cm = next; + next = get_next_entry(&cm->entry, list); + + mlog(0, " CM_CONN %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n", + mcm_state_str(cm->state), + ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id), + ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id)); + } + pthread_mutex_unlock(lock); + +} + /* Find matching CM object for this receive message, return CM reference, timer */ -mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg) +mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg, int *dup) { mcm_cm_t *cm = NULL, *next, *found = NULL; LLIST_ENTRY *list; @@ -4195,6 +4260,7 @@ mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg) /* conn list first, duplicate requests for MCM_REQ */ list = &smd->clist; lock = &smd->clock; + *dup = 0; retry_listenq: pthread_mutex_lock(lock); @@ -4246,6 +4312,7 @@ retry_listenq: ntohl(msg->dqpn), ntohl(msg->daddr.qpn), ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport), ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn)); + *dup = 1; MCNTR(cm->md, MCM_CM_ERR_REQ_DUP); return NULL; } @@ -4261,38 +4328,6 @@ retry_listenq: goto retry_listenq; } -/* don't send reject becasue it may match next smd */ -#if 0 - /* not match on listenq for valid request, send reject */ - if (ntohs(msg->op) == MCM_REQ && !found) { - mlog(1, " mcm_recv: NO LISTENER for %s %x %x i%x c%x" - " < %x %x %x, sending reject\n", - mcm_op_str(ntohs(msg->op)), - ntohs(msg->daddr.lid), ntohs(msg->dport), - ntohl(msg->daddr.qpn), ntohl(msg->dqpn), - ntohs(msg->saddr.lid), ntohs(msg->sport), - ntohl(msg->saddr.qpn)); - - mcm_cm_rej_out(smd->md, msg, MCM_REJ_CM); - } -#endif - - if (!found) { - mlog(1, " mcm_recv: WARNING no match - op %s [lid, port, cqp, iqp, pid]:" - " %x %x %x %x %x <- %x %x %x %x l_pid %x r_pid %x\n", - mcm_op_str(ntohs(msg->op)), - ntohs(msg->daddr.lid), ntohs(msg->dport), - ntohl(msg->dqpn), ntohl(msg->daddr.qpn), - ntohl(msg->d_id), ntohs(msg->saddr.lid), - ntohs(msg->sport), ntohl(msg->sqpn), - ntohl(msg->saddr.qpn), ntohl(msg->s_id), - ntohl(msg->d_id)); - - if (ntohs(msg->op) == MCM_DREP) { - MCNTR(cm->md, MCM_CM_ERR_DREP_DUP); - } - } - return found; } @@ -4301,17 +4336,54 @@ mcm_cm_t *mcm_get_cm(mcm_ib_dev_t *md, dat_mcm_msg_t *msg) { mcm_cm_t *cm = NULL; mcm_scif_dev_t *smd; + int dup; /* Walk scif device client list */ pthread_mutex_lock(&md->slock); smd = get_head_entry(&md->smd_list); while (smd) { - cm = mcm_get_smd_cm(smd, msg); + cm = mcm_get_smd_cm(smd, msg, &dup); if (cm) break; smd = get_next_entry(&smd->entry, &md->smd_list); } pthread_mutex_unlock(&md->slock); + + if (!cm) { + mlog(0, " no match - op %s [lid, port, cqp, iqp, pid]:" + " %x %x %x %x %x <- %x %x %x %x lpid %x rpid %x\n", + mcm_op_str(ntohs(msg->op)), + ntohs(msg->daddr.lid), ntohs(msg->dport), + ntohl(msg->dqpn), ntohl(msg->daddr.qpn), + ntohl(msg->d_id), ntohs(msg->saddr.lid), + ntohs(msg->sport), ntohl(msg->sqpn), + ntohl(msg->saddr.qpn), ntohl(msg->s_id), + ntohl(msg->d_id)); + + if (!dup && (ntohs(msg->op) == MCM_REQ)) { + mlog(0, " mcm_recv: NO LISTENER for %s %x %x i%x c%x" + " < %x %x %x, sending reject\n", + mcm_op_str(ntohs(msg->op)), + ntohs(msg->daddr.lid), ntohs(msg->dport), + ntohl(msg->daddr.qpn), ntohl(msg->dqpn), + ntohs(msg->saddr.lid), ntohs(msg->sport), + ntohl(msg->saddr.qpn)); + mcm_cm_rej_out(smd->md, msg, MCM_REJ_CM); + } + + if (ntohs(msg->op) == MCM_DREP) { + MCNTR(cm->md, MCM_CM_ERR_DREP_DUP); + } + + /* debug */ + pthread_mutex_lock(&md->slock); + smd = get_head_entry(&md->smd_list); + while (smd) { + mcm_dump_cm_lists(smd); + smd = get_next_entry(&smd->entry, &md->smd_list); + } + pthread_mutex_unlock(&md->slock); + } return cm; } @@ -4770,7 +4842,11 @@ void mpxy_op_thread(void *mic_client) ret = mcm_poll(smd->scif_op_ep, POLLIN); /* operations */ if (ret == POLLIN) ret = mix_scif_recv(smd, smd->scif_op_ep); - + if (ret != POLLERR) { + ret = mcm_poll(smd->scif_ev_ep, POLLIN); /* CM, events */ + if (ret == POLLIN) + ret = mix_scif_recv(smd, smd->scif_ev_ep); + } pthread_mutex_lock(&md->slock); smd->th_ref_cnt--; next = get_next_entry(&smd->entry, &md->smd_list); -- 2.46.0