}
}
+DAT_RETURN dapls_evd_cqe_to_event(DAPL_EVD * evd_ptr, ib_work_completion_t *cqe)
+{
+ DAT_RETURN dat_status;
+ DAT_EVENT *event;
+
+ if (evd_ptr->ib_cq_handle == IB_INVALID_HANDLE)
+ return DAT_SUCCESS;
+
+ dapli_evd_eh_print_cqe(cqe); /* For debugging. */
+
+ event = dapli_evd_get_and_init_event(evd_ptr, DAT_DTO_COMPLETION_EVENT);
+ if (event == NULL)
+ return DAT_QUEUE_FULL;
+
+ dapli_evd_cqe_to_event(evd_ptr, cqe, event);
+ dapli_evd_post_event(evd_ptr, event);
+
+ return DAT_SUCCESS;
+}
+
/*
* dapls_evd_copy_cq
*
extern void dapls_evd_post_overflow_event (
IN DAPL_EVD *evd_ptr);
+extern DAT_RETURN dapls_evd_cqe_to_event(
+ IN DAPL_EVD *evd_ptr,
+ ib_work_completion_t *cqe);
+
#endif
DAPL_OS_THREAD g_ib_thread;
DAPL_OS_LOCK g_hca_lock;
struct dapl_llist_entry *g_hca_list;
+static char gid_str[INET6_ADDRSTRLEN];
#if defined(_WIN64) || defined(_WIN32)
#include <rdma\winverbs.h>
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-	     " open_hca: ctx=%p port=%d GID subnet %016llx"
-	     " id %016llx\n", cm_id->verbs, cm_id->port_num,
-	     (unsigned long long)ntohll(gid->global.subnet_prefix),
-	     (unsigned long long)ntohll(gid->global.interface_id));
+	     " open_hca: ctx=%p port=%d GID %s\n",
+	     cm_id->verbs, cm_id->port_num,
+	     inet_ntop(AF_INET6, gid, gid_str, sizeof(gid_str)));
/* support for EVD's with CNO's: one channel via thread */
hca_ptr->ib_trans.ib_cq =
#ifdef _OPENIB_MCM_
/* shadow support, MPXYD */
- if (ia_ptr->hca_ptr->ib_trans.scif_ep)
+ if (ia_ptr->hca_ptr->ib_trans.scif_ep) {
ret = dapli_mix_cq_create(evd_ptr->ib_cq_handle);
- if (ret)
- goto err;
+ if (ret)
+ goto err;
+ }
#endif
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
#define DCM_TCLASS 0
/* DAPL uCM timers, default queue sizes */
-#define DCM_RETRY_CNT 15
+#define DCM_RETRY_CNT 1
#define DCM_REP_TIME 800 /* reply timeout in m_secs */
#define DCM_RTU_TIME 400 /* rtu timeout in m_secs */
#define DCM_QP_SIZE 500 /* uCM tx, rx qp size */
" post_snd: op 0x%x flags 0x%x sglist %p, %d\n",
wr.opcode, wr.send_flags, wr.sg_list, wr.num_sge);
+#ifdef _OPENIB_MCM_
+ if (ep_ptr->qp_handle->tp->scif_ep)
+ ret = dapli_mix_post_send(ep_ptr->qp_handle, total_len, &wr, &bad_wr);
+ else
+ ret = ibv_post_send(ep_ptr->qp_handle->sqp, &wr, &bad_wr);
+#else
ret = ibv_post_send(ep_ptr->qp_handle->qp, &wr, &bad_wr);
-
+#endif
if (ret)
return(dapl_convert_errno(errno,"ibv_send"));
ep_ptr->param.local_port_qual = rdma_get_src_port(conn->cm_id);
#else
+
+#ifdef _OPENIB_MCM_
+ /* save resources, 1st QP is receiver, 2nd is sender */
+ if (ia_ptr->hca_ptr->ib_trans.scif_ep) {
+ qp_create.cap.max_inline_data = 0;
+ qp_create.cap.max_send_wr = 1;
+ qp_create.cap.max_send_sge = 1;
+ }
+#endif
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " 1 - QP_ALLOC: QPr sq %d,%d rq %d,%d\n",
+ qp_create.cap.max_send_wr,
+ qp_create.cap.max_send_sge,
+ qp_create.cap.max_recv_wr,
+ qp_create.cap.max_recv_sge);
+
ep_ptr->qp_handle->qp = ibv_create_qp(ib_pd_handle, &qp_create);
if (!ep_ptr->qp_handle->qp) {
ret = errno;
goto err;
}
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " 2 - QP_ALLOC: QPr 0x%x sq %d,%d rq %d,%d\n",
+ ep_ptr->qp_handle->qp->qp_num,
+ qp_create.cap.max_send_wr,
+ qp_create.cap.max_send_sge,
+ qp_create.cap.max_recv_wr,
+ qp_create.cap.max_recv_sge);
#ifdef _OPENIB_MCM_
/* shadow support, MPXYD */
ep_ptr->qp_handle->qp_ctx = (uint64_t)ep_ptr;
ep_ptr->qp_handle->qp_id = 0; /* ??? */
- if (ia_ptr->hca_ptr->ib_trans.scif_ep)
- dapli_mix_qp_create(ep_ptr->qp_handle, &qp_create);
-#endif
+ if (ia_ptr->hca_ptr->ib_trans.scif_ep) { /* MIC: shadow on proxy node */
+ qp_create.cap.max_inline_data = 0; /* setup for bw not latency */
+ qp_create.cap.max_send_wr = attr->max_request_dtos;
+ qp_create.cap.max_send_sge = attr->max_request_iov;
+ qp_create.cap.max_recv_wr = 1;
+ qp_create.cap.max_recv_sge = 1;
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " 3 - QP_ALLOC: QPt (MPXYD) sq %d,%d rq %d,%d\n",
+ qp_create.cap.max_send_wr, qp_create.cap.max_send_sge,
+ qp_create.cap.max_recv_wr, qp_create.cap.max_recv_sge);
+ dapli_mix_qp_create(ep_ptr->qp_handle, &qp_create, req_cq, rcv_cq);
+ } else {
+ /* NON-MIC: need QPt, in case of shadowed QP's from MIC's */
+ qp_create.cap.max_recv_wr = 1;
+ qp_create.cap.max_recv_sge = 1;
+ ep_ptr->qp_handle->sqp = ibv_create_qp(ib_pd_handle, &qp_create);
+ if (!ep_ptr->qp_handle->sqp) {
+ ret = errno;
+ goto err;
+ }
+ if (dapls_modify_qp_state(ep_ptr->qp_handle->sqp,
+ IBV_QPS_INIT, 0, 0, 0) != DAT_SUCCESS) {
+ ibv_destroy_qp(ep_ptr->qp_handle->sqp);
+ ret = errno;
+ goto err;
+ }
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ " 3 - QP_ALLOC: QPt 0x%x sq %d,%d rq %d,%d\n",
+ ep_ptr->qp_handle->sqp->qp_num,
+ qp_create.cap.max_send_wr, qp_create.cap.max_send_sge,
+ qp_create.cap.max_recv_wr, qp_create.cap.max_recv_sge);
+ }
+#endif
/* Setup QP attributes for INIT state on the way out */
if (dapls_modify_qp_state(ep_ptr->qp_handle->qp,
IBV_QPS_INIT, 0, 0, 0) != DAT_SUCCESS) {
#endif
#ifdef _OPENIB_MCM_
- /* shadow support, MPXYD */
+ /* MIC: shadow support on MPXYD node */
if (ia_ptr->hca_ptr->ib_trans.scif_ep)
dapli_mix_qp_free(ep_ptr->qp_handle);
- /* TODO: flush shadow CQ on MPXYD */
+ else /* NON MIC: local shadow queue */
+ ibv_destroy_qp(ep_ptr->qp_handle->sqp);
+ /* TODO: flush shadow CQ on MPXYD */
#endif
- ep_ptr->qp_handle = NULL;
-
} else {
dapl_os_unlock(&ep_ptr->header.lock);
}
-
-
dapl_os_free(ep_ptr->qp_handle, sizeof(struct dcm_ib_qp));
+ ep_ptr->qp_handle = IB_INVALID_HANDLE;
return DAT_SUCCESS;
}
switch (qp_state) {
case IBV_QPS_RTR:
dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " QPS_RTR: type %d qpn 0x%x gid %p (%d) lid 0x%x"
+ " QPS_RTR: type %d l_qpn %x qpn %x lid 0x%x"
" port %d ep %p qp_state %d rd_atomic %d\n",
- qp_handle->qp_type, ntohl(qpn), gid,
- ia_ptr->hca_ptr->ib_trans.global,
- ntohs(lid), ia_ptr->hca_ptr->port_num,
+ qp_handle->qp_type, qp_handle->qp_num,
+ ntohl(qpn), ntohs(lid), ia_ptr->hca_ptr->port_num,
ep_ptr, ep_ptr->qp_state,
ep_ptr->param.ep_attr.max_rdma_read_in);
qp_attr.dest_qp_num = ntohl(qpn);
qp_attr.rq_psn = 1;
qp_attr.path_mtu = ia_ptr->hca_ptr->ib_trans.mtu;
- qp_attr.max_dest_rd_atomic =
- ep_ptr->param.ep_attr.max_rdma_read_in;
- qp_attr.min_rnr_timer =
- ia_ptr->hca_ptr->ib_trans.rnr_timer;
+#ifdef _OPENIB_MCM_
+ qp_attr.max_dest_rd_atomic = 4;
+#else
+ qp_attr.max_dest_rd_atomic = ep_ptr->param.ep_attr.max_rdma_read_in;
+#endif
+ qp_attr.min_rnr_timer = ia_ptr->hca_ptr->ib_trans.rnr_timer;
/* address handle. RC and UD */
qp_attr.ah_attr.dlid = ntohs(lid);
ia_ptr->hca_ptr->ib_trans.ack_retry;
qp_attr.rnr_retry =
ia_ptr->hca_ptr->ib_trans.rnr_retry;
- qp_attr.max_rd_atomic =
- ep_ptr->param.ep_attr.max_rdma_read_out;
+#ifdef _OPENIB_MCM_
+ qp_attr.max_rd_atomic = 4;
+#else
+ qp_attr.max_rd_atomic = ep_ptr->param.ep_attr.max_rdma_read_out;
+#endif
}
/* RC and UD */
qp_attr.qp_state = IBV_QPS_RTS;
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.pkey_index = hca->ib_trans.pkey_idx;
qp_attr.port_num = hca->port_num;
+#ifdef _OPENIB_MCM_
+ qp_attr.qkey = DAT_MCM_UD_QKEY; /* MCM gets different key */
+#else
qp_attr.qkey = DAT_UD_QKEY;
- if (ibv_modify_qp(qp, &qp_attr,
+#endif
+ if (ibv_modify_qp(qp, &qp_attr,
IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
return ah;
}
+DAT_RETURN dapls_modify_qp_rtu(struct ibv_qp *qp, uint32_t qpn, uint16_t lid, ib_gid_handle_t gid)
+{
+ DAT_RETURN ret;
+
+ ret = dapls_modify_qp_state(qp, IBV_QPS_RTR, qpn, lid, gid);
+ if (ret != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " MODIFY_QP_RTU: RTR err=%s qpn %x -> lid %x iqp %x\n",
+ strerror(errno), qp->qp_num, ntohs(lid), ntohl(qpn));
+ return ret;
+ }
+
+ ret = dapls_modify_qp_state(qp, IBV_QPS_RTS, qpn, lid, NULL);
+ if (ret != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " MODIFY_QP_RTU: RTS err=%s qpn %x -> lid %x iqp %x\n",
+ strerror(errno), qp->qp_num, ntohs(lid), ntohl(qpn));
+ return ret;
+ }
+ return DAT_SUCCESS;
+}
+
/*
* Local variables:
* c-indent-level: 4
#include "dapl_ep_util.h"
#include "dapl_osd.h"
+static char gid_str[INET6_ADDRSTRLEN];
+
enum DAPL_FD_EVENTS {
DAPL_FD_READ = POLLIN,
DAPL_FD_WRITE = POLLOUT,
/* forward declarations */
static int mcm_reply(dp_ib_cm_handle_t cm);
static void mcm_accept(ib_cm_srvc_handle_t cm, dat_mcm_msg_t *msg);
-static void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg);
static void mcm_accept_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg);
static int mcm_send(ib_hca_transport_t *tp, dat_mcm_msg_t *msg, DAT_PVOID p_data, DAT_COUNT p_size);
static void mcm_disconnect_final(dp_ib_cm_handle_t cm);
{
DAPL_OS_TIMEVAL time;
+ if (cm->tp->scif_ep) /* CM timers running on MPXYD */
+ return;
+
dapl_os_lock(&cm->lock);
dapl_os_get_time(&time);
switch (cm->state) {
- case DCM_REP_PENDING:
+ case MCM_REP_PENDING:
*timer = cm->hca->ib_trans.cm_timer;
/* wait longer each retry */
if ((time - cm->timer)/1000 >
return;
}
break;
- case DCM_RTU_PENDING:
+ case MCM_RTU_PENDING:
*timer = cm->hca->ib_trans.cm_timer;
if ((time - cm->timer)/1000 >
(cm->hca->ib_trans.rtu_time << cm->retries)) {
return;
}
break;
- case DCM_DISC_PENDING:
+ case MCM_DISC_PENDING:
*timer = cm->hca->ib_trans.cm_timer;
/* wait longer each retry */
if ((time - cm->timer)/1000 >
/* setup op, rearrange the src, dst cm and addr info */
(void)dapl_os_memzero(&smsg, sizeof(smsg));
- smsg.ver = htons(DCM_VER);
- smsg.op = htons(DCM_REJ_CM);
+ smsg.ver = htons(DAT_MCM_VER);
+ smsg.op = htons(MCM_REJ_CM);
smsg.dport = msg->sport;
smsg.dqpn = msg->sqpn;
smsg.sport = msg->dport;
return (mcm_send(tp, &smsg, NULL, 0));
}
-static void mcm_process_recv(ib_hca_transport_t *tp,
+void mcm_process_recv(ib_hca_transport_t *tp,
dat_mcm_msg_t *msg,
dp_ib_cm_handle_t cm)
{
dapl_os_lock(&cm->lock);
switch (cm->state) {
- case DCM_LISTEN: /* passive */
+ case MCM_LISTEN: /* passive */
dapl_os_unlock(&cm->lock);
mcm_accept(cm, msg);
break;
- case DCM_RTU_PENDING: /* passive */
+ case MCM_RTU_PENDING: /* passive */
dapl_os_unlock(&cm->lock);
mcm_accept_rtu(cm, msg);
break;
- case DCM_REP_PENDING: /* active */
+ case MCM_REP_PENDING: /* active */
dapl_os_unlock(&cm->lock);
mcm_connect_rtu(cm, msg);
break;
- case DCM_CONNECTED: /* active and passive */
+ case MCM_CONNECTED: /* active and passive */
/* DREQ, change state and process */
cm->retries = 2;
- if (ntohs(msg->op) == DCM_DREQ) {
- cm->state = DCM_DISC_RECV;
+ if (ntohs(msg->op) == MCM_DREQ) {
+ cm->state = MCM_DISC_RECV;
dapl_os_unlock(&cm->lock);
dapli_cm_disconnect(cm);
break;
}
/* active: RTU was dropped, resend */
- if (ntohs(msg->op) == DCM_REP) {
+ if (ntohs(msg->op) == MCM_REP) {
dapl_log(DAPL_DBG_TYPE_CM_WARN,
" RESEND RTU: op %s st %s [lid, port, cqp, iqp]:"
" %x %x %x %x -> %x %x %x %x r_pid %x\n",
ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn),
ntohl(cm->msg.d_id));
- cm->msg.op = htons(DCM_RTU);
+ cm->msg.op = htons(MCM_RTU);
mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_RTU_RETRY);
}
dapl_os_unlock(&cm->lock);
break;
- case DCM_DISC_PENDING: /* active and passive */
+ case MCM_DISC_PENDING: /* active and passive */
/* DREQ or DREP, finalize */
dapl_os_unlock(&cm->lock);
mcm_disconnect_final(cm);
break;
- case DCM_DISCONNECTED:
- case DCM_FREE:
+ case MCM_DISCONNECTED:
+ case MCM_FREE:
/* DREQ dropped, resend */
- if (ntohs(msg->op) == DCM_DREQ) {
+ if (ntohs(msg->op) == MCM_DREQ) {
dapl_log(DAPL_DBG_TYPE_CM_WARN,
" RESEND DREP: op %s st %s [lid, port, qpn]:"
" %x %x %x -> %x %x %x\n",
ntohs(msg->daddr.lid),
ntohs(msg->dport),
ntohl(msg->daddr.qpn));
- cm->msg.op = htons(DCM_DREP);
+ cm->msg.op = htons(MCM_DREP);
mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREP_RETRY);
- } else if (ntohs(msg->op) != DCM_DREP){
+ } else if (ntohs(msg->op) != MCM_DREP){
/* DREP ok to ignore, any other print warning */
dapl_log(DAPL_DBG_TYPE_WARN,
" mcm_recv: UNEXPECTED MSG on cm %p"
}
dapl_os_unlock(&cm->lock);
break;
- case DCM_REJECTED:
- if (ntohs(msg->op) == DCM_REJ_USER) {
+ case MCM_REJECTED:
+ if (ntohs(msg->op) == MCM_REJ_USER) {
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_USER_REJ_RX);
dapl_os_unlock(&cm->lock);
break;
DAPL_OS_LOCK *lock;
int listenq = 0;
- /* conn list first, duplicate requests for DCM_REQ */
+ /* conn list first, duplicate requests for MCM_REQ */
list = &tp->list;
lock = &tp->lock;
cm = next;
next = dapl_llist_next_entry(list,
(DAPL_LLIST_ENTRY *)&cm->local_entry);
- if (cm->state == DCM_DESTROY || cm->state == DCM_FREE)
+ if (cm->state == MCM_DESTROY || cm->state == MCM_FREE)
continue;
/* CM sPORT + QPN, match is good enough for listenq */
cm->msg.sport == msg->dport && cm->msg.sqpn == msg->dqpn &&
cm->msg.dport == msg->sport && cm->msg.dqpn == msg->sqpn &&
cm->msg.daddr.lid == msg->saddr.lid) {
- if (ntohs(msg->op) != DCM_REQ) {
+ if (ntohs(msg->op) != MCM_REQ) {
found = cm;
break;
} else {
dapl_os_unlock(lock);
/* no duplicate request on connq, check listenq for new request */
- if (ntohs(msg->op) == DCM_REQ && !listenq && !found) {
+ if (ntohs(msg->op) == MCM_REQ && !listenq && !found) {
listenq = 1;
list = &tp->llist;
lock = &tp->llock;
}
/* not match on listenq for valid request, send reject */
- if (ntohs(msg->op) == DCM_REQ && !found) {
+ if (ntohs(msg->op) == MCM_REQ && !found) {
dapl_log(DAPL_DBG_TYPE_WARN,
" mcm_recv: NO LISTENER for %s %x %x i%x c%x"
" < %x %x %x, sending reject\n",
ntohl(msg->saddr.qpn), ntohl(msg->s_id),
ntohl(msg->d_id));
- if (ntohs(msg->op) == DCM_DREP) {
+ if (ntohs(msg->op) == MCM_DREP) {
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREP_DUP);
}
}
(void*)wc[i].wr_id, wc[i].src_qp);
/* validate CM message, version */
- if (ntohs(msg->ver) < DCM_VER_MIN) {
+ if (ntohs(msg->ver) != DAT_MCM_VER) {
dapl_log(DAPL_DBG_TYPE_WARN,
" mcm_recv: UNKNOWN msg %p, ver %d\n",
msg, msg->ver);
goto bail;
}
- len = (sizeof(*msg) - DCM_MAX_PDATA_SIZE);
+ len = (sizeof(*msg) - DAT_MCM_PDATA_SIZE);
dapl_os_memcpy(smsg, msg, len);
if (p_size) {
smsg->p_size = ntohs(p_size);
wr.wr.ud.ah = tp->ah[dlid];
wr.wr.ud.remote_qpn = ntohl(smsg->dqpn);
- wr.wr.ud.remote_qkey = DAT_UD_QKEY;
+ wr.wr.ud.remote_qkey = DAT_MCM_UD_QKEY;
ret = ibv_post_send(tp->qp, &wr, &bad_wr);
if (ret) {
dapli_cm_dealloc(cm);
}
-dp_ib_cm_handle_t dapls_ib_cm_create(DAPL_EP *ep)
+static dp_ib_cm_handle_t dapls_cm_create(DAPL_HCA *hca, DAPL_EP *ep)
{
dp_ib_cm_handle_t cm;
goto bail;
}
dapls_cm_acquire(cm);
-
- cm->msg.ver = htons(DCM_VER);
+ cm->hca = hca;
+ cm->tp = &hca->ib_trans;
+ cm->msg.ver = htons(DAT_MCM_VER);
cm->msg.s_id = htonl(dapl_os_getpid()); /* process id for src id */
/* ACTIVE: init source address QP info from local EP */
if (ep) {
- DAPL_HCA *hca = ep->header.owner_ia->hca_ptr;
+ if (!hca->ib_trans.scif_ep) { /* CM service local and not on MPXYD */
+
+ cm->msg.sport = htons(mcm_get_port(&hca->ib_trans, 0));
+ if (!cm->msg.sport) {
+ dapl_os_wait_object_destroy(&cm->f_event);
+ dapl_os_wait_object_destroy(&cm->d_event);
+ dapl_os_lock_destroy(&cm->lock);
+ goto bail;
+ }
+ cm->msg.sqpn = htonl(hca->ib_trans.qp->qp_num); /* ucm */
- cm->msg.sport = htons(mcm_get_port(&hca->ib_trans, 0));
- if (!cm->msg.sport) {
- dapl_os_wait_object_destroy(&cm->f_event);
- dapl_os_wait_object_destroy(&cm->d_event);
- dapl_os_lock_destroy(&cm->lock);
- goto bail;
}
- /* link CM object to EP */
- dapl_ep_link_cm(ep, cm);
- cm->hca = hca;
- cm->ep = ep;
-
- /* IB info in network order */
- cm->msg.sqpn = htonl(hca->ib_trans.qp->qp_num); /* ucm */
cm->msg.saddr.qpn = htonl(ep->qp_handle->qp->qp_num); /* ep */
cm->msg.saddr.qp_type = ep->qp_handle->qp->qp_type;
- cm->msg.saddr.lid = hca->ib_trans.addr.lid;
+ cm->msg.saddr.lid = hca->ib_trans.addr.lid;
dapl_os_memcpy(&cm->msg.saddr.gid[0],
&hca->ib_trans.addr.gid, 16);
+
+ /* link CM object to EP */
+ dapl_ep_link_cm(ep, cm);
+ cm->ep = ep;
}
return cm;
bail:
cm->ep, cm->ref_count);
dapl_os_lock(&cm->lock);
- cm->state = DCM_FREE;
+ cm->state = MCM_FREE;
dapls_thread_signal(&cm->hca->ib_trans.signal);
dapl_os_unlock(&cm->lock);
}
/* free from internal workq, wait until EP is last ref */
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_FREE)
- cm->state = DCM_FREE;
+ if (cm->state != MCM_FREE)
+ cm->state = MCM_FREE;
dapl_os_unlock(&cm->lock);
dapls_thread_signal(&cm->hca->ib_trans.signal);
return;
dapl_os_lock(&cm->lock);
- if ((cm->state == DCM_DISCONNECTED) || (cm->state == DCM_FREE)) {
+ if ((cm->state == MCM_DISCONNECTED) || (cm->state == MCM_FREE)) {
dapl_os_unlock(&cm->lock);
return;
}
- cm->state = DCM_DISCONNECTED;
+ cm->state = MCM_DISCONNECTED;
dapl_os_unlock(&cm->lock);
if (cm->sp)
dapl_os_lock(&cm->lock);
switch (cm->state) {
- case DCM_CONNECTED:
+ case MCM_CONNECTED:
/* CONSUMER: move to err state to flush, if not UD */
if (cm->ep->qp_handle->qp->qp_type != IBV_QPT_UD)
dapls_modify_qp_state(cm->ep->qp_handle->qp, IBV_QPS_ERR,0,0,0);
/* send DREQ, event after DREP or DREQ timeout */
- cm->state = DCM_DISC_PENDING;
- cm->msg.op = htons(DCM_DREQ);
+ cm->state = MCM_DISC_PENDING;
+ cm->msg.op = htons(MCM_DREQ);
finalize = 0; /* wait for DREP, wakeup timer after DREQ sent */
wakeup = 1;
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_DREQ_TX);
break;
- case DCM_DISC_PENDING:
+ case MCM_DISC_PENDING:
/* DREQ timeout, resend until retries exhausted */
- cm->msg.op = htons(DCM_DREQ);
+ cm->msg.op = htons(MCM_DREQ);
if (cm->retries >= cm->hca->ib_trans.retries) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CM_DREQ: RETRIES EXHAUSTED:"
}
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREQ_RETRY);
break;
- case DCM_DISC_RECV:
+ case MCM_DISC_RECV:
/* CM_THREAD: move to err state to flush, if not UD */
if (cm->ep->qp_handle->qp->qp_type != IBV_QPT_UD)
dapls_modify_qp_state(cm->ep->qp_handle->qp, IBV_QPS_ERR,0,0,0);
/* DREQ received, send DREP and schedule event, finalize */
- cm->msg.op = htons(DCM_DREP);
+ cm->msg.op = htons(MCM_DREP);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_DREP_TX);
break;
- case DCM_DISCONNECTED:
+ case MCM_DISCONNECTED:
dapl_os_unlock(&cm->lock);
return DAT_SUCCESS;
default:
{
dapl_log(DAPL_DBG_TYPE_EP,
" connect: lid %x i_qpn %x lport %x p_sz=%d -> "
- " lid %x c_qpn %x rport %x\n",
+ " lid %x c_qpn %x rport %x, retries=%d\n",
htons(cm->msg.saddr.lid), htonl(cm->msg.saddr.qpn),
htons(cm->msg.sport), htons(cm->msg.p_size),
htons(cm->msg.daddr.lid), htonl(cm->msg.dqpn),
- htons(cm->msg.dport));
+ htons(cm->msg.dport), cm->tp->retries);
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_INIT && cm->state != DCM_REP_PENDING) {
+ if (cm->state != MCM_INIT && cm->state != MCM_REP_PENDING) {
dapl_os_unlock(&cm->lock);
return DAT_INVALID_STATE;
}
DAT_INVALID_ADDRESS_UNREACHABLE);
}
- cm->state = DCM_REP_PENDING;
- cm->msg.op = htons(DCM_REQ);
+ cm->state = MCM_REP_PENDING;
+ cm->msg.op = htons(MCM_REQ);
dapl_os_get_time(&cm->timer); /* reset reply timer */
- if (mcm_send(&cm->hca->ib_trans, &cm->msg,
- &cm->msg.p_data, ntohs(cm->msg.p_size))) {
- dapl_os_unlock(&cm->lock);
- goto bail;
+
+ if (cm->tp->scif_cm_ep) { /* MIC: CM service on MPXY */
+ if (dapli_mix_cm_req_out(cm, ep->qp_handle))
+ goto bail;
+ } else {
+ if (mcm_send(&cm->hca->ib_trans, &cm->msg,
+ &cm->msg.p_data, ntohs(cm->msg.p_size)))
+ goto bail;
}
dapl_os_unlock(&cm->lock);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)),
return DAT_SUCCESS;
bail:
+ dapl_os_unlock(&cm->lock);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR);
dapl_log(DAPL_DBG_TYPE_WARN,
" connect: snd ERR -> cm_lid %x cm_qpn %x r_psp %x p_sz=%d\n",
/*
* ACTIVE: exchange QP information, called from CR thread
*/
-static void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg)
+void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg)
{
DAPL_EP *ep = cm->ep;
ib_cm_events_t event = IB_CME_CONNECTED;
+ DAT_RETURN ret;
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_REP_PENDING) {
+ if (cm->state != MCM_REP_PENDING) {
dapl_log(DAPL_DBG_TYPE_WARN,
" CONN_RTU: UNEXPECTED state:"
" op %s, st %s <- lid %x sqpn %x sport %x\n",
/* save remote address information to EP and CM */
cm->msg.d_id = msg->s_id;
- dapl_os_memcpy(&ep->remote_ia_address,
- &msg->saddr, sizeof(dat_mcm_addr_t));
- dapl_os_memcpy(&cm->msg.daddr,
- &msg->saddr, sizeof(dat_mcm_addr_t));
+ dapl_os_memcpy(&ep->remote_ia_address, &msg->saddr2, sizeof(dat_mcm_addr_t));
+ dapl_os_memcpy(&cm->msg.daddr2, &msg->saddr2, sizeof(dat_mcm_addr_t));
+ dapl_os_memcpy(&cm->msg.daddr, &msg->saddr, sizeof(dat_mcm_addr_t));
/* validate private data size, and copy if necessary */
if (msg->p_size) {
- if (ntohs(msg->p_size) > DCM_MAX_PDATA_SIZE) {
+ if (ntohs(msg->p_size) > DAT_MCM_PDATA_SIZE) {
dapl_log(DAPL_DBG_TYPE_WARN,
" CONN_RTU: invalid p_size %d:"
- " st %s <- lid %x sqpn %x spsp %x\n",
+ " st %s <- lid %x sqpn %x s2qpn %x spsp %x\n",
ntohs(msg->p_size),
dapl_cm_state_str(cm->state),
ntohs(msg->saddr.lid),
ntohl(msg->saddr.qpn),
+ ntohl(msg->saddr2.qpn),
ntohs(msg->sport));
dapl_os_unlock(&cm->lock);
goto bail;
}
- dapl_os_memcpy(cm->msg.p_data,
- msg->p_data, ntohs(msg->p_size));
+ dapl_os_memcpy(cm->msg.p_data, msg->p_data, ntohs(msg->p_size));
}
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" CONN_RTU: DST lid=%x,"
- " iqp=%x, qp_type=%d, port=%x psize=%d\n",
- ntohs(cm->msg.daddr.lid),
- ntohl(cm->msg.daddr.qpn), cm->msg.daddr.qp_type,
+ " iqp=%x, iqp2=%x qp_type=%d, port=%x psize=%d\n",
+ ntohs(cm->msg.daddr.lid), ntohl(cm->msg.daddr.qpn),
+ ntohl(cm->msg.daddr2.qpn), cm->msg.daddr.qp_type,
ntohs(msg->sport), ntohs(msg->p_size));
- if (ntohs(msg->op) == DCM_REP)
+ if (ntohs(msg->op) == MCM_REP)
event = IB_CME_CONNECTED;
- else if (ntohs(msg->op) == DCM_REJ_USER)
+ else if (ntohs(msg->op) == MCM_REJ_USER)
event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
else {
dapl_log(DAPL_DBG_TYPE_WARN,
" Warning, non-user CR REJECT:"
- " cm %p op %s, st %s dlid %x iqp %x port %x <-"
+ " cm %p op %s, st %s dlid %x iqp %x iqp2 %x port %x <-"
" slid %x iqp %x port %x\n", cm,
- dapl_cm_op_str(ntohs(msg->op)),
- dapl_cm_state_str(cm->state),
- ntohs(msg->daddr.lid), ntohl(msg->daddr.qpn),
- ntohs(msg->dport), ntohs(msg->saddr.lid),
- ntohl(msg->saddr.qpn), ntohs(msg->sport));
+ dapl_cm_op_str(ntohs(msg->op)), dapl_cm_state_str(cm->state),
+ ntohs(msg->daddr.lid), ntohl(msg->daddr.qpn),ntohl(msg->daddr2.qpn),
+ ntohs(msg->dport), ntohs(msg->saddr.lid), ntohl(msg->saddr.qpn),
+ ntohs(msg->sport));
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_REJ_RX);
event = IB_CME_DESTINATION_REJECT;
}
ntohs(msg->dport), ntohs(msg->saddr.lid),
ntohl(msg->saddr.qpn), ntohs(msg->sport));
- cm->state = DCM_REJECTED;
+ cm->state = MCM_REJECTED;
dapl_os_unlock(&cm->lock);
goto bail;
}
dapl_os_unlock(&cm->lock);
- /* rdma_out, initiator, cannot exceed remote rdma_in max */
- if (ntohs(cm->msg.ver) >= 7)
- cm->ep->param.ep_attr.max_rdma_read_out =
- DAPL_MIN(cm->ep->param.ep_attr.max_rdma_read_out,
- cm->msg.rd_in);
-
- /* modify QP to RTR and then to RTS with remote info */
+ /* QP to RTR-RTS with remote QPt (daddr2) info */
dapl_os_lock(&cm->ep->header.lock);
- if (dapls_modify_qp_state(cm->ep->qp_handle->qp,
- IBV_QPS_RTR,
- cm->msg.daddr.qpn,
- cm->msg.daddr.lid,
- (ib_gid_handle_t)cm->msg.daddr.gid) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTR ERR %s <- lid %x iqp %x\n",
- strerror(errno), ntohs(cm->msg.daddr.lid),
- ntohl(cm->msg.daddr.qpn));
+ ret = dapls_modify_qp_rtu(cm->ep->qp_handle->qp,
+ cm->msg.daddr2.qpn,
+ cm->msg.daddr2.lid,
+ (ib_gid_handle_t)cm->msg.daddr2.gid);
+ if (ret != DAT_SUCCESS) {
dapl_os_unlock(&cm->ep->header.lock);
event = IB_CME_LOCAL_FAILURE;
goto bail;
}
- if (dapls_modify_qp_state(cm->ep->qp_handle->qp,
- IBV_QPS_RTS,
- cm->msg.daddr.qpn,
- cm->msg.daddr.lid,
- NULL) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTS ERR %s <- lid %x iqp %x\n",
- strerror(errno), ntohs(cm->msg.daddr.lid),
- ntohl(cm->msg.daddr.qpn));
- dapl_os_unlock(&cm->ep->header.lock);
- event = IB_CME_LOCAL_FAILURE;
- goto bail;
+
+ /* QP to RTR-RTS with remote QPr (daddr2) info */
+ if (!cm->tp->scif_ep) { /* NON-MIC, sQP is local and not on MPXYD */
+ ret = dapls_modify_qp_rtu(
+ cm->ep->qp_handle->sqp,
+ cm->msg.daddr.qpn,
+ cm->msg.daddr.lid,
+ (ib_gid_handle_t)cm->msg.daddr.gid);
+ if (ret != DAT_SUCCESS) {
+ dapl_os_unlock(&cm->ep->header.lock);
+ event = IB_CME_LOCAL_FAILURE;
+ goto bail;
+ }
}
dapl_os_unlock(&cm->ep->header.lock);
/* Send RTU, no private data */
- cm->msg.op = htons(DCM_RTU);
+ cm->msg.op = htons(MCM_RTU);
dapl_os_lock(&cm->lock);
- cm->state = DCM_CONNECTED;
- if (mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0)) {
- dapl_os_unlock(&cm->lock);
- goto bail;
+ cm->state = MCM_CONNECTED;
+ if (cm->tp->scif_ep) { /* MPXYD */
+ dapli_mix_cm_rtu_out(cm);
+ } else {
+ if (mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0)) {
+ dapl_os_unlock(&cm->lock);
+ goto bail;
+ }
}
dapl_os_unlock(&cm->lock);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_RTU_TX);
dp_ib_cm_handle_t acm;
/* Allocate accept CM and setup passive references */
- if ((acm = dapls_ib_cm_create(NULL)) == NULL) {
+ if ((acm = dapls_cm_create(cm->hca, NULL)) == NULL) {
dapl_log(DAPL_DBG_TYPE_WARN, " accept: ERR cm_create\n");
return;
}
/* dest CM info from CR msg, source CM info from listen */
acm->sp = cm->sp;
acm->hca = cm->hca;
+ acm->tp = cm->tp;
acm->msg.op = msg->op;
acm->msg.dport = msg->sport;
acm->msg.dqpn = msg->sqpn;
/* CR saddr is CM daddr info, need EP for local saddr */
dapl_os_memcpy(&acm->msg.daddr, &msg->saddr, sizeof(dat_mcm_addr_t));
+ dapl_os_memcpy(&acm->msg.daddr2, &msg->saddr2, sizeof(dat_mcm_addr_t));
dapl_log(DAPL_DBG_TYPE_CM,
- " accept: DST port=%x lid=%x, iqp=%x, psize=%d\n",
+ " accept: DST port=%x lid=%x, iqp=%x, iqp2=%x, psize=%d\n",
ntohs(acm->msg.dport), ntohs(acm->msg.daddr.lid),
- htonl(acm->msg.daddr.qpn), htons(acm->msg.p_size));
+ htonl(acm->msg.daddr.qpn), htonl(acm->msg.daddr2.qpn), htons(acm->msg.p_size));
/* validate private data size before reading */
- if (ntohs(msg->p_size) > DCM_MAX_PDATA_SIZE) {
+ if (ntohs(msg->p_size) > DAT_MCM_PDATA_SIZE) {
dapl_log(DAPL_DBG_TYPE_WARN, " accept: psize (%d) wrong\n",
ntohs(msg->p_size));
goto bail;
dapl_os_memcpy(acm->msg.p_data,
msg->p_data, ntohs(msg->p_size));
- acm->state = DCM_ACCEPTING;
+ acm->state = MCM_ACCEPTING;
dapli_queue_conn(acm);
/* trigger CR event and return SUCCESS */
static void mcm_accept_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg)
{
dapl_os_lock(&cm->lock);
- if ((ntohs(msg->op) != DCM_RTU) || (cm->state != DCM_RTU_PENDING)) {
+ if ((ntohs(msg->op) != MCM_RTU) || (cm->state != MCM_RTU_PENDING)) {
dapl_log(DAPL_DBG_TYPE_WARN,
" accept_rtu: UNEXPECTED op, state:"
- " op %s, st %s <- lid %x iqp %x sport %x\n",
+ " op %s, st %s <- lid %x iqp %x iqp2 %x sport %x\n",
dapl_cm_op_str(ntohs(msg->op)),
dapl_cm_state_str(cm->state),
ntohs(msg->saddr.lid), ntohl(msg->saddr.qpn),
- ntohs(msg->sport));
+ ntohl(msg->saddr2.qpn), ntohs(msg->sport));
dapl_os_unlock(&cm->lock);
goto bail;
}
- cm->state = DCM_CONNECTED;
+ cm->state = MCM_CONNECTED;
dapl_os_unlock(&cm->lock);
/* final data exchange if remote QP state is good to go */
static int mcm_reply(dp_ib_cm_handle_t cm)
{
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_RTU_PENDING) {
+ if (cm->state != MCM_RTU_PENDING) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CM_REPLY: wrong state ep %p cm %p %s refs=%d"
- " %x %x i_%x -> %x %x i_%x l_pid %x r_pid %x\n",
+ " %x %x i_%x i2_%x -> %x %x i_%x i2_%x l_pid %x r_pid %x\n",
cm->ep, cm, dapl_cm_state_str(cm->state),
cm->ref_count,
- htons(cm->msg.saddr.lid),
- htons(cm->msg.sport),
- htonl(cm->msg.saddr.qpn),
- htons(cm->msg.daddr.lid),
- htons(cm->msg.dport),
- htonl(cm->msg.daddr.qpn),
- ntohl(cm->msg.s_id),
- ntohl(cm->msg.d_id));
+ htons(cm->msg.saddr.lid), htons(cm->msg.sport),
+ htonl(cm->msg.saddr.qpn), htonl(cm->msg.saddr2.qpn),
+ htons(cm->msg.daddr.lid), htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn), htonl(cm->msg.daddr2.qpn),
+ ntohl(cm->msg.s_id), ntohl(cm->msg.d_id));
dapl_os_unlock(&cm->lock);
return -1;
}
if (cm->retries == cm->hca->ib_trans.retries) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CM_REPLY: RETRIES EXHAUSTED (lid port qpn)"
- " %x %x %x -> %x %x %x\n",
- htons(cm->msg.saddr.lid),
- htons(cm->msg.sport),
- htonl(cm->msg.saddr.qpn),
- htons(cm->msg.daddr.lid),
- htons(cm->msg.dport),
- htonl(cm->msg.daddr.qpn));
+ " %x %x %x %x -> %x %x %x %x \n",
+ htons(cm->msg.saddr.lid), htons(cm->msg.sport),
+ htonl(cm->msg.saddr.qpn), htonl(cm->msg.saddr2.qpn),
+ htons(cm->msg.daddr.lid), htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn), htonl(cm->msg.daddr2.qpn));
dapl_os_unlock(&cm->lock);
{
DAPL_IA *ia = ep->header.owner_ia;
dp_ib_cm_handle_t cm = cr->ib_cm_handle;
+ int ret;
- if (p_size > DCM_MAX_PDATA_SIZE)
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " MCM_ACCEPT_USR: ep %p cm %p %s refs=%d"
+ " %x %x i_%x i2_%x <- %x %x i_%x i2_%x l_pid %x r_pid %x\n",
+ cm->ep, cm, dapl_cm_state_str(cm->state),
+ cm->ref_count, htons(cm->hca->ib_trans.addr.lid),
+ htons(cm->msg.sport), ep->qp_handle->qp->qp_num,
+ ep->qp_handle->sqp->qp_num,
+ htons(cm->msg.daddr.lid), htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn), htonl(cm->msg.daddr2.qpn),
+ ntohl(cm->msg.s_id), ntohl(cm->msg.d_id));
+
+ if (p_size > DAT_MCM_PDATA_SIZE)
return DAT_LENGTH_ERROR;
dapl_os_lock(&cm->lock);
- if (cm->state != DCM_ACCEPTING) {
+ if (cm->state != MCM_ACCEPTING) {
dapl_log(DAPL_DBG_TYPE_ERR,
" CM_ACCEPT_USR: wrong state ep %p cm %p %s refs=%d"
- " %x %x i_%x -> %x %x i_%x l_pid %x r_pid %x\n",
+ " %x %x i_%x i2_%x <- %x %x i_%x i2_%x l_pid %x r_pid %x\n",
cm->ep, cm, dapl_cm_state_str(cm->state),
- cm->ref_count,
- htons(cm->hca->ib_trans.addr.lid),
- htons(cm->msg.sport),
- htonl(ep->qp_handle->qp->qp_num),
- htons(cm->msg.daddr.lid),
- htons(cm->msg.dport),
- htonl(cm->msg.daddr.qpn),
- ntohl(cm->msg.s_id),
- ntohl(cm->msg.d_id));
+ cm->ref_count, htons(cm->hca->ib_trans.addr.lid),
+ htons(cm->msg.sport), ep->qp_handle->qp->qp_num,
+ ep->qp_handle->sqp->qp_num,
+ htons(cm->msg.daddr.lid), htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn), htonl(cm->msg.daddr2.qpn),
+ ntohl(cm->msg.s_id), ntohl(cm->msg.d_id));
dapl_os_unlock(&cm->lock);
return DAT_INVALID_STATE;
}
p_size);
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR: remote GID subnet %016llx id %016llx\n",
- (unsigned long long)htonll(*(uint64_t*)&cm->msg.daddr.gid[0]),
- (unsigned long long)htonll(*(uint64_t*)&cm->msg.daddr.gid[8]));
+ " ACCEPT_USR: remote GID subnet %s\n",
+ inet_ntop(AF_INET6, cm->msg.daddr.gid, gid_str, sizeof(gid_str)));
- /* rdma_out, initiator, cannot exceed remote rdma_in max */
- if (ntohs(cm->msg.ver) >= 7)
- ep->param.ep_attr.max_rdma_read_out =
- DAPL_MIN(ep->param.ep_attr.max_rdma_read_out,
- cm->msg.rd_in);
+ /* rdma_out, initiator, cannot exceed remote rdma_in max */
+ ep->param.ep_attr.max_rdma_read_out =
+ DAPL_MIN(ep->param.ep_attr.max_rdma_read_out, cm->msg.rd_in);
- /* modify QP to RTR and then to RTS with remote info already read */
+ /* modify QPr to RTR and then to RTS, QPr (qp) to remote QPt (daddr2) */
dapl_os_lock(&ep->header.lock);
- if (dapls_modify_qp_state(ep->qp_handle->qp,
- IBV_QPS_RTR,
- cm->msg.daddr.qpn,
- cm->msg.daddr.lid,
- (ib_gid_handle_t)&cm->msg.daddr.gid[0]) != DAT_SUCCESS) {
+ ret = dapls_modify_qp_rtu(ep->qp_handle->qp,
+ cm->msg.daddr2.qpn,
+ cm->msg.daddr2.lid,
+ (ib_gid_handle_t)cm->msg.daddr2.gid);
+ if (ret) {
dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT_USR: QPS_RTR ERR %s -> lid %x qpn %x\n",
strerror(errno), ntohs(cm->msg.daddr.lid),
dapl_os_unlock(&ep->header.lock);
goto bail;
}
- if (dapls_modify_qp_state(ep->qp_handle->qp,
- IBV_QPS_RTS,
- cm->msg.daddr.qpn,
- cm->msg.daddr.lid,
- NULL) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: QPS_RTS ERR %s -> lid %x qpn %x\n",
- strerror(errno), ntohs(cm->msg.daddr.lid),
- ntohl(cm->msg.daddr.qpn));
- dapl_os_unlock(&ep->header.lock);
- goto bail;
+ /* modify QPt to RTR and then to RTS, QPt (sqp) to remote QPr (daddr) */
+ if (!cm->tp->scif_ep) { /* NON-MIC, sQP is local and not on MPXYD */
+ ret = dapls_modify_qp_rtu(ep->qp_handle->sqp,
+ cm->msg.daddr.qpn,
+ cm->msg.daddr.lid,
+ (ib_gid_handle_t)cm->msg.daddr.gid);
+ if (ret) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: QPS_RTS ERR %s -> lid %x qpn %x\n",
+ strerror(errno), ntohs(cm->msg.daddr.lid),
+ ntohl(cm->msg.daddr.qpn));
+ dapl_os_unlock(&ep->header.lock);
+ goto bail;
+ }
+ cm->msg.saddr2.qpn = htonl(ep->qp_handle->sqp->qp_num);
+ cm->msg.saddr2.lid = cm->hca->ib_trans.addr.lid;
+ cm->msg.saddr2.qp_type = ep->qp_handle->qp->qp_type;
+ dapl_os_memcpy(&cm->msg.saddr2.gid[0],
+ &cm->hca->ib_trans.addr.gid, 16);
}
dapl_os_unlock(&ep->header.lock);
- /* save remote address information */
+ /* save remote address information, QPr */
dapl_os_memcpy(&ep->remote_ia_address,
&cm->msg.saddr, sizeof(dat_mcm_addr_t));
- /* setup local QP info and type from EP, copy pdata, for reply */
- cm->msg.op = htons(DCM_REP);
+ /* setup local QPr info and type from EP, copy pdata, for reply */
+ cm->msg.op = htons(MCM_REP);
cm->msg.rd_in = ep->param.ep_attr.max_rdma_read_in;
cm->msg.saddr.qpn = htonl(ep->qp_handle->qp->qp_num);
cm->msg.saddr.qp_type = ep->qp_handle->qp->qp_type;
/* Send RTU and change state under CM lock */
dapl_os_lock(&cm->lock);
- cm->state = DCM_RTU_PENDING;
+ cm->state = MCM_RTU_PENDING;
dapl_os_get_time(&cm->timer); /* RTU expected */
if (mcm_send(&cm->hca->ib_trans, &cm->msg, cm->p_data, cm->p_size)) {
dapl_log(DAPL_DBG_TYPE_ERR," accept ERR: ucm reply send()\n");
IN DAT_COUNT p_size, IN void *p_data)
{
DAPL_EP *ep = (DAPL_EP *)ep_handle;
+ DAPL_HCA *hca = ep->header.owner_ia->hca_ptr;
dp_ib_cm_handle_t cm;
/* create CM object, initialize SRC info from EP */
- cm = dapls_ib_cm_create(ep);
+ cm = dapls_cm_create(hca, ep);
if (cm == NULL)
return DAT_INSUFFICIENT_RESOURCES;
cm->msg.p_size = htons(p_size);
dapl_os_memcpy(&cm->msg.p_data, p_data, p_size);
}
-
- cm->state = DCM_INIT;
+ cm->state = MCM_INIT;
/* link EP and CM, put on work queue */
dapli_queue_conn(cm);
ia, sid, sp);
/* cm_create will setup saddr for listen server */
- if ((cm = dapls_ib_cm_create(NULL)) == NULL)
+ if ((cm = dapls_cm_create(ia->hca_ptr, NULL)) == NULL)
return DAT_INSUFFICIENT_RESOURCES;
/* LISTEN: init DST address and QP info to local CM server info */
}
/* queue up listen socket to process inbound CR's */
- cm->state = DCM_LISTEN;
+ cm->state = MCM_LISTEN;
dapli_queue_listen(cm);
DAPL_CNTR(ia, DCNT_IA_CM_LISTEN);
" reject(cm %p reason %x, pdata %p, psize %d)\n",
cm, reason, pdata, psize);
- if (psize > DCM_MAX_PDATA_SIZE)
+ if (psize > DAT_MCM_PDATA_SIZE)
return DAT_LENGTH_ERROR;
/* cr_thread will destroy CR, update saddr lid, gid, qp_type info */
ntohs(cm->msg.sport), ntohs(cm->msg.daddr.lid),
ntohl(cm->msg.daddr.qpn), ntohs(cm->msg.dport));
- cm->state = DCM_REJECTED;
+ cm->state = MCM_REJECTED;
cm->msg.saddr.lid = cm->hca->ib_trans.addr.lid;
cm->msg.saddr.qp_type = cm->msg.daddr.qp_type;
dapl_os_memcpy(&cm->msg.saddr.gid[0],
&cm->hca->ib_trans.addr.gid, 16);
if (reason == IB_CM_REJ_REASON_CONSUMER_REJ)
- cm->msg.op = htons(DCM_REJ_USER);
+ cm->msg.op = htons(MCM_REJ_USER);
else
- cm->msg.op = htons(DCM_REJ_CM);
+ cm->msg.op = htons(MCM_REJ_CM);
DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)),
reason == IB_CM_REJ_REASON_CONSUMER_REJ ?
int dapls_ib_private_data_size(
IN DAPL_HCA *hca_ptr)
{
- return DCM_MAX_PDATA_SIZE;
+ return DAT_MCM_PDATA_SIZE;
}
void cm_thread(void *arg)
dapl_fd_set(hca->ib_hca_handle->async_fd, set, DAPL_FD_READ);
dapl_fd_set(hca->ib_trans.rch_fd, set, DAPL_FD_READ);
dapl_fd_set(hca->ib_trans.scif_ep, set, DAPL_FD_READ);
+ dapl_fd_set(hca->ib_trans.scif_cm_ep, set, DAPL_FD_READ);
dapl_fd_set(hca->ib_trans.ib_cq->fd, set, DAPL_FD_READ);
if (!dapl_llist_is_empty(&hca->ib_trans.list))
(DAPL_LLIST_ENTRY *)&cm->local_entry);
dapls_cm_acquire(cm); /* hold thread ref */
dapl_os_lock(&cm->lock);
- if (cm->state == DCM_FREE ||
+ if (cm->state == MCM_FREE ||
hca->ib_trans.cm_state != IB_THREAD_RUN) {
dapl_os_unlock(&cm->lock);
dapl_log(DAPL_DBG_TYPE_CM,
DAPL_FD_READ) == DAPL_FD_READ) {
dapli_mix_recv(hca, hca->ib_trans.scif_ep);
}
+ if (dapl_poll(hca->ib_trans.scif_cm_ep,
+ DAPL_FD_READ) == DAPL_FD_READ) {
+ dapli_mix_recv(hca, hca->ib_trans.scif_cm_ep);
+ }
if (dapl_poll(hca->ib_hca_handle->async_fd,
DAPL_FD_READ) == DAPL_FD_READ) {
mcm_async_event(hca);
/* prototypes */
void cm_thread(void *arg);
void mcm_async_event(struct dapl_hca *hca);
+void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg);
void dapli_cq_event_cb(struct _ib_hca_transport *tp);
void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr);
void dapls_cm_release(dp_ib_cm_handle_t cm_ptr);
void dapls_cm_free(dp_ib_cm_handle_t cm_ptr);
+DAT_RETURN dapls_modify_qp_rtu(struct ibv_qp *qp, uint32_t qpn, uint16_t lid, ib_gid_handle_t gid);
/* MIC indirect eXchange (MIX) operations */
int dapli_mix_open(ib_hca_transport_t *tp, char *name, int port);
void dapli_mix_close(ib_hca_transport_t *tp);
int dapli_mix_listen(dp_ib_cm_handle_t cm, uint16_t sid);
int dapli_mix_listen_free(dp_ib_cm_handle_t cm);
-int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr);
+int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr,
+ ib_cq_handle_t req_cq, ib_cq_handle_t rcv_cq);
int dapli_mix_qp_free(ib_qp_handle_t m_qp);
int dapli_mix_cq_create(ib_cq_handle_t m_cq);
int dapli_mix_cq_free(ib_cq_handle_t m_cq);
+int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp);
+int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm);
+int dapli_mix_post_send(ib_qp_handle_t m_qp, int len, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr);
int dapli_mix_recv(DAPL_HCA *hca, int scif_ep);
#include "dapl_osd.h"
#include <stdlib.h>
-
-static void ucm_service_destroy(IN DAPL_HCA *hca);
-static int ucm_service_create(IN DAPL_HCA *hca);
+static void mcm_service_destroy(IN DAPL_HCA *hca);
+static int mcm_service_create(IN DAPL_HCA *hca);
static int32_t create_os_signal(IN DAPL_HCA * hca_ptr)
{
dapl_llist_init_head(&hca_ptr->ib_trans.llist);
/* create uCM qp services */
- if (ucm_service_create(hca_ptr))
+ if (mcm_service_create(hca_ptr))
goto bail;
if (create_os_signal(hca_ptr)) {
return dat_status;
bail:
- ucm_service_destroy(hca_ptr);
+ mcm_service_destroy(hca_ptr);
ibv_close_device(hca_ptr->ib_hca_handle);
hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
dapl_os_lock_destroy(&hca_ptr->ib_trans.llock);
destroy_os_signal(hca_ptr);
- ucm_service_destroy(hca_ptr);
+ mcm_service_destroy(hca_ptr);
if (hca_ptr->ib_trans.ib_cq)
ibv_destroy_comp_channel(hca_ptr->ib_trans.ib_cq);
}
/* Create uCM endpoint services, allocate remote_ah's array */
-static void ucm_service_destroy(IN DAPL_HCA *hca)
+static void mcm_service_destroy(IN DAPL_HCA *hca)
{
ib_hca_transport_t *tp = &hca->ib_trans;
int msg_size = sizeof(ib_cm_msg_t);
dapl_os_free(tp->sbuf, (msg_size * tp->qpe));
}
-static int ucm_service_create(IN DAPL_HCA *hca)
+static int mcm_service_create(IN DAPL_HCA *hca)
{
struct ibv_qp_init_attr qp_create;
ib_hca_transport_t *tp = &hca->ib_trans;
int i, mlen = sizeof(ib_cm_msg_t);
int hlen = sizeof(struct ibv_grh); /* hdr included with UD recv */
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ucm_create: \n");
-
- /* CM service via MPXYD, no need for local IB UD CM service */
- if (tp->scif_ep)
- return 0;
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " mcm_create: \n");
/* setup CM timers and queue sizes */
tp->retries = dapl_os_get_env_val("DAPL_UCM_RETRY", DCM_RETRY_CNT);
tp->qpe = dapl_os_get_env_val("DAPL_UCM_QP_SIZE", DCM_QP_SIZE);
tp->cqe = dapl_os_get_env_val("DAPL_UCM_CQ_SIZE", DCM_CQ_SIZE);
tp->burst = dapl_os_get_env_val("DAPL_UCM_TX_BURST", DCM_TX_BURST);
+
+ /* CM service via MPXYD, no need for local IB UD CM service */
+ if (tp->scif_ep)
+ return 0;
+
tp->pd = ibv_alloc_pd(hca->ib_hca_handle);
if (!tp->pd)
goto bail;
dapl_log(DAPL_DBG_TYPE_UTIL,
- " create_service: pd %p ctx %p handle 0x%x\n",
- tp->pd, tp->pd->context, tp->pd->handle);
+ " create_service: pd %p ctx %p handle 0x%x\n",
+ tp->pd, tp->pd->context, tp->pd->handle);
tp->rch = ibv_create_comp_channel(hca->ib_hca_handle);
if (!tp->rch)
bail:
dapl_log(DAPL_DBG_TYPE_ERR,
" ucm_create_services: ERR %s\n", strerror(errno));
- ucm_service_destroy(hca);
+ mcm_service_destroy(hca);
return -1;
}
switch (event.event_type) {
case IBV_EVENT_CQ_ERR:
{
- struct dapl_ep *evd_ptr =
+ struct dapl_evd *evd_ptr =
event.element.cq->cq_context;
dapl_log(DAPL_DBG_TYPE_ERR,
/* report up if async callback still setup */
if (tp->async_cq_error)
tp->async_cq_error(hca->ib_hca_handle,
- event.element.cq,
+ evd_ptr->ib_cq_handle,
&event, (void *)evd_ptr);
break;
}
}
dapl_log(DAPL_DBG_TYPE_EXTENSION," SCIF node_id: %d\n", (uint16_t)tp->self.node);
-#if 0 /* let run on Xeon for testing */
if (tp->self.node == 0) {
dapl_log(DAPL_DBG_TYPE_EXTENSION," Not running on MIC, no MPXY connect required\n");
tp->scif_ep = 0;
return 0;
}
dapl_log(DAPL_DBG_TYPE_EXTENSION," Running on MIC, MPXY connect required\n");
-#endif
+
/* MPXYD is running on node 0 and well-known OFED port */
tp->peer.node = 0;
tp->peer.port = SCIF_OFED_PORT_8;
/* MIX_QP_CREATE */
-int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr)
+int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr,
+ ib_cq_handle_t req_cq, ib_cq_handle_t rcv_cq)
{
dat_mix_qp_t msg;
scif_epd_t mix_ep = m_qp->tp->scif_ep;
msg.qp_r.state = m_qp->qp->state;
msg.qp_r.max_recv_wr = attr->cap.max_recv_wr;
msg.qp_r.max_recv_sge = attr->cap.max_recv_sge;
- msg.qp_r.rcq_id = attr->recv_cq->handle; /* ??? */
+ msg.qp_r.max_send_wr = attr->cap.max_send_wr;
+ msg.qp_r.max_send_sge = attr->cap.max_send_sge;
+ msg.qp_r.rcq_id = rcv_cq->cq_id;
msg.qp_t.qp_type = attr->qp_type;
msg.qp_t.max_inline_data = attr->cap.max_inline_data;
msg.qp_t.max_send_wr = attr->cap.max_send_wr;
msg.qp_t.max_send_sge = attr->cap.max_send_sge;
- msg.qp_t.scq_id = attr->send_cq->handle; /* ??? */
+ msg.qp_t.max_recv_wr = attr->cap.max_recv_wr;
+ msg.qp_t.max_recv_sge = attr->cap.max_recv_sge;
+ msg.qp_t.scq_id = req_cq->cq_id;
len = sizeof(dat_mix_qp_t);
ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
}
dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %d request on SCIF EP\n", msg.hdr.op);
-
/* wait for response */
ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK);
if (ret != len) {
m_qp->sqp_id = msg.qp_t.qp_id;
m_qp->sqp_ctx = msg.qp_t.ctx;
- dapl_log(DAPL_DBG_TYPE_EXTENSION,
- " reply on SCIF EP -> sqp_id 0x%x, ctx %p\n",
+ dapl_log(DAPL_DBG_TYPE_EXTENSION, " MIX_QP_CREATE: reply, sqp_id 0x%x, ctx %p\n",
m_qp->sqp_id, (void*)m_qp->sqp_ctx );
return 0;
msg.hdr.status = 0;
msg.hdr.flags = MIX_OP_REQ;
msg.cq_len = m_cq->ib_cq->cqe;
+ msg.cq_ctx = (uint64_t)m_cq;
+ msg.cq_id = 0;
len = sizeof(dat_mix_cq_t);
ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
}
dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %d request on SCIF EP\n", msg.hdr.op);
-
/* wait for response */
ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK);
if (ret != len) {
return 0;
}
+/**** speed path, TODO optimize, optimize, optimize ****/
+/*
+ * Forward a post_send work request to the shadow QP on MPXYD over SCIF.
+ * Builds a MIX_SEND message (header + copy of the WR), streams the payload
+ * inline over SCIF for send opcodes (SGE descriptors are passed instead for
+ * RDMA-style opcodes), then blocks for the MIX_SEND response.
+ * Returns 0 on success, -1 on SCIF transfer or protocol error.
+ */
+int dapli_mix_post_send(ib_qp_handle_t m_qp, int txlen, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr)
+{
+	dat_mix_send_t msg; /* TODO cache-aligned msg pool instead of stack? */
+	scif_epd_t mix_ep = m_qp->tp->scif_ep; /* TODO maybe scif_dto_ep ? */
+	int ret, len, i;
+
+	/* fixed: format string expected sge and len args; mix_ep alone was passed */
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," MIX_SEND, ep=%d sge=%d len=%d\n",
+		 mix_ep, wr->num_sge, txlen);
+
+	/* POST SEND request, send recv for now, optimize later with pre-registered WR memory pool */
+	msg.hdr.ver = DAT_MIX_VER;
+	msg.hdr.op = MIX_SEND;
+	msg.hdr.status = 0;
+	msg.hdr.flags = MIX_OP_REQ;
+	msg.len = txlen;
+	msg.qp_id = m_qp->sqp_id;
+	msg.qp_ctx = m_qp->sqp_ctx;
+	memcpy(&msg.wr, wr, sizeof(*wr));
+
+	/* send opcodes carry the payload inline over the SCIF stream.
+	 * NOTE(review): original tested (wr->opcode & IBV_WR_SEND) — a bitwise
+	 * AND on enum values that also matched unrelated opcodes */
+	if (wr->opcode == IBV_WR_SEND || wr->opcode == IBV_WR_SEND_WITH_IMM)
+		msg.hdr.flags |= MIX_OP_INLINE;
+	else
+		memcpy(msg.sge, wr->sg_list, sizeof(struct ibv_sge) * wr->num_sge);
+
+	len = sizeof(dat_mix_send_t);
+	ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
+	if (ret != len) {
+		dapl_log(1, " ERR: send on %d, ret %d, exp %d\n", mix_ep, ret, len);
+		return -1;
+	}
+	/* inline case: stream each SGE's payload after the message */
+	for (i=0; i < wr->num_sge && (msg.hdr.flags & MIX_OP_INLINE); i++) {
+		dapl_log(1, " MSG_SEND sge[%d] addr %p, len %d\n",
+			 i, (void*)(uintptr_t)wr->sg_list[i].addr, wr->sg_list[i].length);
+		ret = scif_send(mix_ep, (void*)(uintptr_t)wr->sg_list[i].addr, wr->sg_list[i].length, SCIF_SEND_BLOCK);
+		if (ret != wr->sg_list[i].length)
+			dapl_log(1, " ERR: send on %d, ret %d, exp %d\n",
+				 mix_ep, ret, wr->sg_list[i].length);
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent MIX_SEND on SCIF EP %d, len=%d\n", mix_ep, txlen);
+
+	/* POST SEND response */
+	ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d\n", mix_ep, ret, len);
+		return -1;
+	}
+	if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_SEND ||
+	    msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) {
+		dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n",
+			 msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status);
+		return -1;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," received MIX_SEND reply on SCIF EP\n");
+
+	return 0;
+}
+
+
+
+
+
+/* MIX CM operations: use CM channel on SCIF */
+
/* MIX_CM_REQ */
-int dapli_mix_connect(dp_ib_cm_handle_t m_cm)
+int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp)
{
dat_mix_cm_t msg;
- scif_epd_t mix_ep = m_cm->tp->scif_ep;
+ scif_epd_t mix_ep = m_cm->tp->scif_cm_ep; /* use cm channel */
int ret, len;
/* request: QP_r local, QP_t shadowed */
msg.hdr.op = MIX_CM_REQ;
msg.hdr.status = 0;
msg.hdr.flags = MIX_OP_REQ;
+ msg.qp_id = m_qp->sqp_id;
+ msg.cm_id = m_cm->cm_id;
msg.cm_ctx = (uint64_t)m_cm;
+ memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t));
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," -> dport 0x%x, dqpn 0x%x dlid 0x%x\n",
+ ntohs(msg.msg.dport), ntohl(msg.msg.dqpn), ntohs(msg.msg.daddr.lid) );
-
- len = sizeof(dat_mix_cq_t);
+ len = sizeof(dat_mix_cm_t);
ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
if (ret != len) {
dapl_log(1, " ERR: send on %d, ret %d, exp %d\n", mix_ep, ret, len);
}
dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %d request on SCIF EP\n", msg.hdr.op);
-
/* wait for response */
ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK);
if (ret != len) {
return -1;
}
- /* save CQ_t id and ctx, needed for polling */
+ /* CM object linking: MIC to MPXYD */
m_cm->scm_id = msg.cm_id;
m_cm->scm_ctx = msg.cm_ctx;
- dapl_log(DAPL_DBG_TYPE_EXTENSION,
- " reply on SCIF EP -> cm_id 0x%x, ctx %p\n",
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," reply on SCIF EP -> cm_id 0x%x, ctx %p\n",
m_cm->scm_id, (void*)m_cm->scm_ctx );
return 0;
}
-/* MIX recv, messages from MPXYD */
-int dapli_mix_recv(DAPL_HCA *hca, int scif_ep)
+/* MIX_CM_RTU */
+/*
+ * Send a connect ready-to-use (RTU) for this CM object to MPXYD over the
+ * SCIF CM channel. Fire-and-forget: no reply is read here; any response
+ * arrives asynchronously via dapli_mix_recv().
+ * Returns 0 on success, -1 on a short scif_send.
+ */
+int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm)
{
+	dat_mix_cm_t msg;
+	scif_epd_t mix_ep = m_cm->tp->scif_cm_ep; /* use cm channel */
+	int ret, len;
+	/* connect RTU: QP_r local, QP_t shadowed */
+	msg.hdr.ver = DAT_MIX_VER;
+	msg.hdr.op = MIX_CM_RTU;
+	msg.hdr.status = 0;
+	msg.hdr.flags = MIX_OP_REQ;
+	msg.cm_id = m_cm->scm_id;
+	msg.cm_ctx = (uint64_t)m_cm;
+	/* carry the current CM message: the log below and MPXYD read msg.msg,
+	 * which was previously sent uninitialized off the stack */
+	memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t));
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," RTU -> id 0x%x dport 0x%x, dqpn 0x%x dlid 0x%x\n",
+		 msg.cm_id, ntohs(msg.msg.dport), ntohl(msg.msg.dqpn), ntohs(msg.msg.daddr.lid) );
+
+	len = sizeof(dat_mix_cm_t);
+	ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
+	if (ret != len) {
+		dapl_log(1, " ERR: send on %d, ret %d, exp %d\n", mix_ep, ret, len);
+		return -1;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %d request on SCIF EP\n", msg.hdr.op);
return 0;
}
+/* locate CM object by context, address of object for now--- TODO change to ID */
+dp_ib_cm_handle_t dapli_mix_get_cm(ib_hca_transport_t *tp, uint64_t cm_ctx)
+{
+	dp_ib_cm_handle_t entry = NULL;
+
+	/* linear walk of the transport's CM list under lock; the context
+	 * echoed by MPXYD is simply the object's own address */
+	dapl_os_lock(&tp->lock);
+	if (!dapl_llist_is_empty(&tp->list))
+		entry = dapl_llist_peek_head(&tp->list);
+
+	while (entry != NULL && entry != (dp_ib_cm_handle_t)cm_ctx)
+		entry = dapl_llist_next_entry(&tp->list, &entry->local_entry);
+
+	dapl_os_unlock(&tp->lock);
+
+	return entry;
+}
+
+/*
+ * dapli_mix_cm_event_in - process an unsolicited MIX_CM_EVENT from MPXYD.
+ *
+ * The fixed header was already consumed by dapli_mix_recv(); read the
+ * remaining event payload from the SCIF endpoint, map the echoed context
+ * back to the local CM object, and dispatch the DAT event.
+ * Returns 0 on success, the short scif_recv count or -1 on failure.
+ */
+int dapli_mix_cm_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_event_t *pmsg)
+{
+	int len, ret;
+	dp_ib_cm_handle_t cm;
+
+	/* hdr already read, get operation data */
+	len = sizeof(dat_mix_cm_event_t) - sizeof(dat_mix_hdr_t);
+	ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d\n", ret, len);
+		return ret;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " MIX_CM_EVENT <-: id %d ctx %p event 0x%x\n",
+		 pmsg->cm_id, pmsg->cm_ctx, pmsg->event);
+
+	/* Find the CM and EP for event processing */
+	cm = dapli_mix_get_cm(tp, pmsg->cm_ctx);
+	if (!cm) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx);
+		return -1;
+	}
+
+	switch (pmsg->event) {
+	case DAT_CONNECTION_EVENT_TIMED_OUT:
+		/* passive side (cm->sp set) reports local failure via the CR
+		 * callback; active side reports unreachable to the EP's EVD */
+		if (cm->sp)
+			dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE, NULL, 0, cm->sp);
+		else
+			dapl_evd_connection_callback(cm, IB_CME_DESTINATION_UNREACHABLE, NULL, 0, cm->ep);
+
+		break;
+
+	case DAT_CONNECTION_EVENT_ESTABLISHED:
+	case DAT_CONNECTION_REQUEST_EVENT:
+	case DAT_DTO_COMPLETION_EVENT:
+	case DAT_CONNECTION_EVENT_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_NON_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_ACCEPT_COMPLETION_ERROR:
+	case DAT_CONNECTION_EVENT_DISCONNECTED:
+	case DAT_CONNECTION_EVENT_BROKEN:
+	case DAT_CONNECTION_EVENT_UNREACHABLE:
+
+	/* NOTE(review): the events above fall through to default and are
+	 * silently ignored — presumably still TODO; confirm before relying
+	 * on MPXYD-driven delivery of these events */
+	default:
+		break;
+	}
+
+
+	return 0;
+}
+
+/*
+ * dapli_mix_dto_event_in - process an unsolicited MIX_DTO_EVENT from MPXYD.
+ *
+ * The fixed header was already consumed by dapli_mix_recv(); read the rest
+ * of the DTO event, recover the proxy CQ from the echoed cq_ctx (the
+ * struct dcm_ib_cq pointer handed to MPXYD at MIX_CQ_CREATE time), and
+ * post the work completion to the CQ's EVD.
+ * Returns 0 on success, the short scif_recv count on failure.
+ */
+int dapli_mix_dto_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_dto_event_t *pmsg)
+{
+	int len, ret;
+	struct dcm_ib_cq *m_cq;
+
+	/* hdr already read, get operation data */
+	len = sizeof(dat_mix_dto_event_t) - sizeof(dat_mix_hdr_t);
+	ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d\n", ret, len);
+		return ret;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " MIX_DTO_EVENT <-: id %d ctx %p \n", pmsg->cq_id, pmsg->cq_ctx);
+
+	/* Get cq and post DTO event with this WC entry; explicit cast needed,
+	 * cq_ctx travels on the wire as a uint64_t */
+	m_cq = (struct dcm_ib_cq *)(uintptr_t)pmsg->cq_ctx;
+
+	dapls_evd_cqe_to_event(m_cq->evd, &pmsg->wc);
+
+	return 0;
+}
+
+/*
+ * dapli_mix_cm_rep_in - process a MIX_CM_REQ/REP message from MPXYD.
+ *
+ * The fixed header was already consumed by dapli_mix_recv(); pull in the
+ * remainder of the CM message, map the echoed context back to the local
+ * CM object, and drive the RTU exchange.
+ * Returns 0 on success, the short scif_recv count or -1 on failure.
+ */
+int dapli_mix_cm_rep_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t *pmsg)
+{
+	dp_ib_cm_handle_t cm;
+	char *payload = (char *)pmsg + sizeof(dat_mix_hdr_t);
+	int remain = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
+	int rcvd;
+
+	/* header was consumed by the caller; read the rest of the message */
+	rcvd = scif_recv(scif_ep, payload, remain, SCIF_RECV_BLOCK);
+	if (rcvd != remain) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d\n", rcvd, remain);
+		return rcvd;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " MIX_CM_REP_IN <-: id %d ctx %p \n", pmsg->cm_id, pmsg->cm_ctx);
+
+	/* map the MPXYD-echoed context back to our CM object */
+	cm = dapli_mix_get_cm(tp, pmsg->cm_ctx);
+	if (!cm) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx);
+		return -1;
+	}
+
+	mcm_connect_rtu(cm, &pmsg->msg);
+
+	return 0;
+}
+
+/*
+ * MIX recv, unsolicited messages from MPXYD, operations and CM messages.
+ * Called when either the operation (scif_ep) or CM (scif_cm_ep) channel
+ * polls readable; the scif_ep argument will be set accordingly.
+ * Reads and validates the fixed header, then dispatches to a per-op
+ * reader which consumes the rest of the message from the stream.
+ * Returns the handler's result, or -1 on header/unknown-op errors.
+ */
+int dapli_mix_recv(DAPL_HCA *hca, int scif_ep)
+{
+	char cmd[DAT_MIX_MSG_MAX];
+	dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd;
+	ib_hca_transport_t *tp = &hca->ib_trans;
+	int ret, len;
+
+	/* unsolicited MPXYD messages always carry MIX_OP_REQ and current ver */
+	len = sizeof(dat_mix_hdr_t);
+	ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK);
+	if ((ret != len) || (phdr->ver != DAT_MIX_VER) || phdr->flags != MIX_OP_REQ) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d flgs=%d\n",
+			 scif_ep, ret, len, phdr->ver, phdr->flags);
+		return -1;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION, " ver %d, op %d, flags %d\n", phdr->ver, phdr->op, phdr->flags);
+
+	switch (phdr->op) {
+	case MIX_DTO_EVENT:
+		ret = dapli_mix_dto_event_in(tp, scif_ep, (dat_mix_dto_event_t*)phdr);
+		break;
+	case MIX_CM_EVENT:
+		ret = dapli_mix_cm_event_in(tp, scif_ep, (dat_mix_cm_event_t*)phdr);
+		break;
+
+	/* NOTE(review): REQ shares the REP reader — confirm this is intended
+	 * for passive-side request delivery */
+	case MIX_CM_REQ:
+	case MIX_CM_REP:
+		ret = dapli_mix_cm_rep_in(tp, scif_ep, (dat_mix_cm_t*)phdr);
+		break;
+
+	/* NOTE(review): these ops break without draining any message body
+	 * from the SCIF stream; if they carry payload the channel would
+	 * desynchronize — presumably still TODO */
+	case MIX_CM_ACCEPT:
+	case MIX_CM_REJECT:
+	case MIX_CM_RTU:
+	case MIX_CM_EST:
+	case MIX_CM_DISC:
+	case MIX_CM_DREP:
+		break;
+	default:
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERROR!!! unknown MIX operation: %d\n", phdr->op);
+		return -1;
+	}
+
+	return ret;
+}
+
#include <getopt.h>
#include <fcntl.h>
#include <scif.h>
+#include <arpa/inet.h>
#include <infiniband/verbs.h>
#include "dat2/udat.h"
#include "dat2/dat_mic_extensions.h"
static short scif_sport = SCIF_OFED_PORT_8;
static scif_epd_t scif_listen_ep;
static struct scif_portID scif_id;
+static char gid_str[INET6_ADDRSTRLEN];
/* scif-rdma cmd and data channel parameters */
static int mix_buffer_mb = 4;
static int mcm_depth = 500;
static int mcm_size = 256;
static int mcm_signal = 100;
-static int mcm_retry = 10;
+static int mcm_retry = 3;
static int mcm_rep_ms = 800;
static int mcm_rtu_ms = 400;
static LLIST_ENTRY mcm_list;
static pthread_mutex_t mcm_llock;
-typedef enum mcm_state
-{
- MCM_INIT,
- MCM_LISTEN,
- MCM_CONN_PENDING,
- MCM_REP_PENDING,
- MCM_ACCEPTING,
- MCM_ACCEPTING_DATA,
- MCM_ACCEPTED,
- MCM_REJECTING,
- MCM_REJECTED,
- MCM_CONNECTED,
- MCM_RELEASE,
- MCM_DISC_PENDING,
- MCM_DISCONNECTED,
- MCM_DESTROY,
- MCM_RTU_PENDING,
- MCM_DISC_RECV,
- MCM_FREE,
-
-} MCM_STATE;
-
/* Support for IB devices - One service per device: UD QP for fabric CM services */
typedef struct mcm_ib_dev {
LLIST_ENTRY entry;
LLIST_ENTRY smd_list; /* MIC client open instances */
pthread_mutex_t slock; /* SCIF client device lock */
pthread_mutex_t plock; /* port space lock */
+ pthread_mutex_t txlock; /* MCM UD tx lock */
/* MCM - IB Device Resources */
struct ibv_device *ibdev;
struct ibv_context *ibctx;
struct ibv_cq *ib_cq;
struct ibv_comp_channel *ib_ch;
uint32_t cq_len;
+ uint32_t cq_id; /* MIC client */
+ uint64_t cq_ctx; /* MIC client */
} mcm_cq_t;
struct mcm_scif_dev *smd; /* mcm_scif_dev parent reference */
struct mcm_cm *l_ep; /* listen reference, passive */
uint16_t sid; /* service ID for endpoint */
+ uint32_t cm_id; /* id of client, QPr */
+ uint64_t cm_ctx; /* ctx of client, QPr */
uint64_t timer;
int ref_count;
int state;
int retries;
- struct ibv_comp_channel *ib_ch;
- struct ibv_pd *pd;
- struct ibv_cq *scq;
- struct ibv_cq *rcq;
struct mcm_qp *m_qp; /* pair of QP's, qp_t and qp_r */
uint16_t p_size; /* accept p_data, for retries */
uint8_t p_data[DAT_MCM_PDATA_SIZE];
} mcm_scif_dev_t;
+/* forward prototypes */
+static int mcm_cm_req_out(mcm_cm_t *m_cm);
+static int mcm_cm_rtu_out(mcm_cm_t *m_cm);
+
+/* Wall-clock time in microseconds; used for CM retry/reply timers. */
+static inline uint64_t mcm_time_us(void)
+{
+	struct timeval curtime;
+
+	/* gettimeofday fills both fields; the former timerclear was redundant */
+	gettimeofday(&curtime, NULL);
+	return (uint64_t) curtime.tv_sec * 1000000 + (uint64_t) curtime.tv_usec;
+}
+#define mcm_time_ms() (mcm_time_us() / 1000)
+
#define mlog(level, format, ...) \
mpxy_write(level, "%s: "format, __func__, ## __VA_ARGS__)
return p_port[port];
}
-/* operation, state strings */
-static char * mcm_op_str(IN int op)
-{
- static char *ops[] = {
- "INVALID",
- "REQ",
- "REP",
- "REJ_USER",
- "REJ_CM",
- "RTU",
- "DREQ",
- "DREP",
- };
- return ((op < 1 || op > 7) ? "Invalid OP?" : ops[op]);
-}
-
-static char * mcm_state_str(IN int st)
-{
- static char *state[] = {
- "CM_INIT",
- "CM_LISTEN",
- "CM_CONN_PENDING",
- "CM_REP_PENDING",
- "CM_ACCEPTING",
- "CM_ACCEPTING_DATA",
- "CM_ACCEPTED",
- "CM_REJECTING",
- "CM_REJECTED",
- "CM_CONNECTED",
- "CM_RELEASE",
- "CM_DISC_PENDING",
- "CM_DISCONNECTED",
- "CM_DESTROY",
- "CM_RTU_PENDING",
- "CM_DISC_RECV",
- "CM_FREE"
- };
- return ((st < 0 || st > 16) ? "Invalid CM state?" : state[st]);
-}
+
static FILE *mpxy_open_log(void)
qp_attr.ah_attr.src_path_bits = 0;
qp_attr.ah_attr.port_num = md->port;
- mlog(2, "create_ah: port %x lid %x pd %p ctx %p handle 0x%x\n",
+ mlog(1, "create_ah: port %x lid %x pd %p ctx %p handle 0x%x\n",
md->port, qp_attr.ah_attr.dlid, pd, pd->context, pd->handle);
/* UD: create AH for remote side */
return NULL;
}
- mlog(2, "create_ah: AH %p for lid %x\n", ah, qp_attr.ah_attr.dlid);
+ mlog(1, "create_ah: AH %p for lid %x\n", ah, qp_attr.ah_attr.dlid);
return ah;
}
switch (qp_state) {
case IBV_QPS_RTR:
- mlog(1, " QPS_RTR: type %d qpn 0x%x gid %p (%d) lid 0x%x"
+ mlog(1, " QPS_RTR: l_qpn %x type %d qpn 0x%x gid %p (%d) lid 0x%x"
" port %d ep %p qp_state %d \n",
- qp_handle->qp_type, ntohl(qpn), gid,
+ qp_handle->qp_num, qp_handle->qp_type,
+ ntohl(qpn), gid,
m_qp->smd->md->dev_attr.global,
ntohs(lid), m_qp->smd->md->port,
m_qp, m_qp->qp_t.cur_state);
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
- IBV_QP_MIN_RNR_TIMER;
+ IBV_QP_MIN_RNR_TIMER |
+ IBV_QP_MAX_DEST_RD_ATOMIC;
qp_attr.dest_qp_num = ntohl(qpn);
qp_attr.rq_psn = 1;
qp_attr.path_mtu = m_qp->smd->md->dev_attr.mtu;
- qp_attr.max_dest_rd_atomic = 0;
+ qp_attr.max_dest_rd_atomic = 4;
qp_attr.min_rnr_timer = m_qp->smd->md->dev_attr.rnr_timer;
-
- /* address handle. RC and UD */
qp_attr.ah_attr.dlid = ntohs(lid);
qp_attr.ah_attr.sl = m_qp->smd->md->dev_attr.sl;
qp_attr.ah_attr.src_path_bits = 0;
mask |= IBV_QP_SQ_PSN |
IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
- IBV_QP_RNR_RETRY;
+ IBV_QP_RNR_RETRY |
+ IBV_QP_MAX_QP_RD_ATOMIC;
qp_attr.sq_psn = 1;
qp_attr.timeout = m_qp->smd->md->dev_attr.ack_timer;
qp_attr.retry_cnt = m_qp->smd->md->dev_attr.ack_retry;
qp_attr.rnr_retry = m_qp->smd->md->dev_attr.rnr_retry;
+ qp_attr.max_rd_atomic = 4;
mlog(1, " QPS_RTS: psn %x rd_atomic %d ack %d "
- " retry %d rnr_retry %d m_qp %p qp_state %d\n",
+ " retry %d rnr_retry %d qpn %x qp_state %d\n",
qp_attr.sq_psn, qp_attr.max_rd_atomic,
qp_attr.timeout, qp_attr.retry_cnt,
- qp_attr.rnr_retry, m_qp, m_qp->qp_t.cur_state);
+ qp_attr.rnr_retry, m_qp->ib_qp->qp_num,
+ m_qp->qp_t.cur_state);
break;
case IBV_QPS_INIT:
qp_attr.qp_access_flags =
IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE |
- IBV_ACCESS_REMOTE_READ;
+ IBV_ACCESS_REMOTE_READ |
+ IBV_ACCESS_REMOTE_ATOMIC |
+ IBV_ACCESS_MW_BIND;
qp_attr.pkey_index = m_qp->smd->md->dev_attr.pkey_idx;
qp_attr.port_num = m_qp->smd->md->port;
+ qp_attr.qkey = 0;
mlog(1, " QPS_INIT: pi %x port %x acc %x qkey 0x%x\n",
qp_attr.pkey_index, qp_attr.port_num,
m_qp->qp_t.cur_state = m_qp->qp_t.state = qp_state;
return 0;
} else {
- mlog(0, " RTR ERR (%s): type %d qpn 0x%x lid 0x%x"
+ mlog(0, " ERR (%s): l_qpn %x type %d qpn 0x%x lid 0x%x"
" port %d state %d mtu %d rd %d rnr %d sl %d\n",
- strerror(ret), qp_handle->qp_type, ntohl(qpn),
- ntohs(lid), m_qp->smd->md->port,
- m_qp->qp_t.cur_state,
- qp_attr.path_mtu, qp_attr.max_dest_rd_atomic,
- qp_attr.min_rnr_timer, qp_attr.ah_attr.sl);
+ strerror(ret), qp_handle->qp_num, qp_handle->qp_type,
+ ntohl(qpn), ntohs(lid), m_qp->smd->md->port,
+ m_qp->qp_t.cur_state, qp_attr.path_mtu,
+ qp_attr.max_dest_rd_atomic, qp_attr.min_rnr_timer,
+ qp_attr.ah_attr.sl);
}
return ret;
}
if (pthread_mutex_init(&cm->lock, NULL))
goto bail;
+ cm->state = MCM_INIT;
cm->smd = smd;
+ cm->md = smd->md;
cm->msg.ver = htons(DAT_MCM_VER);
- cm->msg.sqpn = htonl(smd->md->qp->qp_num); /* ucm */
+ cm->msg.sqpn = smd->md->addr.qpn; /* ucm, in network order */
/* ACTIVE: init source address QP info from MPXYD and MIC client */
if (m_qp) {
}
cm->m_qp = m_qp;
- /* MPXYD src IB info in network order, QP snd */
- cm->msg.saddr.qpn = htonl(cm->m_qp->qp_t.qp_num); /* ep */
- cm->msg.saddr.qp_type = cm->m_qp->qp_t.qp_type;
- cm->msg.saddr.lid = smd->md->addr.lid;
+ /* MPXYD src IB info in network order, QPs = saddr2 */
+ cm->msg.saddr2.qpn = htonl(m_qp->ib_qp->qp_num); /* ep */
+ cm->msg.saddr2.qp_type = m_qp->qp_t.qp_type;
+ cm->msg.saddr2.lid = smd->md->addr.lid;
memcpy(&cm->msg.saddr.gid[0], &smd->md->addr.gid, 16);
- /* MIC src IB info in network order, QP rcv */
- cm->msg.saddr2.qpn = htonl(cm->m_qp->qp_r.qp_num); /* ep */
- cm->msg.saddr2.qp_type = cm->m_qp->qp_r.qp_type;
- cm->msg.saddr2.lid = smd->md->addr.lid;
- memcpy(&cm->msg.saddr2.gid[0], &smd->md->addr.gid, 16);
+ /* MIC src IB info in network order, QPr = saddr */
+ cm->msg.saddr.qpn = htonl(cm->m_qp->qp_r.qp_num); /* ep */
+ cm->msg.saddr.qp_type = cm->m_qp->qp_r.qp_type;
+ cm->msg.saddr.lid = smd->md->addr.lid;
+ memcpy(&cm->msg.saddr.gid[0], &smd->md->addr.gid, 16);
+ mlog(1, " SRC: QPt qpn 0x%x, lid 0x%x, QPr qpn 0x%x,"
+ " lid 0x%x UCM: qpn 0x%x port=0x%x\n",
+ cm->m_qp->qp_t.qp_num, ntohs(cm->msg.saddr2.lid),
+ cm->m_qp->qp_r.qp_num, ntohs(cm->msg.saddr.lid),
+ ntohl(cm->msg.sqpn), ntohs(cm->msg.sport));
}
return cm;
bail:
pthread_mutex_unlock(&smd->llock);
}
+
/*
* Open IB device
*/
static int init_mcm_service(mcm_ib_dev_t *md)
{
struct ibv_qp_init_attr qp_create;
+ struct ibv_port_attr port_attr;
struct ibv_recv_wr recv_wr, *recv_err;
struct ibv_sge sge;
int i, mlen = 256; /* overhead for mcm_msg & ibv_grh */
md->cqe = mcm_depth;
md->signal = mcm_signal;
+ /* Save addr information */
+ /* get lid for this hca-port, convert to network order */
+ if (ibv_query_port(md->ibctx, md->port, &port_attr)) {
+ mlog(0, " get lid ERR for %s, err=%s\n",
+ ibv_get_device_name(md->ibdev),
+ strerror(errno));
+ goto bail;
+ } else
+ md->lid = md->addr.lid = htons(port_attr.lid);
+
+ /* get gid for this hca-port, in network order */
+ if (ibv_query_gid(md->ibctx, md->port, 0, (union ibv_gid *)&md->addr.gid)) {
+ mlog(1, " query GID ERR for %s, err=%s\n",
+ ibv_get_device_name(md->ibdev),
+ strerror(errno));
+ goto bail;
+ }
+
+ mlog(1, " IB LID 0x%x GID %s\n", ntohs(md->addr.lid),
+ inet_ntop(AF_INET6, md->addr.gid, gid_str, sizeof(gid_str)));
+
/* setup CM timers and queue sizes */
md->pd = ibv_alloc_pd(md->ibctx);
if (!md->pd)
if (!md->qp)
goto bail;
- mlog(1, " created QP\n");
+ mlog(1, " created QP, qp_num = 0x%x\n", md->qp->qp_num);
+
+ /* local addr info in network order */
+ md->addr.port = htons(md->port);
+ md->addr.qpn = htonl(md->qp->qp_num);
+ md->addr.qp_type = md->qp->qp_type;
md->ah = (struct ibv_ah **) malloc(sizeof(struct ibv_ah *) * 0xffff);
md->ports = (uint64_t*) malloc(sizeof(uint64_t) * 0xffff);
init_list(&md->smd_list);
pthread_mutex_init(&md->slock, NULL);
pthread_mutex_init(&md->plock, NULL);
+ pthread_mutex_init(&md->txlock, NULL);
strcpy(md->name, name);
md->port = port;
md->ibctx = open_ib_device(name, port);
return qp;
}
+/* locate CM object */
+mcm_cm_t *mix_get_cm(mcm_scif_dev_t *smd, uint32_t tid)
+{
+	mcm_cm_t *pos;
+
+	/* linear search of the SCIF device's CM list, keyed by entry tid */
+	pthread_mutex_lock(&smd->clock);
+	for (pos = get_head_entry(&smd->clist); pos != NULL;
+	     pos = get_next_entry(&pos->entry, &smd->clist)) {
+		if (pos->entry.tid == tid)
+			break;
+	}
+	pthread_mutex_unlock(&smd->clock);
+	return pos;
+}
+
/* destroy proxy CQ, fits in header */
static int mix_cq_destroy(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg)
{
memset(m_cq, 0, sizeof(mcm_cq_t));
init_list(&m_cq->entry);
m_cq->smd = smd;
+ m_cq->cq_ctx = pmsg->cq_ctx;
+ m_cq->cq_id = pmsg->cq_id;
m_cq->ib_ch = ibv_create_comp_channel(smd->md->ibctx);
if (!m_cq->ib_ch)
pthread_mutex_lock(&smd->cqlock);
insert_tail(&m_cq->entry, &smd->cqlist, m_cq);
pmsg->cq_id = m_cq->entry.tid;
+ pmsg->cq_ctx = (uint64_t)m_cq;
pthread_mutex_unlock(&smd->cqlock);
- mlog(1, " new cq_id %d\n", pmsg->cq_id);
+ mlog(1, " new cq_id %d, ctx=%p\n", pmsg->cq_id, pmsg->cq_ctx);
pmsg->hdr.status = MIX_SUCCESS;
goto resp;
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
- mlog(1, " MIX_QP_CREATE: QP_r - qpn 0x%x, id 0x%x, s_q %d,%d r_q %d,%d inline=%d cq_id %d\n",
- pmsg->qp_r.qp_num, pmsg->qp_r.qp_id, pmsg->qp_t.max_send_wr,
- pmsg->qp_t.max_send_sge, pmsg->qp_r.max_recv_wr, pmsg->qp_r.max_recv_sge,
- pmsg->qp_r.max_inline_data, pmsg->qp_t.scq_id);
+ mlog(1, " QP_r - qpn 0x%x, ctx %p, rq %d,%d sq %d,%d rcq_id %d\n",
+ pmsg->qp_r.qp_num, pmsg->qp_r.ctx, pmsg->qp_r.max_recv_wr,
+ pmsg->qp_r.max_recv_sge, pmsg->qp_r.max_send_wr,
+ pmsg->qp_r.max_send_sge, pmsg->qp_r.rcq_id);
- /* Create QP object */
+ /* Create QP object, save QPr info from MIC client */
m_qp = malloc(sizeof(mcm_qp_t));
if (!m_qp)
goto err;
init_list(&m_qp->entry);
m_qp->smd = smd;
memcpy(&m_qp->qp_r, &pmsg->qp_r, sizeof(dat_mix_qp_attr_t));
- memcpy(&m_qp->qp_t, &pmsg->qp_t, sizeof(dat_mix_qp_attr_t));
/* Find the CQ's for this QP for transmitting */
m_cq = mix_get_cq(smd, pmsg->qp_t.scq_id);
goto err;
}
+ mlog(1, " QP_t - wr %d sge %d inline %d\n",
+ pmsg->qp_t.max_send_wr, pmsg->qp_t.max_send_sge,
+ pmsg->qp_t.max_inline_data);
+
/* Setup attributes and create qp, for TX services */
memset((void *)&qp_create, 0, sizeof(qp_create));
qp_create.recv_cq = m_cq->ib_cq;
qp_create.cap.max_recv_wr = 1;
- qp_create.cap.max_recv_sge = 0;
+ qp_create.cap.max_recv_sge = 1;
qp_create.send_cq = m_cq->ib_cq;
qp_create.cap.max_send_wr = pmsg->qp_t.max_send_wr;
qp_create.cap.max_send_sge = pmsg->qp_t.max_send_sge;
goto err;
}
- /* insert on qp list, update proxy qp object tid */
+ /* init QPt with ib qp info */
+ m_qp->qp_t.ctx = (uint64_t)m_qp;
+ m_qp->qp_t.qp_type = m_qp->ib_qp->qp_type;
+ m_qp->qp_t.qp_num = m_qp->ib_qp->qp_num;
+ m_qp->qp_t.state = m_qp->ib_qp->state;
+ m_qp->qp_t.max_recv_wr = qp_create.cap.max_recv_wr;
+ m_qp->qp_t.max_recv_sge = qp_create.cap.max_recv_sge;
+ m_qp->qp_t.max_send_wr = qp_create.cap.max_send_wr;
+ m_qp->qp_t.max_send_sge = qp_create.cap.max_send_sge;
+ m_qp->qp_t.max_inline_data = qp_create.cap.max_inline_data;
+
+ /* return QPt info to MIC client, insert on QP list */
+ memcpy(&pmsg->qp_t, &m_qp->qp_t, sizeof(dat_mix_qp_attr_t));
+
pthread_mutex_lock(&smd->qplock);
insert_tail(&m_qp->entry, &smd->qplist, m_qp);
- pmsg->qp_t.qp_id = m_qp->entry.tid;
- pmsg->qp_t.ctx = (uint64_t)m_qp;
pthread_mutex_unlock(&smd->qplock);
+ /* id set with list insertion */
+ pmsg->qp_t.qp_id = m_qp->qp_t.qp_id = m_qp->entry.tid;
pmsg->hdr.status = MIX_SUCCESS;
+
+ mlog(1, " QP_t - qpn %x, q_id %d, sq %d,%d rq %d,%d scq_id %d\n",
+ pmsg->qp_t.qp_num, m_qp->entry.tid, pmsg->qp_t.max_send_wr,
+ pmsg->qp_t.max_send_sge, pmsg->qp_t.max_recv_wr,
+ pmsg->qp_t.max_recv_sge, m_cq->entry.tid);
+
goto resp;
err:
mlog(0, " ERR: %s\n", strerror(errno));
mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", smd->scif_ep, ret, len);
return ret;
}
- mlog(0, " MIX_QP_CREATE: QP_t - qpn 0x%x id 0x%x, ctx %p \n", m_qp->ib_qp->qp_num, pmsg->qp_t.qp_id, m_qp);
+ return 0;
+}
+
+/* Forward IB work completions to the MIC client as MIX_DTO_EVENT
+ * messages over the operation SCIF endpoint, one message per wc entry.
+ * wc: completions from ibv_poll_cq; nc: number of valid entries.
+ * On a short/failed scif_send the remaining completions are dropped
+ * (NOTE(review): DTO events are lost silently in that case). */
+static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc)
+{
+ dat_mix_dto_event_t msg;
+ int ret, len, i;
+
+ /* send DTO events to MIC client; header and CQ identity are constant
+ * across the batch, only the wc payload changes per message */
+ msg.hdr.ver = DAT_MIX_VER;
+ msg.hdr.op = MIX_DTO_EVENT;
+ msg.hdr.flags = MIX_OP_REQ;
+ msg.cq_id = m_cq->cq_id;
+ msg.cq_ctx = m_cq->cq_ctx;
+
+ for (i=0; i < nc; i++) {
+ memcpy(&msg.wc, &wc[i], sizeof(*wc));
+ len = sizeof(dat_mix_dto_event_t);
+ ret = scif_send(m_cq->smd->scif_ep, &msg, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n",
+ m_cq->smd->scif_ep, ret, len);
+ return;
+ }
+ /* NOTE(review): level-0 log on every DTO event is hot-path noisy */
+ mlog(0, " MIX_DTO_EVENT: cq %p id %d ctx %p stat %d op %d ln %d wr_id %p\n",
+ m_cq, msg.cq_id, msg.cq_ctx, msg.wc.status, msg.wc.opcode,
+ msg.wc.byte_len, msg.wc.wr_id);
+ }
+}
+
+/* Deliver a connection event (e.g. DAT_CONNECTION_EVENT_TIMED_OUT) to the
+ * MIC client over the CM SCIF endpoint as a MIX_CM_EVENT message.
+ * Returns 0 on success, -1 on a short/failed scif_send. */
+static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event)
+{
+ dat_mix_cm_event_t msg;
+ int ret, len;
+
+ /* send event to MIC client */
+ msg.hdr.ver = DAT_MIX_VER;
+ msg.hdr.op = MIX_CM_EVENT;
+ msg.hdr.flags = MIX_OP_REQ;
+ msg.cm_id = m_cm->cm_id;
+ msg.cm_ctx = m_cm->cm_ctx;
+ msg.event = event;
+
+ len = sizeof(dat_mix_cm_event_t);
+ ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? assume small msgs are sent without waiting on receiver */
+ if (ret != len) {
+ mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len);
+ return -1;
+ }
+ mlog(0, " MIX_CM_EVENT: cm %p cm_id %d, ctx %p, event 0x%x\n", m_cm, msg.cm_id, msg.cm_ctx, event);
+ return 0;
+}
+
+/* New outbound connection request from the MIC client: read the rest of
+ * the MIX_CM_REQ message from the CM SCIF endpoint, create a proxy CM
+ * object linked to the client's QP, send the MCM_REQ on the wire, and
+ * reply to the client with the MPXYD-side id/ctx.
+ * Returns the short scif recv/send count on transport error, else 0. */
+static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+{
+ int len, ret;
+ struct mcm_qp *m_qp;
+ struct mcm_cm *m_cm = NULL; /* must start NULL: err path frees/logs it */
+
+ /* hdr already read, get operation data */
+ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
+ ret = scif_recv(smd->scif_cm_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: ret %d, exp %d\n", ret, len);
+ return ret;
+ }
+ mlog(1, " MIX_CM_REQ_OUT: cm_id %d, cm_ctx %p, qp_id %d \n",
+ pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id);
+
+ /* Find the QP for linking */
+ m_qp = mix_get_qp(smd, pmsg->qp_id);
+ if (!m_qp) {
+ mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
+ goto err;
+ }
+
+ /* Create CM object, init saddr info for QPt and QPr */
+ m_cm = mcm_cm_create(smd, m_qp);
+ if (!m_cm)
+ goto err;
+
+ /* MIC client CM id, ctx, daddr = remote CM QP,
+ * saddr = IB QPr from MIC, setup with cm_create
+ * saddr2 = IB QPt from MPXYD, setup with cm_create
+ */
+ m_cm->cm_id = pmsg->cm_id;
+ m_cm->cm_ctx = pmsg->cm_ctx;
+ m_cm->msg.dqpn = pmsg->msg.dqpn;
+ m_cm->msg.dport = pmsg->msg.dport;
+ m_cm->msg.p_size = pmsg->msg.p_size;
+ if (m_cm->msg.p_size)
+ memcpy(m_cm->msg.p_data, pmsg->msg.p_data, ntohs(m_cm->msg.p_size)); /* p_size is network order */
+ memcpy(&m_cm->msg.daddr, &pmsg->msg.daddr, sizeof(dat_mcm_addr_t));
+
+ mlog(1," QPs 0x%x QPr 0x%x -> dport 0x%x, dqpn 0x%x dlid 0x%x psize %d\n",
+ m_cm->m_qp->qp_t.qp_num, m_cm->m_qp->qp_r.qp_num,
+ ntohs(m_cm->msg.dport), ntohl(m_cm->msg.dqpn),
+ ntohs(m_cm->msg.daddr.lid), ntohs(m_cm->msg.p_size));
+
+ /* send request on wire */
+ if (mcm_cm_req_out(m_cm))
+ goto err;
+
+ /* insert on cm list, update proxy CM object tid */
+ mcm_qconn(smd, m_cm);
+ pmsg->cm_id = m_cm->entry.tid;
+ pmsg->cm_ctx = (uint64_t)m_cm;
+ pmsg->hdr.status = MIX_SUCCESS;
+ goto resp;
+err:
+ mlog(0, " ERR: %s\n", strerror(errno)); /* NOTE(review): errno may be stale here */
+ if (m_cm)
+ free(m_cm);
+ m_cm = NULL; /* freed: must not be dereferenced below */
+
+ pmsg->hdr.status = MIX_EINVAL;
+resp:
+ /* send back response */
+ pmsg->hdr.flags = MIX_OP_RSP;
+ len = sizeof(dat_mix_cm_t);
+ ret = scif_send(smd->scif_cm_ep, pmsg, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", smd->scif_cm_ep, ret, len);
+ return ret;
+ }
+ if (m_cm)
+ mlog(0, " MIX_CM_REQ_OUT: MPXYD id 0x%x, ctx %p - MIC id 0x%x, ctx %p\n",
+ pmsg->cm_id, pmsg->cm_ctx, m_cm->cm_id, m_cm->cm_ctx);
+
+ return 0;
+}
+
+/* Active side: MIC client acknowledged the reply — read the rest of the
+ * MIX_CM_RTU message from the CM SCIF endpoint, look up the proxy CM by
+ * the client-supplied cm_id, and push the RTU out on the wire.
+ * Returns the short recv count on transport error, -1 if the CM is not
+ * found, else 0 (mcm_cm_rtu_out failures are not propagated — it logs
+ * and the retry timer path handles recovery). */
+static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+{
+ int len, ret;
+ struct mcm_cm *m_cm;
+
+ /* hdr already read, get operation data */
+ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
+ ret = scif_recv(smd->scif_cm_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: ret %d, exp %d\n", ret, len);
+ return ret;
+ }
+ mlog(1, " MIX_CM_RTU_OUT: cm_id %d, cm_ctx %p, qp_id %d \n",
+ pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id);
+
+ /* Find the CM for this connection */
+ m_cm = mix_get_cm(smd, pmsg->cm_id);
+ if (!m_cm) {
+ mlog(0, " ERR: mix_get_cm, id %d, not found\n", pmsg->cm_id);
+ return -1;
+ }
+
+ mlog(1," QPs 0x%x QPr 0x%x -> dport 0x%x, dqpn 0x%x dlid 0x%x psize %d\n",
+ m_cm->m_qp->qp_t.qp_num, m_cm->m_qp->qp_r.qp_num,
+ ntohs(m_cm->msg.dport), ntohl(m_cm->msg.dqpn),
+ ntohs(m_cm->msg.daddr.lid), ntohs(m_cm->msg.p_size));
+
+ /* send RTU on wire */
+ mcm_cm_rtu_out(m_cm);
+
+ return 0;
+}
+
+/* Active side: an MCM_REP arrived from the wire.  Save the peer's id and
+ * address info on the proxy CM, move the proxy QPt to RTR then RTS toward
+ * the peer, and forward the raw reply to the MIC client so it can move
+ * its own QP and issue the RTU.  pkt/pkt_len: the received dat_mcm_msg_t.
+ * Returns 0 on success, -1 on modify_qp or scif_send failure.
+ * NOTE(review): on the err path the MIC client is never notified of the
+ * failure — confirm the retry/timeout path covers this. */
+static int mix_cm_reply_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len)
+{
+ dat_mix_cm_t msg;
+ int ret, len;
+
+ mlog(0, " cm_id %d, ctx %p\n", m_cm->cm_id, m_cm->cm_ctx);
+
+ /* Forward, as is, conn_reply message to MIC client, with remote QP info */
+ msg.hdr.ver = DAT_MIX_VER;
+ msg.hdr.flags = MIX_OP_REQ;
+ msg.hdr.op = MIX_CM_REP;
+ msg.cm_id = m_cm->cm_id;
+ msg.cm_ctx = m_cm->cm_ctx;
+ memcpy(&msg.msg, pkt, pkt_len);
+
+ /* save dst id, daddr info, private data, in MPXYD object */
+ m_cm->msg.d_id = pkt->s_id;
+ memcpy(&m_cm->msg.daddr, &pkt->saddr, sizeof(dat_mcm_addr_t));
+ memcpy(&m_cm->msg.daddr2, &pkt->saddr2, sizeof(dat_mcm_addr_t));
+ memcpy(m_cm->msg.p_data, &pkt->p_data, ntohs(pkt->p_size));
+
+ /* modify_qp RTR,RTS: QPt (ib_qp) saddr2, to QPr (daddr) */
+ ret = modify_qp(m_cm->m_qp->ib_qp, IBV_QPS_RTR,
+ m_cm->msg.daddr.qpn, m_cm->msg.daddr.lid,
+ (union ibv_gid *)m_cm->msg.daddr.gid);
+ if (ret)
+ goto err;
+
+ ret = modify_qp(m_cm->m_qp->ib_qp, IBV_QPS_RTS,
+ m_cm->msg.daddr.qpn, m_cm->msg.daddr.lid, NULL);
+ if (ret)
+ goto err;
+
+ len = sizeof(dat_mix_cm_t);
+ ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len);
+ return -1;
+ }
+ mlog(0, " success cm_id %d\n", msg.cm_id);
+ /* state change only after the client was informed */
+ pthread_mutex_lock(&m_cm->lock);
+ m_cm->state = MCM_REP_RCV;
+ pthread_mutex_unlock(&m_cm->lock);
+ return 0;
+err:
+ mlog(0, " ERR %s: my_id %d, mic_id %d, %p\n",
+ strerror(errno), m_cm->entry.tid, m_cm->cm_id, m_cm->cm_ctx);
+ return -1;
+}
+
+/* Notify the MIC client of an inbound disconnect with a header-only
+ * MIX_CM_DISC message (req_id carries the client's cm_id).
+ * Returns 0 on success, -1 on a short/failed scif_send. */
+static int mix_cm_disc_in(mcm_cm_t *m_cm)
+{
+ dat_mix_hdr_t msg;
+ int ret, len;
+
+ /* send disconnect to MIC client */
+ msg.ver = DAT_MIX_VER;
+ msg.flags = MIX_OP_REQ;
+ msg.op = MIX_CM_DISC;
+ msg.req_id = m_cm->cm_id;
+
+ len = sizeof(dat_mix_hdr_t);
+ ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? assume sent without waiting on receiver */
+ if (ret != len) {
+ mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len);
+ return -1;
+ }
+ mlog(0, " MIX_CM_DISC_IN: cm_id %d\n", msg.req_id);
+ return 0;
+}
+
+/* MIC client posted a send: read the MIX_SEND operation data plus the
+ * payload from the op SCIF endpoint, stage the payload in the proxy
+ * bounce buffer (smd->m_buf / smd->m_mr), and post it on the proxy QPt.
+ * Replies to the client with MIX_SUCCESS/MIX_EINVAL (header only). */
+static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+{
+ int len, ret;
+ struct mcm_qp *m_qp;
+ struct ibv_send_wr *bad_wr;
+ struct ibv_sge sge;
+
+ /* hdr already read, get operation data */
+ len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
+ ret = scif_recv(smd->scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: ret %d, exp %d\n", ret, len);
+ goto err;
+ }
+ mlog(1, " MIX_POST_SEND: q_id %d, q_ctx %p, len %d\n",
+ pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len);
+
+ m_qp = (struct mcm_qp*)pmsg->qp_ctx; /* trust me !!! NOTE(review): unvalidated client pointer */
+
+ /* XXX quick test, need to copy in msg_pool and adjust offset for pipelining */
+ len = pmsg->len;
+ ret = scif_recv(smd->scif_ep, smd->m_buf, len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: ret %d, exp %d\n", ret, len);
+ goto err;
+ }
+
+ /* stack sge is safe: ibv_post_send does not retain the WR after return */
+ pmsg->wr.sg_list = &sge;
+ sge.addr = (uint64_t)smd->m_mr->addr;
+ sge.lkey = smd->m_mr->lkey;
+ sge.length = len;
+ ret = ibv_post_send(m_qp->ib_qp, &pmsg->wr, &bad_wr);
+ if (ret) {
+ mlog(0, " ERR: ret %d, exp %d\n", ret, len);
+ goto err;
+ }
+ pmsg->hdr.status = MIX_SUCCESS;
+ goto resp;
+err:
+ mlog(0, " ERR: %s\n", strerror(errno));
+ pmsg->hdr.status = MIX_EINVAL;
+resp:
+ /* send back immediate errors */
+ pmsg->hdr.flags = MIX_OP_RSP;
+ len = sizeof(dat_mix_hdr_t);
+ ret = scif_send(smd->scif_ep, pmsg, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: send on scif_ep %d, ret %d, exp %d\n", smd->scif_ep, ret, len);
+ return ret;
+ }
+
+ /* if post fails return error */
+ mlog(1, " MIX_POST_SEND: success q_id %d, q_ctx %p, len %d\n",
+ pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len);
+
return 0;
}
/* receive MIX operations on connected SCIF endpoint */
-static int mix_scif_recv(mcm_scif_dev_t *smd)
+static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep)
{
char cmd[DAT_MIX_MSG_MAX];
dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd;
int ret, len;
len = sizeof(*phdr);
- ret = scif_recv(smd->scif_ep, phdr, len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK);
if ((ret != len) || (phdr->ver != DAT_MIX_VER)) {
mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n",
smd->scif_ep, ret, len, phdr->ver);
return -1;
}
-
mlog(0, " ver %d, op %d, flags %d\n", phdr->ver, phdr->op, phdr->flags);
switch (phdr->op) {
case MIX_CQ_FREE:
ret = mix_cq_destroy(smd, phdr);
break;
- case MIX_WRITE:
case MIX_SEND:
+ ret = mix_post_send(smd, (dat_mix_send_t *)phdr);
+ break;
case MIX_LISTEN:
ret = mix_listen(smd, (dat_mix_listen_t *)phdr);
break;
ret = mix_listen_free(smd, phdr);
break;
case MIX_CM_REQ:
-
+ ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr);
+ break;
case MIX_CM_REP:
case MIX_CM_ACCEPT:
case MIX_CM_REJECT:
case MIX_CM_RTU:
+ ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr);
+ break;
case MIX_CM_EST:
case MIX_CM_DISC:
- case MIX_CM_REPLY:
+ case MIX_CM_DREP:
default:
mlog(0, " ERROR!!! unknown MIX operation: %d\n", phdr->op);
return -1;
return ret;
}
-/* receive MIX CM messages on connected SCIF endpoint */
-static int mix_scif_recv_cm(mcm_scif_dev_t *smd)
-{
- char cmd[DAT_MIX_MSG_MAX];
- dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd;
- int ret, len;
-
- len = sizeof(*phdr);
- ret = scif_recv(smd->scif_ep, phdr, len, SCIF_RECV_BLOCK);
- if ((ret != len) || (phdr->ver != DAT_MIX_VER)) {
- mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n",
- smd->scif_ep, ret, len, phdr->ver);
- return -1;
- }
-
- mlog(0, " ver %d, op %d, flags %d\n", phdr->ver, phdr->op, phdr->flags);
-
- switch (phdr->op) {
- case MIX_CM_REQ:
-
- default:
- mlog(0, " ERROR!!! unknown MIX CM message: %d\n", phdr->op);
- return -1;
- }
-
- return ret;
-}
-
-
/*
*
* Fabric side MCM messages, IB UD QP
}
ibv_ack_async_event(&event);
}
+ /* no need to send event to MIX client, it has same IB device opened */
}
/* Get CM UD message from send queue, called with s_lock held */
uint16_t dlid = ntohs(msg->daddr.lid);
/* Get message from send queue, copy data, and send */
- pthread_mutex_lock(&md->slock);
+ pthread_mutex_lock(&md->txlock);
if ((smsg = mcm_get_smsg(md)) == NULL) {
mlog(0, " mcm_send ERR: get_smsg(hd=%d,tl=%d) \n", md->s_hd, md->s_tl);
goto bail;
sge.lkey = md->mr_sbuf->lkey;
sge.addr = (uintptr_t)smsg;
- mlog(2," mcm_send: op %s ln %d lid %x c_qpn %x rport %x\n",
- mcm_op_str(ntohs(smsg->op)),
- sge.length, htons(smsg->daddr.lid),
- htonl(smsg->dqpn), htons(smsg->dport));
+ mlog(1," mcm_send: op %s ln %d lid %x c_qpn %x rport %x, p_size %d\n",
+ mcm_op_str(ntohs(smsg->op)), sge.length, ntohs(smsg->daddr.lid),
+ ntohl(smsg->dqpn), ntohs(smsg->dport), p_size);
/* empty slot, then create AH */
if (!md->ah[dlid]) {
- md->ah[dlid] =
- mcm_create_ah(md, md->pd, md->qp, dlid, NULL);
- if (!md->ah[dlid])
+ md->ah[dlid] = mcm_create_ah(md, md->pd, md->qp, dlid, NULL);
+ if (!md->ah[dlid]) {
+ mlog(0, " ERR: create_ah %s\n", strerror(errno));
goto bail;
+ }
}
wr.wr.ud.ah = md->ah[dlid];
wr.wr.ud.remote_qkey = DAT_MCM_UD_QKEY;
ret = ibv_post_send(md->qp, &wr, &bad_wr);
- if (ret)
- mlog(0, " mcm_send ERR: post_send() %s\n", strerror(errno));
bail:
- pthread_mutex_unlock(&md->slock);
+ if (ret)
+ mlog(0, " ERR: ibv_post_send() %s\n", strerror(errno));
+
+ pthread_mutex_unlock(&md->txlock);
return ret;
}
return (ibv_post_recv(md->qp, &recv_wr, &recv_err));
}
-static int mcm_reject(mcm_ib_dev_t *md, dat_mcm_msg_t *msg)
+static int mcm_cm_rej_out(mcm_ib_dev_t *md, dat_mcm_msg_t *msg)
{
dat_mcm_msg_t smsg;
return (mcm_send(md, &smsg, NULL, 0));
}
-static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm)
+/* Passive side: (re)send the MCM_REP on the wire while an RTU is pending.
+ * Valid only in MCM_RTU_PENDING; on retry exhaustion delivers a
+ * DAT_CONNECTION_EVENT_TIMED_OUT to the MIC client.  Returns 0 on send,
+ * -1 on wrong state, exhaustion, or send failure.  cm->retries is
+ * advanced by the timer path (mcm_check_timers), not here. */
+static int mcm_cm_rep_out(mcm_cm_t *cm)
+{
+ pthread_mutex_lock(&cm->lock);
+ if (cm->state != MCM_RTU_PENDING) {
+ mlog(1, " CM_REPLY: wrong state qp %p cm %p %s refs=%d"
+ " %x %x i_%x -> %x %x i_%x l_pid %x r_pid %x\n",
+ cm->m_qp, cm, mcm_state_str(cm->state),
+ cm->ref_count,
+ htons(cm->msg.saddr.lid),
+ htons(cm->msg.sport),
+ htonl(cm->msg.saddr.qpn),
+ htons(cm->msg.daddr.lid),
+ htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn),
+ ntohl(cm->msg.s_id),
+ ntohl(cm->msg.d_id));
+ pthread_mutex_unlock(&cm->lock);
+ return -1;
+ }
+
+ if (cm->retries == cm->md->retries) {
+ mlog(0, " CM_REPLY: RETRIES EXHAUSTED (lid port qpn)"
+ " %x %x %x -> %x %x %x\n",
+ htons(cm->msg.saddr.lid),
+ htons(cm->msg.sport),
+ htonl(cm->msg.saddr.qpn),
+ htons(cm->msg.daddr.lid),
+ htons(cm->msg.dport),
+ htonl(cm->msg.daddr.qpn));
+
+ /* event delivered outside cm->lock to avoid lock-order issues */
+ pthread_mutex_unlock(&cm->lock);
+ mix_cm_event(cm, DAT_CONNECTION_EVENT_TIMED_OUT);
+ return -1;
+ }
+
+ cm->timer = mcm_time_us(); /* RTU expected */
+ if (mcm_send(cm->md, &cm->msg, cm->p_data, cm->p_size)) {
+ mlog(0," accept ERR: mcm reply send()\n");
+ pthread_mutex_unlock(&cm->lock);
+ return -1;
+ }
+ pthread_mutex_unlock(&cm->lock);
+ return 0;
+}
+
+
+static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm, int len)
{
pthread_mutex_lock(&cm->lock);
switch (cm->state) {
//mcm_accept_rtu(cm, msg);
break;
case MCM_REP_PENDING: /* active */
+ mlog(1, "REP_PENDING: cm %p, my_id %d, mic_id %d\n", cm, cm->entry.tid, cm->cm_id);
+ pthread_mutex_unlock(&cm->lock);
+ mix_cm_reply_in(cm, msg, len);
+ break;
+ case MCM_REP_RCV: /* active */
+ if (ntohs(msg->op) == MCM_REP)
+ mlog(1, "REP_RCV: DUPLICATE cm %p, my_id %d, mic_id %d\n", cm, cm->entry.tid, cm->cm_id);
pthread_mutex_unlock(&cm->lock);
- //mcm_connect_rtu(cm, msg);
break;
case MCM_CONNECTED: /* active and passive */
/* DREQ, change state and process */
pthread_mutex_t *lock;
int listenq = 0;
+ mlog(1, " <- rmsg: op %s [lid, port, cqp, iqp, iqp2, pid]: "
+ "%x %x %x %x %x <- %x %x %x %x %x l_pid %x r_pid %x\n",
+ mcm_op_str(ntohs(msg->op)), ntohs(msg->daddr.lid), ntohs(msg->dport),
+ ntohl(msg->dqpn), ntohl(msg->daddr.qpn), ntohl(msg->daddr2.qpn),
+ ntohl(msg->d_id), ntohs(msg->saddr.lid), ntohs(msg->sport), ntohl(msg->sqpn),
+ ntohl(msg->saddr.qpn), ntohl(msg->saddr2.qpn), ntohl(msg->s_id),
+ ntohl(msg->d_id));
+
/* conn list first, duplicate requests for MCM_REQ */
list = &smd->clist;
lock = &smd->clock;
if (cm->state == MCM_DESTROY || cm->state == MCM_FREE)
continue;
+ mlog(1, " CM %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n",
+ mcm_state_str(cm->state),
+ ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+ ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id),
+ ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id));
+
/* CM sPORT + QPN, match is good enough for listenq */
if (listenq &&
cm->msg.sport == msg->dport &&
ntohs(msg->saddr.lid), ntohs(msg->sport),
ntohl(msg->saddr.qpn));
- mcm_reject(smd->md, msg);
+ mcm_cm_rej_out(smd->md, msg);
}
if (!found) {
ntohl(msg->d_id));
if (ntohs(msg->op) == MCM_DREP) {
- /* DREP_DUP */
+ /* DREP_DUP, counters */
}
}
int i, ret, notify = 0;
struct ibv_cq *ibv_cq = NULL;
-
/* POLLIN on channel FD */
ret = ibv_get_cq_event(md->rch, &ibv_cq, (void *)&md);
if (ret == 0) {
for (i = 0; i < ret; i++) {
msg = (dat_mcm_msg_t*) (uintptr_t) wc[i].wr_id;
- mlog(2, " mcm_recv: stat=%d op=%s ln=%d id=%p sqp=%x\n",
+ mlog(1, " mcm_recv: stat=%d op=%s ln=%d id=%p sqp=%x\n",
wc[i].status, mcm_op_str(ntohs(msg->op)),
wc[i].byte_len,
(void*)wc[i].wr_id, wc[i].src_qp);
}
/* match, process it */
- mcm_process_recv(md, msg, cm);
+ mcm_process_recv(md, msg, cm, wc[i].byte_len);
mcm_post_rmsg(md, msg);
}
goto retry;
}
+/* Active side: (re)send the MCM_REQ on the wire.  Valid in MCM_INIT or
+ * MCM_REP_PENDING; transitions to MCM_REP_PENDING and resets the reply
+ * timer.  On retry exhaustion delivers DAT_CONNECTION_EVENT_TIMED_OUT
+ * to the MIC client and moves the CM to MCM_FREE.  Returns 0 on send,
+ * -1 otherwise.  m_cm->lock is released on every exit path. */
+static int mcm_cm_req_out(mcm_cm_t *m_cm)
+{
+
+ mlog(1, " %s 0x%x %x 0x%x -> 0x%x %x 0x%x\n",
+ mcm_state_str(m_cm->state),
+ htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn),
+ htons(m_cm->msg.sport), htons(m_cm->msg.daddr.lid),
+ htonl(m_cm->msg.dqpn), htons(m_cm->msg.dport));
+
+ pthread_mutex_lock(&m_cm->lock);
+ if (m_cm->state != MCM_INIT && m_cm->state != MCM_REP_PENDING)
+ goto bail;
+
+ if (m_cm->retries == m_cm->md->retries) {
+ mlog(0, " CM_REQ: RETRIES EXHAUSTED: 0x%x %x 0x%x -> 0x%x %x 0x%x\n",
+ htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn), htons(m_cm->msg.sport),
+ htons(m_cm->msg.daddr.lid), htonl(m_cm->msg.dqpn), htons(m_cm->msg.dport));
+
+ mix_cm_event(m_cm, DAT_CONNECTION_EVENT_TIMED_OUT);
+ m_cm->state = MCM_FREE;
+ goto bail;
+ }
+
+ mlog(1, " m_cm %p, state = %d, retries =%d\n", m_cm, m_cm->state,m_cm->md->retries);
+
+ m_cm->state = MCM_REP_PENDING;
+ m_cm->msg.op = htons(MCM_REQ);
+ m_cm->timer = mcm_time_us(); /* reset reply timer */
+
+ if (mcm_send(m_cm->md, &m_cm->msg, &m_cm->msg.p_data, ntohs(m_cm->msg.p_size)))
+ goto bail; /* was: return -1 with m_cm->lock still held */
+
+ pthread_mutex_unlock(&m_cm->lock);
+ return 0;
+bail:
+ /* send CM event */
+ pthread_mutex_unlock(&m_cm->lock);
+ return -1;
+}
+
+/* Active side: send the MCM_RTU on the wire.  Valid only in MCM_REP_RCV;
+ * transitions to MCM_CONNECTED before sending.  Returns 0 on send,
+ * -1 otherwise.  m_cm->lock is released on every exit path. */
+static int mcm_cm_rtu_out(mcm_cm_t *m_cm)
+{
+ mlog(1, " %s 0x%x %x 0x%x -> 0x%x %x 0x%x\n",
+ mcm_state_str(m_cm->state),
+ htons(m_cm->msg.saddr.lid), htonl(m_cm->msg.saddr.qpn),
+ htons(m_cm->msg.sport), htons(m_cm->msg.daddr.lid),
+ htonl(m_cm->msg.dqpn), htons(m_cm->msg.dport));
+
+ pthread_mutex_lock(&m_cm->lock);
+ if (m_cm->state != MCM_REP_RCV) {
+ mlog(0, " state %s wrong, s/be REP_RCV\n", mcm_state_str(m_cm->state));
+ goto bail;
+ }
+
+ m_cm->state = MCM_CONNECTED;
+ m_cm->msg.op = htons(MCM_RTU);
+ m_cm->timer = mcm_time_us(); /* reset reply timer */
+
+ if (mcm_send(m_cm->md, &m_cm->msg, NULL, 0))
+ goto bail; /* was: return -1 with m_cm->lock still held */
+
+ pthread_mutex_unlock(&m_cm->lock);
+ return 0;
+bail:
+ /* send CM event */
+ pthread_mutex_unlock(&m_cm->lock);
+ return -1;
+}
+
+
+/* Walk the SMD connection list and drive CM retries/timeouts.  Called
+ * from the server loop with the SMD device lock held; takes smd->clock
+ * for the walk and cm->lock per entry.  *timer is set to the device CM
+ * poll interval whenever any connection is in a pending state.
+ * NOTE(review): cm->lock is dropped and re-taken around the outbound
+ * calls; the entry is assumed to stay on smd->clist across that window —
+ * confirm nothing removes it concurrently. */
+static void mcm_check_timers(mcm_scif_dev_t *smd, int *timer)
+{
+ uint64_t time;
+ mcm_cm_t *cm;
+
+ pthread_mutex_lock(&smd->clock);
+ cm = get_head_entry(&smd->clist);
+ while (cm) {
+ pthread_mutex_lock(&cm->lock);
+ time = mcm_time_us();
+ switch (cm->state) {
+ case MCM_REP_PENDING:
+ *timer = cm->md->cm_timer;
+ /* wait longer each retry */
+ if ((time - cm->timer)/1000 > (cm->md->rep_time << cm->retries)) {
+ mlog(1, " CM_REQ retry %p %d [lid, port, cqp, iqp]:"
+ " %x %x %x %x -> %x %x %x %x Time(ms) %d > %d\n",
+ cm, cm->retries+1,
+ ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+ ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn),
+ ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn),
+ (time - cm->timer)/1000,
+ cm->md->rep_time << cm->retries);
+ cm->retries++;
+ /* re-send takes cm->lock itself: drop it first */
+ pthread_mutex_unlock(&cm->lock);
+ mcm_cm_req_out(cm);
+ pthread_mutex_lock(&cm->lock);
+ break;
+ }
+ break;
+ case MCM_RTU_PENDING:
+ *timer = cm->md->cm_timer;
+ if ((time - cm->timer)/1000 > (cm->md->rtu_time << cm->retries)) {
+ mlog(1, " CM_REPLY retry %d %s [lid, port, cqp, iqp]:"
+ " %x %x %x %x -> %x %x %x %x r_pid %x Time(ms) %d > %d\n",
+ cm->retries+1,
+ mcm_op_str(ntohs(cm->msg.op)),
+ ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+ ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn),
+ ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn),
+ ntohl(cm->msg.d_id),
+ (time - cm->timer)/1000,
+ cm->md->rtu_time << cm->retries);
+ cm->retries++;
+ pthread_mutex_unlock(&cm->lock);
+ mcm_cm_rep_out(cm);
+ pthread_mutex_lock(&cm->lock);
+ break;
+ }
+ break;
+ case MCM_DISC_PENDING:
+ *timer = cm->md->cm_timer;
+ /* wait longer each retry */
+ if ((time - cm->timer)/1000 > (cm->md->rtu_time << cm->retries)) {
+ mlog(1, " CM_DREQ retry %d [lid, port, cqp, iqp]:"
+ " %x %x %x %x -> %x %x %x %x r_pid %x Time(ms) %d > %d\n",
+ cm->retries+1,
+ ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+ ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn),
+ ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+ ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn),
+ ntohl(cm->msg.d_id),
+ (time - cm->timer)/1000,
+ cm->md->rtu_time << cm->retries);
+ cm->retries++;
+ pthread_mutex_unlock(&cm->lock);
+ mix_cm_disc_in(cm);
+ pthread_mutex_lock(&cm->lock);
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+ pthread_mutex_unlock(&cm->lock);
+ cm = get_next_entry(&cm->entry, &smd->clist);
+ }
+ pthread_mutex_unlock(&smd->clock);
+}
+
+/* Drain a proxy CQ after its completion channel FD went readable:
+ * ack the channel event, poll completions in batches of 10 and forward
+ * each batch to the MIC client via mix_dto_event().  When the CQ is
+ * empty, re-arm notification once and re-poll to close the race between
+ * poll and re-arm, then return.
+ * NOTE(review): ibv_get_cq_event / ibv_req_notify_cq return codes are
+ * not otherwise checked here. */
+void mcm_cq_event(struct mcm_cq *m_cq)
+{
+ struct ibv_cq *ib_cq;
+ void *cq_ctx;
+ int ret, notify = 0;
+ struct ibv_wc wc[10];
+
+ mlog(1," m_cq(%p) \n", m_cq);
+
+ ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx);
+ mlog(1," m_cq %p == cq_ctx %p \n", m_cq, cq_ctx);
+ if (ret == 0) {
+ ibv_ack_cq_events(m_cq->ib_ch, 1);
+ }
+retry:
+ ret = ibv_poll_cq(m_cq->ib_cq, 10, wc);
+ mlog(1," completions = %d \n", ret);
+ if (ret <= 0) {
+ if (!ret && !notify) {
+ ibv_req_notify_cq(m_cq->ib_cq, 0);
+ notify = 1;
+ goto retry; /* re-poll after arming to avoid missed events */
+ }
+ return;
+ } else
+ notify = 0;
+
+ mix_dto_event(m_cq, wc, ret);
+ goto retry;
+
+}
+
+
/*
* MPXY server will listen on both a IB UD QP for fabric CM messages
* and a SCIF port for inter-bus MCM operation messages to/from MIC MCM clients.
* one thread for both SCIF and IB traffic, try FD select for now
* and move to polling memory if we can't get pipelining at wire speeds.
*
- * TBD: need another thread dedicated as a data mover
+ * TODO: need another thread dedicated as a data mover ??
*
*/
static void mpxy_server(void)
struct mcm_fd_set *set;
struct mcm_ib_dev *md;
struct mcm_scif_dev *smd, *next;
+ struct mcm_cq *m_cq;
int time_ms, ret;
/* FD array */
/* SCIF listen EP, MIC client open requests */
mcm_fd_set(scif_listen_ep, set, POLLIN);
- /* trigger on all active IB devices */
+ /* all active IB devices */
pthread_mutex_lock(&mcm_llock);
md = get_head_entry(&mcm_list);
while (md) {
mcm_fd_set(md->ibctx->async_fd, set, POLLIN);
mcm_fd_set(md->rch->fd, set, POLLIN);
- /* trigger on all active SCIF ep's */
+ /* all active SCIF MIC clients */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd) {
+ pthread_mutex_lock(&smd->cqlock);
+ m_cq = get_head_entry(&smd->cqlist);
+ while (m_cq) {
+ mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN);
+ m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
+ }
+ pthread_mutex_unlock(&smd->cqlock);
mcm_fd_set(smd->scif_ep, set, POLLIN);
mcm_fd_set(smd->scif_cm_ep, set, POLLIN);
+ mcm_check_timers(smd, &time_ms);
smd = get_next_entry(&smd->entry, &md->smd_list);
}
pthread_mutex_unlock(&md->slock);
pthread_mutex_lock(&mcm_llock);
md = get_head_entry(&mcm_list);
while (md) {
- /* process MCM events: async device and CM msgs */
+ /* MCM IB events: async device and CM msgs */
if (mcm_poll(md->rch->fd, POLLIN) == POLLIN)
mcm_ib_recv(md);
if (mcm_poll(md->ibctx->async_fd, POLLIN) == POLLIN)
mcm_ib_async_event(md);
- /* process SCIF operation and CM channels */
+ /* SCIF MIC client ops and conn messages */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd) {
+ pthread_mutex_lock(&smd->cqlock);
+ m_cq = get_head_entry(&smd->cqlist);
+ while (m_cq) {
+ ret = mcm_poll(m_cq->ib_ch->fd, POLLIN);
+ if (ret == POLLIN)
+ mcm_cq_event(m_cq);
+ m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
+ }
+ pthread_mutex_unlock(&smd->cqlock);
ret = mcm_poll(smd->scif_ep, POLLIN); /* OP */
if (ret == POLLIN)
- ret = mix_scif_recv(smd);
-
- ret = mcm_poll(smd->scif_cm_ep, POLLIN); /* CM */
- if (ret == POLLIN)
- ret = mix_scif_recv_cm(smd);
-
+ ret = mix_scif_recv(smd, smd->scif_ep);
+ if (!ret) {
+ ret = mcm_poll(smd->scif_cm_ep, POLLIN); /* CM */
+ if (ret == POLLIN)
+ ret = mix_scif_recv(smd, smd->scif_cm_ep);
+ }
next = get_next_entry(&smd->entry, &md->smd_list);
if (ret)
mix_close_device(md, smd);
-
smd = next;
}
pthread_mutex_unlock(&md->slock);
typedef enum dat_mcm_op
{
- MCM_REQ = 1,
+ MCM_INVALID,
+ MCM_REQ,
MCM_REP,
MCM_REJ_USER, /* user reject */
MCM_REJ_CM, /* cm reject */
} DAT_MCM_OP;
+/* Map a DAT_MCM_OP value to its printable name for logging.
+ * Bound is derived from the table so new ops cannot silently read past
+ * it; index 0 now correctly reports "INVALID" (MCM_INVALID exists). */
+static inline char * mcm_op_str(IN int op)
+{
+ static char *ops[] = {
+ "INVALID",
+ "REQ",
+ "REP",
+ "REJ_USER",
+ "REJ_CM",
+ "RTU",
+ "DREQ",
+ "DREP",
+ };
+ return ((op < 0 || op >= (int)(sizeof(ops)/sizeof(ops[0]))) ? "Invalid OP?" : ops[op]);
+}
+
+/* Proxy CM connection states; must stay index-aligned with the
+ * state[] name table in mcm_state_str() below. */
+typedef enum dat_mcm_state
+{
+ MCM_INIT,
+ MCM_LISTEN,
+ MCM_CONN_PENDING,
+ MCM_REP_PENDING,
+ MCM_REP_RCV,
+ MCM_ACCEPTING,
+ MCM_ACCEPTING_DATA,
+ MCM_ACCEPTED,
+ MCM_REJECTING,
+ MCM_REJECTED,
+ MCM_CONNECTED,
+ MCM_RELEASE,
+ MCM_DISC_PENDING,
+ MCM_DISCONNECTED,
+ MCM_DESTROY,
+ MCM_RTU_PENDING,
+ MCM_DISC_RECV,
+ MCM_FREE,
+
+} DAT_MCM_STATE;
+
+/* Map a DAT_MCM_STATE value to its printable name for logging.
+ * Bound is derived from the table: the previous hard-coded `st > 16`
+ * wrongly rejected MCM_FREE (index 17) even though "FREE" is in the
+ * 18-entry table. */
+static inline char * mcm_state_str(IN int st)
+{
+ static char *state[] = {
+ "INIT",
+ "LISTEN",
+ "CONN_PENDING",
+ "REP_PENDING",
+ "REP_RECV",
+ "ACCEPTING",
+ "ACCEPTING_DATA",
+ "ACCEPTED",
+ "REJECTING",
+ "REJECTED",
+ "CONNECTED",
+ "RELEASE",
+ "DISC_PENDING",
+ "DISCONNECTED",
+ "DESTROY",
+ "RTU_PENDING",
+ "DISC_RECV",
+ "FREE"
+ };
+ return ((st < 0 || st >= (int)(sizeof(state)/sizeof(state[0]))) ? "Invalid CM state?" : state[st]);
+}
+
/* MCM address, 28 bytes */
typedef struct dat_mcm_addr
{
uint32_t s_id; /* src pid */
uint32_t d_id; /* dst pid */
uint8_t rd_in; /* atomic_rd_in */
- uint8_t resv[5]; /* 2 connections for MCM endpoints */
- dat_mcm_addr_t saddr; /* 1st RC - local MPXY QP -> */
- dat_mcm_addr_t daddr; /* <- remote MIC QP */
- dat_mcm_addr_t saddr2; /* 2nd RC - local MIC QP -> */
- dat_mcm_addr_t daddr2; /* <- remote MPXY QP */
+ uint8_t resv[5];/* Shadow QP's, 2 connections */
+ dat_mcm_addr_t saddr; /* QPt local, MPXY or MCM on non-MIC node */
+ dat_mcm_addr_t saddr2; /* QPr local, MIC or MCM on non-MIC node */
+ dat_mcm_addr_t daddr; /* QPt remote, MPXY or MCM on non-MIC node */
+ dat_mcm_addr_t daddr2; /* QPr remote, MIC or MCM on non-MIC node */
uint8_t p_data[DAT_MCM_PDATA_SIZE];
} dat_mcm_msg_t;
typedef enum dat_mix_ops
{
- MIX_IA_OPEN = 1,
+ MIX_IA_OPEN = 2,
MIX_IA_CLOSE,
MIX_LISTEN,
MIX_LISTEN_FREE,
MIX_CM_RTU,
MIX_CM_EST,
MIX_CM_DISC,
- MIX_CM_REPLY,
- MIX_WRITE,
+ MIX_CM_DREP,
+ MIX_CM_EVENT,
+ MIX_DTO_EVENT,
MIX_SEND,
} dat_mix_ops_t;
typedef enum dat_mix_op_flags
{
- MIX_OP_REQ = 0x00,
- MIX_OP_RSP = 0x01,
- MIX_OP_SYNC = 0x02,
- MIX_OP_ASYNC = 0x04,
+ MIX_OP_REQ = 0x01,
+ MIX_OP_RSP = 0x02,
+ MIX_OP_SYNC = 0x04,
+ MIX_OP_ASYNC = 0x08,
+ MIX_OP_INLINE = 0x10,
+ /* flags are now distinct bits: REQ was previously 0x00 and could not
+ * be tested with a bitwise AND like the other flags */
} dat_mix_op_flags_t;
dat_mix_hdr_t hdr;
uint64_t cm_ctx;
uint32_t cm_id;
+ uint32_t qp_id;
dat_mcm_msg_t msg;
} dat_mix_cm_t;
+/* MIX_CM_EVENT payload: connection event forwarded to the MIC client;
+ * ids/ctx echo the values exchanged at CM/QP create time. */
+typedef struct dat_mix_cm_event
+{
+ dat_mix_hdr_t hdr;
+ uint64_t cm_ctx; /* client-side CM context */
+ uint64_t qp_ctx; /* client-side QP context */
+ uint32_t cm_id;
+ uint32_t qp_id;
+ uint32_t event; /* DAT connection event code */
+
+} dat_mix_cm_event_t;
+
+/* MIX_DTO_EVENT payload: one IB work completion forwarded per message */
+typedef struct dat_mix_dto_event
+{
+ dat_mix_hdr_t hdr;
+ uint64_t cq_ctx; /* client-side CQ context */
+ uint32_t cq_id;
+ struct ibv_wc wc; /* raw work completion, copied as-is */
+
+} dat_mix_dto_event_t;
+
+#define DAT_MIX_SGE_MAX 7
+/* MIX_SEND payload: client WR plus inline sge storage; payload bytes of
+ * length `len` follow on the SCIF stream */
+typedef struct dat_mix_send
+{
+ dat_mix_hdr_t hdr;
+ uint32_t qp_id;
+ uint32_t len; /* payload byte count that follows */
+ uint64_t qp_ctx;
+ struct ibv_send_wr wr;
+ struct ibv_sge sge[DAT_MIX_SGE_MAX];
+
+} dat_mix_send_t;
+
#endif /* _DAT_MIC_EXTENSIONS_H_ */
#define CNO_TIMEOUT (1000*1000*1)
#define DTO_FLUSH_TIMEOUT (1000*1000*2)
#define CONN_TIMEOUT (1000*1000*100)
-#define SERVER_TIMEOUT 10000000
+#define SERVER_TIMEOUT DAT_TIMEOUT_INFINITE
#define RDMA_BUFFER_SIZE (64)
/* Global DAT vars */
ret = send_msg(&rmr_send_msg,
sizeof(DAT_RMR_TRIPLET),
lmr_context_send_msg,
- cookie, DAT_COMPLETION_SUPPRESS_FLAG);
+ cookie, DAT_COMPLETION_DEFAULT_FLAG);
if (ret != DAT_SUCCESS) {
fprintf(stderr, "%d Error send_msg: %s\n",
} else
LOGPRINTF("%d send_msg completed\n", getpid());
+ if (collect_event(h_dto_req_evd,
+ &event,
+ DTO_TIMEOUT,
+ &poll_count) != DAT_SUCCESS)
+ return (DAT_ABORT);
+
+ printf("%d send event completed, waiting for received message \n", getpid());
+
+
/*
* Wait for remote RMR information for RDMA
*/