From 0620d2a2238525658d22bd41a2738b1527aa684e Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Mon, 14 Apr 2014 17:09:32 -0700 Subject: [PATCH] mpxyd: serialize MD cm port space usage, add swap to rej call, resend dropped rej Signed-off-by: Arlin Davis --- dapl/svc/mcm.c | 67 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/dapl/svc/mcm.c b/dapl/svc/mcm.c index a3d66bb..63afa9a 100644 --- a/dapl/svc/mcm.c +++ b/dapl/svc/mcm.c @@ -466,8 +466,10 @@ void m_cm_free(mcm_cm_t *cm) /* client, release local conn id port */ if (!cm->l_ep && cm->sid) { + mpxy_lock(&cm->md->plock); mcm_free_port(cm->md->ports, cm->sid); cm->sid = 0; + mpxy_unlock(&cm->md->plock); } /* last reference, destroy */ if (!cm->ref_cnt) { @@ -510,7 +512,9 @@ mcm_cm_t *m_cm_create(mcm_scif_dev_t *smd, mcm_qp_t *m_qp, dat_mcm_addr_t *r_add /* ACTIVE: init source address QP info from MPXYD and MIC client */ if (m_qp) { + mpxy_lock(&smd->md->plock); cm->sid = mcm_get_port(smd->md->ports, 0, (uint64_t)cm); + mpxy_unlock(&smd->md->plock); cm->msg.sport = htons(cm->sid); if (!cm->msg.sport) { mpxy_lock_destroy(&cm->lock); @@ -617,9 +621,12 @@ void mcm_dqlisten(mcm_scif_dev_t *smd, mcm_cm_t *cm) { mpxy_lock(&smd->llock); remove_entry(&cm->entry); + mpxy_unlock(&smd->llock); + + mpxy_lock(&smd->md->plock); mcm_free_port(smd->md->ports, cm->sid); cm->sid = 0; - mpxy_unlock(&smd->llock); + mpxy_unlock(&smd->md->plock); mpxy_lock(&cm->lock); cm->ref_cnt--; /* llist ref */ @@ -818,25 +825,37 @@ static int mcm_post_rmsg(mcm_ib_dev_t *md, dat_mcm_msg_t *msg) return (ibv_post_recv(md->qp, &recv_wr, &recv_err)); } -int mcm_cm_rej_out(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, DAT_MCM_OP type) +int mcm_cm_rej_out(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, DAT_MCM_OP type, int swap) { dat_mcm_msg_t smsg; memset(&smsg, 0, sizeof(smsg)); smsg.ver = htons(DAT_MCM_VER); smsg.op = htons(type); - smsg.dport = msg->dport; - smsg.dqpn = msg->dqpn; - smsg.sport = msg->sport; - smsg.sqpn = msg->sqpn; - memcpy(&smsg.daddr1, &msg->daddr1, sizeof(dat_mcm_addr_t)); - memcpy(&smsg.saddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); - memcpy(&smsg.daddr2, &msg->daddr2, sizeof(dat_mcm_addr_t)); - memcpy(&smsg.saddr2, &msg->saddr2, sizeof(dat_mcm_addr_t)); - - mlog(2," CM reject -> LID %x, QPN %x PORT %x\n", - ntohs(smsg.daddr1.lid), - ntohl(smsg.dqpn), ntohs(smsg.dport)); + + if (swap) { + smsg.dport = msg->sport; + smsg.dqpn = msg->sqpn; + smsg.sport = msg->dport; + smsg.sqpn = msg->dqpn; + memcpy(&smsg.saddr1, &msg->daddr1, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.daddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.saddr2, &msg->daddr2, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.daddr2, &msg->saddr2, sizeof(dat_mcm_addr_t)); + } else { + smsg.dport = msg->dport; + smsg.dqpn = msg->dqpn; + smsg.sport = msg->sport; + smsg.sqpn = msg->sqpn; + memcpy(&smsg.daddr1, &msg->daddr1, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.saddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.daddr2, &msg->daddr2, sizeof(dat_mcm_addr_t)); + memcpy(&smsg.saddr2, &msg->saddr2, sizeof(dat_mcm_addr_t)); + } + + mlog(2," sLID %x, sQPN %x sPORT %x -> dLID %x, dQPN %x dPORT %x\n", + ntohs(smsg.saddr1.lid), ntohl(smsg.sqpn), ntohs(smsg.sport), + ntohs(smsg.daddr1.lid), ntohl(smsg.dqpn), ntohs(smsg.dport)); if (type == MCM_REJ_USER) MCNTR(md, MCM_CM_REJ_USER_OUT); @@ -990,7 +1009,8 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, mpxy_lock(&cm->lock); switch (cm->state) { case MCM_LISTEN: /* passive */ - mlog(2, "LISTEN: req_in: l_cm %p, sid %d\n", cm, cm->sid); + mlog(2, "LISTEN: req_in: dev_id %d l_cm %p, sid %d,0x%x\n", + cm->smd->entry.tid, cm, cm->sid, cm->sid); mpxy_unlock(&cm->lock); MCNTR(md, MCM_CM_REQ_IN); mix_cm_req_in(cm, msg, len); @@ -1155,9 +1175,9 @@ mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg, int *dup) mpxy_lock_t *lock; int listenq = 0; - mlog(8, " <- rmsg: op %s [lid, sid, cQP, QPr, QPt, pid]: " + mlog(8, " <- SMD %p rmsg(%p): op %s [lid, sid, cQP, QPr, QPt, pid]: " "%x %x %x %x %x <- %x %x %x %x %x l_pid %x r_pid %x\n", - mcm_op_str(ntohs(msg->op)), + smd, msg, mcm_op_str(ntohs(msg->op)), ntohs(msg->daddr1.lid), ntohs(msg->dport), ntohl(msg->dqpn), ntohl(msg->daddr1.qpn), ntohl(msg->daddr2.qpn), ntohs(msg->saddr1.lid), ntohs(msg->sport), ntohl(msg->sqpn), ntohl(msg->saddr1.qpn), @@ -1220,6 +1240,11 @@ retry_listenq: ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn)); *dup = 1; MCNTR(cm->md, MCM_CM_ERR_REQ_DUP); + if (cm->state == MCM_REJECTED) { /* REJ dropped, resend REJ*/ + mlog(1," DUPLICATE: CM_REQ in: REJECT state, resend CM_REJ %d\n", + ntohs(cm->msg.op)); + mcm_cm_rej_out(cm->md, &cm->msg, ntohs(cm->msg.op), 1); + } return NULL; } } @@ -1268,7 +1293,7 @@ mcm_cm_t *mcm_get_cm(mcm_ib_dev_t *md, dat_mcm_msg_t *msg) ntohl(msg->d_id)); if (ntohs(msg->op) == MCM_REQ) - mcm_cm_rej_out(md, msg, MCM_REJ_CM); + mcm_cm_rej_out(md, msg, MCM_REJ_CM, 1); if (ntohs(msg->op) == MCM_DREP) MCNTR(md, MCM_CM_ERR_DREP_DUP); @@ -1315,8 +1340,8 @@ retry: for (i = 0; i < ret; i++) { msg = (dat_mcm_msg_t*) (uintptr_t) wc[i].wr_id; - mlog(8, " mcm_recv: stat=%d op=%s ln=%d id=%p sqp=%x\n", - wc[i].status, mcm_op_str(ntohs(msg->op)), + mlog(2, " mcm_recv[%d]: stat=%d op=%s ln=%d id=%p sqp=%x\n", + i, wc[i].status, mcm_op_str(ntohs(msg->op)), wc[i].byte_len, (void*)wc[i].wr_id, wc[i].src_qp); MCNTR(md, MCM_CM_MSG_IN); @@ -1330,7 +1355,7 @@ retry: continue; } if (!(cm = mcm_get_cm(md, msg))) { - mlog(2, " NO_MATCH cm_msg %p, op %s ver %d st %x ln %d\n", + mlog(2, " NO_MATCH or DUP: post_rmsg %p op %s ver %d st %x ln %d\n", msg, mcm_op_str(ntohs(msg->op)), ntohs(msg->ver), wc[i].status, wc[i].byte_len); mcm_post_rmsg(md, msg); -- 2.46.0