/* MCM Endpoint CM objects */
void m_cm_free(mcm_cm_t *cm)
{
+ mlog(2, "CM %p free: qp %p ref_cnt %d state %s sid %x\n",
+ cm, cm->m_qp, cm->ref_cnt,
+ mcm_state_str(cm->state), cm->sid);
+
+ mpxy_lock(&cm->lock);
+ if (cm->state != MCM_DESTROY) {
+ cm->ref_cnt--; /* alloc ref */
+ cm->state = MCM_DESTROY;
+ }
+
/* client, release local conn id port */
if (!cm->l_ep && cm->sid) {
mcm_free_port(cm->md->ports, cm->sid);
cm->sid = 0;
}
-
- mpxy_lock_destroy(&cm->lock);
- cm->smd->ref_cnt--;
- free(cm);
+ /* last reference, destroy */
+ if (!cm->ref_cnt) {
+ mpxy_unlock(&cm->lock);
+ mpxy_lock_destroy(&cm->lock);
+ cm->smd->ref_cnt--;
+ free(cm);
+ } else
+ mpxy_unlock(&cm->lock);
}
mcm_cm_t *m_cm_create(mcm_scif_dev_t *smd, mcm_qp_t *m_qp, dat_mcm_addr_t *r_addr)
if (mpxy_lock_init(&cm->lock, NULL))
goto bail;
+ cm->ref_cnt++; /* alloc ref */
cm->state = MCM_INIT;
cm->smd = smd;
cm->md = smd->md;
mpxy_lock_destroy(&cm->lock);
goto bail;
}
+ cm->ref_cnt++; /* Active: QP ref */
cm->m_qp = m_qp;
m_qp->cm = cm;
/* queue up connection object on CM list */
void mcm_qconn(mcm_scif_dev_t *smd, mcm_cm_t *cm)
{
+ mpxy_lock(&cm->lock);
+ cm->ref_cnt++; /* clist ref */
+ mpxy_unlock(&cm->lock);
+
/* add to CONN work queue, list, for mcm fabric CM */
mpxy_lock(&smd->clock);
insert_tail(&cm->entry, &smd->clist, (void *)cm);
remove_entry(&cm->entry);
mpxy_unlock(&smd->clock);
+ mpxy_lock(&cm->lock);
+ cm->ref_cnt--; /* clist ref */
+ mpxy_unlock(&cm->lock);
+
+ m_cm_free(cm);
}
/* queue listen object on listen list */
void mcm_qlisten(mcm_scif_dev_t *smd, mcm_cm_t *cm)
{
+ mpxy_lock(&cm->lock);
+ cm->ref_cnt++; /* llist ref */
+ mpxy_unlock(&cm->lock);
+
/* add to LISTEN work queue, list, for mcm fabric CM */
mpxy_lock(&smd->llock);
insert_tail(&cm->entry, &smd->llist, (void *)cm);
remove_entry(&cm->entry);
mcm_free_port(smd->md->ports, cm->sid);
cm->sid = 0;
- m_cm_free(cm);
mpxy_unlock(&smd->llock);
+
+ mpxy_lock(&cm->lock);
+ cm->ref_cnt--; /* llist ref */
+ mpxy_unlock(&cm->lock);
+
+ m_cm_free(cm);
}
/*
break;
case MCM_DISCONNECTED:
case MCM_FREE:
+ case MCM_DESTROY:
/* DREQ dropped, resend */
if (ntohs(msg->op) == MCM_DREQ) {
MCNTR(md, MCM_CM_DREQ_DUP);
break;
}
default:
- mlog(8, " mcm_recv: Warning, UNKNOWN state"
+ mlog(2, " mcm_recv: Warning, UNKNOWN state"
" <- op %s, %s spsp %x sqpn %x slid %x\n",
mcm_op_str(ntohs(msg->op)), mcm_state_str(cm->state),
ntohs(msg->sport), ntohl(msg->sqpn), ntohs(msg->saddr1.lid));
mpxy_unlock(&md->slock);
if (!cm && !dup) {
- mlog(1, " %s - op %s [lid, port, cqp, iqp, pid]:"
+ mlog(2, " %s - op %s [lid, port, cqp, iqp, pid]:"
" %x %x %x %x %x <- %x %x %x %x lpid %x rpid %x\n",
ntohs(msg->op) == MCM_REQ ? "NO LISTENER":"NO MATCH",
mcm_op_str(ntohs(msg->op)),
struct ibv_qp *qp1 = m_qp->ib_qp1;
struct ibv_qp *qp2 = m_qp->ib_qp2;
+
m_qp->ib_qp1 = NULL;
m_qp->ib_qp2 = NULL;
- if (m_qp->cm) { /* unlink CM */
+ if (m_qp->cm) { /* unlink CM, serialized */
+ struct mcm_cm *cm = m_qp->cm;
+
+ mpxy_lock(&cm->lock);
+ m_qp->cm->ref_cnt--; /* QP ref */
m_qp->cm->m_qp = NULL;
m_qp->cm = NULL;
+ mpxy_unlock(&cm->lock);
+ mcm_dqconn(m_qp->smd, cm);
}
if (qp1) {
mcm_modify_qp(qp1, IBV_QPS_ERR, 0, 0, NULL);
struct mcm_cm *m_cm;
struct ibv_qp *qp, *qp2 = NULL;
union ibv_gid *dgid = NULL, *dgid2 = NULL;
- uint32_t dqpn, dqpn2 = 0;
- uint16_t dlid, dlid2 = 0;
+ uint32_t dqpn = 0, dqpn2 = 0;
+ uint16_t dlid = 0, dlid2 = 0;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
mlog(0, " ERR: mix_get_cm, id %d, not found\n", pmsg->cm_id);
return -1;
}
-
+ mpxy_lock(&m_cm->lock);
/* update CM message from MIX client, save clients id,ctx */
m_cm->cm_id = 0; /* no client id for now, just ctx */
m_cm->cm_ctx = pmsg->sp_ctx;
m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id, 0);
if (!m_cm->m_qp) {
mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
+ mpxy_unlock(&m_cm->lock);
return -1;
}
+ m_cm->ref_cnt++; /* Passive: QP ref */
m_cm->m_qp->cm = m_cm;
mcm_save_wrc(m_cm); /* save remote proxy-in WRC QP info */
mlog(2, " MXS -> MSS remote \n");
if (m_qp_create_pi(smd, m_cm->m_qp))
- return -1;
+ goto err;
if (m_pi_prep_rcv_q(m_cm->m_qp))
- return -1;
+ goto err;
/* KR to KL or XEON, QP1<-QP2 and QP2->QP1 */
/* update the src information in CM msg */
mlog(2, " MXS -> MXS remote \n");
if (m_pi_prep_rcv_q(m_cm->m_qp))
- return -1;
+ goto err;
/* update the QPt src information in CM msg */
m_cm->msg.saddr1.ep_map = MIC_XSOCK_DEV;
goto err;
if (m_pi_prep_rcv_q(m_cm->m_qp))
- return -1;
+ goto err;
}
mcm_init_wrc(m_cm); /* send back proxy-in WR/WC raddr,rkey info */
mcm_pr_addrs(2, &m_cm->msg, m_cm->state, 0);
/* send RTU on wire, monitor for retries */
m_cm->state = MCM_RTU_PENDING;
+ mpxy_unlock(&m_cm->lock);
mcm_cm_rep_out(m_cm);
return 0;
err:
ntohl(dqpn), ntohs(dlid));
mcm_pr_addrs(0, &m_cm->msg, m_cm->state, 0);
+ mpxy_unlock(&m_cm->lock);
return -1;
}
m_cm = get_head_entry(&smd->llist);
while (m_cm) {
next_cm = get_next_entry(&m_cm->entry, &smd->llist);
- if (m_cm->sid)
- mcm_free_port(smd->md->ports, m_cm->sid);
- m_cm_free(m_cm);
+ mcm_dqlisten(smd, m_cm); /* dequeue and free */
m_cm = next_cm;
}
init_list(&smd->llist);
m_cm = get_head_entry(&smd->clist);
while (m_cm) {
next_cm = get_next_entry(&m_cm->entry, &smd->clist);
- m_cm_free(m_cm);
+ mcm_dqconn(smd, m_cm); /* dequeue and free */
m_cm = next_cm;
}
init_list(&smd->clist);
/* DAPL MCM Connection/Listen object */
typedef struct mcm_cm {
LLIST_ENTRY entry;
- mpxy_lock_t lock;
+ mpxy_lock_t lock;
struct mcm_ib_dev *md; /* mcm_ib_dev parent reference */
struct mcm_scif_dev *smd; /* mcm_scif_dev parent reference */
struct mcm_cm *l_ep; /* listen reference, passive */
/* mix.c, MIC message exchange (MIX) services */
void m_cq_free(struct mcm_cq *m_cq);
void m_qp_free(struct mcm_qp *m_qp);
-void m_cm_free(struct mcm_cm *m_cm);
void m_mr_free(struct mcm_mr *m_mr);
int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep);
int mix_cm_disc_in(mcm_cm_t *m_cm);