From 80676fc554ab7c9b68625a6424755ec15c0dc807 Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Wed, 11 Jul 2012 09:33:19 -0700 Subject: [PATCH] Sends/receives working, Xeon to MIC --- dapl/openib_common/cq.c | 24 ++++ dapl/openib_common/dapl_ib_common.h | 5 + dapl/openib_common/qp.c | 4 + dapl/openib_mcm/cm.c | 16 +-- dapl/openib_mcm/dapl_ib_util.h | 5 +- dapl/openib_mcm/mix.c | 160 +++++++++++++---------- dapl/svc/mpxyd.c | 177 ++++++++++++++++---------- dat/include/dat2/dat_mic_extensions.h | 47 ++++++- test/dtest/dtest.c | 6 - 9 files changed, 291 insertions(+), 153 deletions(-) diff --git a/dapl/openib_common/cq.c b/dapl/openib_common/cq.c index 30f2354..557cba0 100644 --- a/dapl/openib_common/cq.c +++ b/dapl/openib_common/cq.c @@ -290,6 +290,12 @@ dapls_evd_dto_wakeup(IN DAPL_EVD * evd_ptr) if (evd_ptr->cno_ptr) dapl_os_wait_object_wakeup(&evd_ptr->wait_object); +#ifdef _OPENIB_MCM_ + if ((evd_ptr->ib_cq_handle->tp->scif_ep) && + (evd_ptr->ib_cq_handle->type & DCM_CQ_SND)) + dapl_os_wait_object_wakeup(&evd_ptr->wait_object); +#endif + /* otherwise, no wake up mechanism */ return DAT_SUCCESS; } @@ -339,6 +345,12 @@ dapls_evd_dto_wait(IN DAPL_EVD * evd_ptr, IN uint32_t timeout) " cq_object_wait: EVD %p time %d\n", evd_ptr, timeout); +#ifdef _OPENIB_MCM_ + if ((evd_ptr->ib_cq_handle->tp->scif_ep) && + (evd_ptr->ib_cq_handle->type & DCM_CQ_SND)) { + return (dapl_os_wait_object_wait(&evd_ptr->wait_object, timeout)); + } +#endif status = dapls_wait_comp_channel(channel, timeout); if (!status) { if (!ibv_get_cq_event(channel, &ibv_cq, &context)) { @@ -489,6 +501,18 @@ DAT_RETURN dapls_ib_completion_poll(IN DAPL_HCA * hca_ptr, { int ret; +#ifdef _OPENIB_MCM_ + if ((evd_ptr->ib_cq_handle->tp->scif_ep) && + (evd_ptr->ib_cq_handle->type & DCM_CQ_SND)) { + ret = dapli_mix_cq_poll(evd_ptr->ib_cq_handle); + if (ret == 0) + DAT_QUEUE_EMPTY; + else if (ret == 1) + return DAT_SUCCESS; + else if (ret == -1) + return DAT_ABORT; + } +#endif ret = 
ibv_poll_cq(evd_ptr->ib_cq_handle->ib_cq, 1, wc_ptr); if (ret == 1) return DAT_SUCCESS; diff --git a/dapl/openib_common/dapl_ib_common.h b/dapl/openib_common/dapl_ib_common.h index 07acd54..6d290e5 100644 --- a/dapl/openib_common/dapl_ib_common.h +++ b/dapl/openib_common/dapl_ib_common.h @@ -43,6 +43,9 @@ #define true 1 #endif /*__cplusplus */ +#define DCM_CQ_SND 0x1 +#define DCM_CQ_RCV 0x2 + /* Typedefs to map common DAPL provider types to IB verbs */ struct dcm_ib_qp { uint64_t qp_ctx; /* local */ @@ -64,6 +67,8 @@ struct dcm_ib_cq { struct ibv_comp_channel *ib_ch; uint32_t cq_id; /* local */ uint32_t scq_id; /* shadow */ + int type; + }; typedef struct dcm_ib_cq *ib_cq_handle_t; diff --git a/dapl/openib_common/qp.c b/dapl/openib_common/qp.c index a05596f..81cfec4 100644 --- a/dapl/openib_common/qp.c +++ b/dapl/openib_common/qp.c @@ -189,6 +189,10 @@ dapls_ib_qp_alloc(IN DAPL_IA * ia_ptr, #else #ifdef _OPENIB_MCM_ + /* mark type of CQ */ + req_cq->type = DCM_CQ_SND; + rcv_cq->type |= DCM_CQ_RCV; + /* save resources, 1st QP is receiver, 2nd is sender */ if (ia_ptr->hca_ptr->ib_trans.scif_ep) { qp_create.cap.max_inline_data = 0; diff --git a/dapl/openib_mcm/cm.c b/dapl/openib_mcm/cm.c index 7828d70..2cc7919 100644 --- a/dapl/openib_mcm/cm.c +++ b/dapl/openib_mcm/cm.c @@ -102,8 +102,7 @@ static int dapl_select(struct dapl_fd_set *set, int time_ms) { int ret; - dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n", - set->index); + dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n", set->index); ret = poll(set->set, set->index, time_ms); dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup, ret=0x%x\n", ret); return ret; @@ -1026,7 +1025,7 @@ dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm) cm->msg.op = htons(MCM_REQ); dapl_os_get_time(&cm->timer); /* reset reply timer */ - if (cm->tp->scif_cm_ep) { /* MIC: CM service on MPXY */ + if (cm->tp->scif_ep) { /* MIC: proxy CR to MPXYD */ if (dapli_mix_cm_req_out(cm, ep->qp_handle)) goto bail; } else 
{ @@ -1951,8 +1950,7 @@ void cm_thread(void *arg) dapl_fd_set(hca->ib_trans.signal.scm[0], set, DAPL_FD_READ); dapl_fd_set(hca->ib_hca_handle->async_fd, set, DAPL_FD_READ); dapl_fd_set(hca->ib_trans.rch_fd, set, DAPL_FD_READ); - dapl_fd_set(hca->ib_trans.scif_ep, set, DAPL_FD_READ); - dapl_fd_set(hca->ib_trans.scif_cm_ep, set, DAPL_FD_READ); + dapl_fd_set(hca->ib_trans.scif_un_ep, set, DAPL_FD_READ); dapl_fd_set(hca->ib_trans.ib_cq->fd, set, DAPL_FD_READ); if (!dapl_llist_is_empty(&hca->ib_trans.list)) @@ -1997,13 +1995,9 @@ void cm_thread(void *arg) DAPL_FD_READ) == DAPL_FD_READ) { mcm_recv(&hca->ib_trans); } - if (dapl_poll(hca->ib_trans.scif_ep, + if (dapl_poll(hca->ib_trans.scif_un_ep, DAPL_FD_READ) == DAPL_FD_READ) { - dapli_mix_recv(hca, hca->ib_trans.scif_ep); - } - if (dapl_poll(hca->ib_trans.scif_cm_ep, - DAPL_FD_READ) == DAPL_FD_READ) { - dapli_mix_recv(hca, hca->ib_trans.scif_cm_ep); + dapli_mix_recv(hca, hca->ib_trans.scif_un_ep); } if (dapl_poll(hca->ib_hca_handle->async_fd, DAPL_FD_READ) == DAPL_FD_READ) { diff --git a/dapl/openib_mcm/dapl_ib_util.h b/dapl/openib_mcm/dapl_ib_util.h index 259b82d..ab90655 100644 --- a/dapl/openib_mcm/dapl_ib_util.h +++ b/dapl/openib_mcm/dapl_ib_util.h @@ -135,7 +135,7 @@ typedef struct _ib_hca_transport /* SCIF MIC indirect, EP to MPXYD services, if running on MIC */ struct scif_portID self; scif_epd_t scif_ep; /* FD operation processing */ - scif_epd_t scif_cm_ep; /* FD CM packet processing */ + scif_epd_t scif_un_ep; /* FD CM packet processing */ struct scif_portID peer; /* MPXYD op proxy addr info */ struct scif_portID peer_cm; /* MPXYD cm proxy addr info */ off_t scif_adr; /* MPXYD RDMA memory pool */ @@ -164,12 +164,15 @@ int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr, int dapli_mix_qp_free(ib_qp_handle_t m_qp); int dapli_mix_cq_create(ib_cq_handle_t m_cq); int dapli_mix_cq_free(ib_cq_handle_t m_cq); +int dapli_mix_cq_wait(ib_cq_handle_t m_cq, int time); +int 
dapli_mix_cq_poll(ib_cq_handle_t m_cq); int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp); int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm); int dapli_mix_post_send(ib_qp_handle_t m_qp, int len, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr); int dapli_mix_recv(DAPL_HCA *hca, int scif_ep); + #ifdef DAPL_COUNTERS void dapls_print_cm_list(IN DAPL_IA *ia_ptr); #endif diff --git a/dapl/openib_mcm/mix.c b/dapl/openib_mcm/mix.c index 5494a40..57fb923 100644 --- a/dapl/openib_mcm/mix.c +++ b/dapl/openib_mcm/mix.c @@ -76,12 +76,12 @@ int dapli_mix_open(ib_hca_transport_t *tp, char *name, int port) } dapl_log(DAPL_DBG_TYPE_EXTENSION,"Connected to node 0 for operations\n"); - tp->scif_cm_ep = scif_open(); - if (tp->scif_cm_ep < 0) { + tp->scif_un_ep = scif_open(); + if (tp->scif_un_ep < 0) { dapl_log(1, "scif_open() for cm_ep failed with error %d\n", errno); return -1; } - ret = scif_connect(tp->scif_cm_ep, &tp->peer); + ret = scif_connect(tp->scif_un_ep, &tp->peer); if (ret < 0) { dapl_log(1, "scif_connect() CM EP to port %d failed with error %d\n", errno); return -1; @@ -342,11 +342,11 @@ int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr, } /* save QP_t id and ctx, needed for posting WR */ - m_qp->sqp_id = msg.qp_t.qp_id; - m_qp->sqp_ctx = msg.qp_t.ctx; + m_qp->qp_id = msg.qp_t.qp_id; + m_qp->qp_ctx = msg.qp_t.ctx; - dapl_log(DAPL_DBG_TYPE_EXTENSION, " MIX_QP_CREATE: reply, sqp_id 0x%x, ctx %p\n", - m_qp->sqp_id, (void*)m_qp->sqp_ctx ); + dapl_log(DAPL_DBG_TYPE_EXTENSION, " MIX_QP_CREATE: reply, proxy qp_id 0x%x, ctx %p\n", + m_qp->qp_id, (void*)m_qp->qp_ctx ); return 0; } @@ -363,7 +363,7 @@ int dapli_mix_qp_free(ib_qp_handle_t m_qp) msg.op = MIX_QP_FREE; msg.status = 0; msg.flags = MIX_OP_REQ; - msg.req_id = m_qp->sqp_id; + msg.req_id = m_qp->qp_id; len = sizeof(dat_mix_hdr_t); ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); @@ -472,23 +472,66 @@ int dapli_mix_cq_free(ib_cq_handle_t m_cq) return 0; } +/* TODO: 
change for aperture/mapped memory, optimize */ +int dapli_mix_cq_poll(ib_cq_handle_t m_cq) +{ + dat_mix_dto_comp_t msg; + scif_epd_t mix_ep = m_cq->tp->scif_ep; + int ret, len; + + /* request */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_CQ_POLL; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.cq_id = m_cq->cq_id; + msg.cq_ctx = m_cq->cq_ctx; + msg.wc_cnt = 1; + + len = sizeof(dat_mix_dto_comp_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: send on %d, ret %d, exp %d\n", mix_ep, ret, len); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " Sent %d request on SCIF EP p_msg %p\n", msg.hdr.op, &msg); + + /* response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d\n", mix_ep, ret, len); + return -1; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_CQ_POLL || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n", + msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," received reply on SCIF EP\n"); + + return msg.hdr.status; +} + + /**** speed path, TODO optimize, optimize, optimize ****/ int dapli_mix_post_send(ib_qp_handle_t m_qp, int txlen, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr) { dat_mix_send_t msg; /* TODO cache-aligned msg pool instead of stack? */ - scif_epd_t mix_ep = m_qp->tp->scif_ep; /* TODO maybe scif_dto_ep ? */ + scif_epd_t mix_ep = m_qp->tp->scif_ep; int ret, len, i; - dapl_log(DAPL_DBG_TYPE_EXTENSION," MIX_SEND, sge=%d len=%d\n", mix_ep); + dapl_log(DAPL_DBG_TYPE_EXTENSION," MIX_SEND, sge=%d len=%d\n", wr->num_sge, txlen); /* POST SEND request, send recv for now, optimize later with pre-registered WR memory pool */ + /* cm thread processes replies, todo process immediate errors here with recv? 
*/ msg.hdr.ver = DAT_MIX_VER; msg.hdr.op = MIX_SEND; msg.hdr.status = 0; msg.hdr.flags = MIX_OP_REQ; msg.len = txlen; - msg.qp_id = m_qp->sqp_id; - msg.qp_ctx = m_qp->sqp_ctx; + msg.qp_id = m_qp->qp_id; + msg.qp_ctx = m_qp->qp_ctx; memcpy(&msg.wr, wr, sizeof(*wr)); if (wr->opcode & IBV_WR_SEND) @@ -503,43 +546,49 @@ int dapli_mix_post_send(ib_qp_handle_t m_qp, int txlen, struct ibv_send_wr *wr, return -1; } for (i=0; i < wr->num_sge && (msg.hdr.flags & MIX_OP_INLINE); i++) { - dapl_log(1, " MSG_SEND sge[%d] addr %p, len %d\n", - i, wr->sg_list[i].addr, wr->sg_list[i].length); + dapl_log(1, " MSG_SEND sge[%d] addr %p, len %d -> scif_send (ep=%d)\n", + i, wr->sg_list[i].addr, wr->sg_list[i].length, mix_ep); ret = scif_send(mix_ep, (void*)wr->sg_list[i].addr, wr->sg_list[i].length, SCIF_SEND_BLOCK); if (ret != wr->sg_list[i].length) dapl_log(1, " ERR: send on %d, ret %d, exp %d\n", mix_ep, ret, wr->sg_list[i].length); } dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent MIX_SEND on SCIF EP %d, len=%d\n", mix_ep, txlen); - - /* POST SEND response */ - ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); - if (ret != len) { - dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d\n", mix_ep, ret, len); - return -1; - } - if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_SEND || - msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { - dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n", - msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status); - return -1; - } - dapl_log(DAPL_DBG_TYPE_EXTENSION," received MIX_SEND reply on SCIF EP\n"); - return 0; } +/* MIX CM operations: + * Messaging channel (scif_un_ep) for unsolicited messages/events, + * Operation channel (scif_ep) for req/rep operations + */ + +/* locate CM object by context, address of object for now--- TODO change to ID */ +dp_ib_cm_handle_t dapli_mix_get_cm(ib_hca_transport_t *tp, uint64_t cm_ctx) +{ + dp_ib_cm_handle_t cm = NULL; + + dapl_os_lock(&tp->lock); + if (!dapl_llist_is_empty(&tp->list)) 
+ cm = dapl_llist_peek_head(&tp->list); + + while (cm) { + if (cm == (void*)cm_ctx) + break; + cm = dapl_llist_next_entry(&tp->list, &cm->local_entry); + } + dapl_os_unlock(&tp->lock); -/* MIX CM operations: use CM channel on SCIF */ + return cm; +} -/* MIX_CM_REQ */ +/* MIX_CM_REQ operation */ int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp) { dat_mix_cm_t msg; - scif_epd_t mix_ep = m_cm->tp->scif_cm_ep; /* use cm channel */ + scif_epd_t mix_ep = m_cm->tp->scif_ep; /* use op channel */ int ret, len; /* request: QP_r local, QP_t shadowed */ @@ -547,7 +596,7 @@ int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp) msg.hdr.op = MIX_CM_REQ; msg.hdr.status = 0; msg.hdr.flags = MIX_OP_REQ; - msg.qp_id = m_qp->sqp_id; + msg.qp_id = m_qp->qp_id; msg.cm_id = m_cm->cm_id; msg.cm_ctx = (uint64_t)m_cm; memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t)); @@ -585,11 +634,11 @@ int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp) return 0; } -/* MIX_CM_RTU */ +/* MIX_CM_RTU message, scif_un_ep channel */ int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm) { dat_mix_cm_t msg; - scif_epd_t mix_ep = m_cm->tp->scif_cm_ep; /* use cm channel */ + scif_epd_t mix_ep = m_cm->tp->scif_un_ep; int ret, len; /* connect RTU: QP_r local, QP_t shadowed */ @@ -613,26 +662,7 @@ int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm) return 0; } -/* locate CM object by context, address of object for now--- TODO change to ID */ -dp_ib_cm_handle_t dapli_mix_get_cm(ib_hca_transport_t *tp, uint64_t cm_ctx) -{ - dp_ib_cm_handle_t cm = NULL; - - dapl_os_lock(&tp->lock); - if (!dapl_llist_is_empty(&tp->list)) - cm = dapl_llist_peek_head(&tp->list); - - while (cm) { - if (cm == (void*)cm_ctx) - break; - - cm = dapl_llist_next_entry(&tp->list, &cm->local_entry); - } - dapl_os_unlock(&tp->lock); - - return cm; -} - +/* unsolicited CM event, scif_un_ep channel */ int dapli_mix_cm_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_event_t *pmsg) { int 
len, ret; @@ -683,14 +713,14 @@ int dapli_mix_cm_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm return 0; } -int dapli_mix_dto_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_dto_event_t *pmsg) +/* unsolicited DTO event, scif_un_ep channel */ +int dapli_mix_dto_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_dto_comp_t *pmsg) { - int len, ret; + int len, ret, i; struct dcm_ib_cq *m_cq; - DAT_EVENT *event; /* hdr already read, get operation data */ - len = sizeof(dat_mix_dto_event_t) - sizeof(dat_mix_hdr_t); + len = sizeof(dat_mix_dto_comp_t) - sizeof(dat_mix_hdr_t); ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d\n", ret, len); @@ -700,9 +730,10 @@ int dapli_mix_dto_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_d " MIX_DTO_EVENT <-: id %d ctx %p \n", pmsg->cq_id, pmsg->cq_ctx); /* Get cq and post DTO event with this WC entry */ - m_cq = pmsg->cq_ctx; + m_cq = (void*)pmsg->cq_ctx; - dapls_evd_cqe_to_event(m_cq->evd, &pmsg->wc); + for (i=0; i< pmsg->wc_cnt; i++) + dapls_evd_cqe_to_event(m_cq->evd, &pmsg->wc[i]); return 0; } @@ -735,8 +766,7 @@ int dapli_mix_cm_rep_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t } /* - * MIX recv, unsolicited messages from MPXYD, operations and CM messages - * scif_ep will be set accordingly + * MIX recv, unsolicited messages from MPXYD on tp->scif_un_ep * */ int dapli_mix_recv(DAPL_HCA *hca, int scif_ep) @@ -758,17 +788,15 @@ int dapli_mix_recv(DAPL_HCA *hca, int scif_ep) switch (phdr->op) { case MIX_DTO_EVENT: - ret = dapli_mix_dto_event_in(tp, scif_ep, (dat_mix_dto_event_t*)phdr); + ret = dapli_mix_dto_event_in(tp, scif_ep, (dat_mix_dto_comp_t*)phdr); break; case MIX_CM_EVENT: ret = dapli_mix_cm_event_in(tp, scif_ep, (dat_mix_cm_event_t*)phdr); break; - case MIX_CM_REQ: case MIX_CM_REP: ret = dapli_mix_cm_rep_in(tp, scif_ep, (dat_mix_cm_t*)phdr); break; - 
case MIX_CM_ACCEPT: case MIX_CM_REJECT: case MIX_CM_RTU: diff --git a/dapl/svc/mpxyd.c b/dapl/svc/mpxyd.c index 82acf66..4a89e1a 100644 --- a/dapl/svc/mpxyd.c +++ b/dapl/svc/mpxyd.c @@ -231,7 +231,7 @@ typedef struct mcm_scif_dev { uint16_t cm_id; /* port ID MIC client, md->ports */ uint64_t *ports; /* EP port space MIC client */ scif_epd_t scif_ep; /* SCIF EP, MIX device operations */ - scif_epd_t scif_cm_ep; /* SCIF CM EP, MIX device CM messages */ + scif_epd_t scif_un_ep; /* SCIF CM EP, MIX device CM messages */ struct scif_portID peer; /* SCIF EP peer, MIC adapter */ struct scif_portID peer_cm; /* SCIF CM EP peer, MIC adapter */ char *m_buf; /* MIC proxy buffer, SCIF and IB */ @@ -1172,7 +1172,7 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e pthread_mutex_lock(&md->plock); smd->scif_ep = op_ep; - smd->scif_cm_ep = cm_ep; + smd->scif_un_ep = cm_ep; smd->cm_id = mcm_get_port(md->ports, 0, (uint64_t)smd); pthread_mutex_unlock(&md->plock); @@ -1280,9 +1280,9 @@ static void mix_close_device(mcm_ib_dev_t *md, mcm_scif_dev_t *smd) scif_close(smd->scif_ep); smd->scif_ep = 0; } - if (smd->scif_cm_ep) { - scif_close(smd->scif_cm_ep); - smd->scif_cm_ep = 0; + if (smd->scif_un_ep) { + scif_close(smd->scif_un_ep); + smd->scif_un_ep = 0; } mcm_destroy_smd(smd); @@ -1584,6 +1584,57 @@ resp: return 0; } +/* poll CQ and rearm */ +static int mix_cq_poll_notify(mcm_scif_dev_t *smd, dat_mix_dto_comp_t *pmsg) +{ + /* todo: for dat_evd_wait */ + return 0; + +} + +/* poll CQ, fits in header, TODO, this will go away when optimizing with mem-mappings */ +static int mix_cq_poll(mcm_scif_dev_t *smd, dat_mix_dto_comp_t *pmsg) +{ + int len, ret, pcnt; + struct mcm_cq *m_cq; + + /* hdr already read, get operation data */ + len = sizeof(dat_mix_dto_comp_t) - sizeof(dat_mix_hdr_t); + ret = scif_recv(smd->scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + if (ret != len) { + mlog(0, " ERR: ret %d, exp %d\n", ret, len); + return ret; + 
} + mlog(1, " MIX_CQ_POLL: cq_id 0x%x, cq_ctx %p\n", pmsg->cq_id, pmsg->cq_ctx); + + /* get CQ object, set WC polling count */ + m_cq = (struct mcm_cq*)pmsg->cq_ctx; + if (pmsg->wc_cnt > DAT_MIX_WC_MAX) + pcnt = DAT_MIX_WC_MAX; + else + pcnt = pmsg->wc_cnt; + + pmsg->wc_cnt = ibv_poll_cq(m_cq->ib_cq, pcnt, pmsg->wc); + mlog(1," completions = %d \n", (int)pmsg->wc_cnt); + if ((int)pmsg->wc_cnt < 0) + pmsg->hdr.status = MIX_EFAULT; /* todo: add a errno convert to MIX */ + else + pmsg->hdr.status = 0; /* success, check pmsg->wc_cnt WC's */ + + /* send back response, with client context */ + pmsg->cq_id = m_cq->cq_id; + pmsg->cq_ctx = m_cq->cq_ctx; + pmsg->hdr.flags = MIX_OP_RSP; + len = sizeof(dat_mix_dto_comp_t); + ret = scif_send(smd->scif_ep, pmsg, len, SCIF_SEND_BLOCK); + if (ret != len) { + mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", smd->scif_ep, ret, len); + return ret; + } + return 0; +} + + /* destroy proxy QP, fits in hdr */ static int mix_qp_destroy(mcm_scif_dev_t *smd, dat_mix_hdr_t *pmsg) { @@ -1719,7 +1770,7 @@ static int mix_qp_create(mcm_scif_dev_t *smd, dat_mix_qp_t *pmsg) pmsg->hdr.status = MIX_SUCCESS; mlog(1, " QP_t - qpn %x, q_id %d, sq %d,%d rq %d,%d scq_id %d\n", - pmsg->qp_t.qp_num, m_qp->entry.tid, pmsg->qp_t.max_send_wr, + pmsg->qp_t.qp_num, pmsg->qp_t.qp_id, pmsg->qp_t.max_send_wr, pmsg->qp_t.max_send_sge, pmsg->qp_t.max_recv_wr, pmsg->qp_t.max_recv_sge, m_cq->entry.tid); @@ -1744,7 +1795,7 @@ resp: static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc) { - dat_mix_dto_event_t msg; + dat_mix_dto_comp_t msg; int ret, len, i; /* send DTO events to MIC client */ @@ -1755,17 +1806,18 @@ static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc) msg.cq_ctx = m_cq->cq_ctx; for (i=0; i < nc; i++) { + msg.wc_cnt = 1; /* todo, batch in groups of MIX_WC_MAX */ memcpy(&msg.wc, &wc[i], sizeof(*wc)); - len = sizeof(dat_mix_dto_event_t); - ret = scif_send(m_cq->smd->scif_ep, &msg, len, SCIF_SEND_BLOCK); + len = sizeof(dat_mix_dto_comp_t); + ret = 
scif_send(m_cq->smd->scif_un_ep, &msg, len, SCIF_SEND_BLOCK); /* BUG? have to send back on cm_ep ?? */ if (ret != len) { mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", - m_cq->smd->scif_ep, ret, len); + m_cq->smd->scif_un_ep, ret, len); return; } - mlog(0, " MIX_DTO_EVENT (ep=%d,sz=%d): cq %p id %d ctx %p stat %d op %d ln %d wr_id %p\n", - m_cq->smd->scif_ep, len, m_cq, msg.cq_id, msg.cq_ctx, - msg.wc.status, msg.wc.opcode, msg.wc.byte_len, msg.wc.wr_id); + mlog(0, " MIX_DTO_EVENT (ep=%d,sz=%d): cq %p id %d ctx %p stat %d op %d ln %d wr_id %p wc's %d\n", + m_cq->smd->scif_un_ep, len, m_cq, msg.cq_id, msg.cq_ctx, + msg.wc[0].status, msg.wc[0].opcode, msg.wc[0].byte_len, msg.wc[0].wr_id, msg.wc_cnt); } } @@ -1783,17 +1835,17 @@ static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event) msg.event = event; len = sizeof(dat_mix_cm_event_t); - ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? assume small msgs are sent without waiting on receiver */ + ret = scif_send(m_cm->smd->scif_un_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? 
assume small msgs are sent without waiting on receiver */ if (ret != len) { - mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len); + mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_un_ep, ret, len); return -1; } mlog(0, " MIX_CM_EVENT: cm %p cm_id %d, ctx %p, event 0x%x\n", m_cm, msg.cm_id, msg.cm_ctx, event); return 0; } -/* New connection request, create CM object */ -static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) +/* New connection request operation, create CM object */ +static int mix_cm_req(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) { int len, ret; struct mcm_qp *m_qp; @@ -1801,7 +1853,7 @@ static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_cm_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(smd->scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -1859,9 +1911,9 @@ resp: /* send back response */ pmsg->hdr.flags = MIX_OP_RSP; len = sizeof(dat_mix_cm_t); - ret = scif_send(smd->scif_cm_ep, pmsg, len, SCIF_SEND_BLOCK); + ret = scif_send(smd->scif_ep, pmsg, len, SCIF_SEND_BLOCK); if (ret != len) { - mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", smd->scif_cm_ep, ret, len); + mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", smd->scif_ep, ret, len); return ret; } mlog(0, " MIX_CM_REQ_OUT: MPXYD id 0x%x, ctx %p - MIC id 0x%x, ctx %p\n", @@ -1870,7 +1922,7 @@ resp: return 0; } -/* Active, reply received, send RTU */ +/* Active, reply received, send RTU, unsolicited channel */ static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) { int len, ret; @@ -1878,7 +1930,7 @@ static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) /* hdr already read, get operation data */ len = sizeof(dat_mix_cm_t) - 
sizeof(dat_mix_hdr_t); - ret = scif_recv(smd->scif_cm_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + ret = scif_recv(smd->scif_un_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { mlog(0, " ERR: ret %d, exp %d\n", ret, len); return ret; @@ -1904,6 +1956,7 @@ static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg) return 0; } +/* ACTIVE: CR reply from server, unsolicited channel */ static int mix_cm_reply_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) { dat_mix_cm_t msg; @@ -1938,9 +1991,9 @@ static int mix_cm_reply_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len) goto err; len = sizeof(dat_mix_cm_t); - ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK); + ret = scif_send(m_cm->smd->scif_un_ep, &msg, len, SCIF_SEND_BLOCK); if (ret != len) { - mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len); + mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_un_ep, ret, len); return -1; } mlog(0, " success cm_id %d\n", msg.cm_id); @@ -1954,6 +2007,7 @@ err: return -1; } +/* disconnect request from peer, unsolicited channel */ static int mix_cm_disc_in(mcm_cm_t *m_cm) { dat_mix_hdr_t msg; @@ -1966,16 +2020,16 @@ static int mix_cm_disc_in(mcm_cm_t *m_cm) msg.req_id = m_cm->cm_id; len = sizeof(dat_mix_hdr_t); - ret = scif_send(m_cm->smd->scif_cm_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? assume sent without waiting on receiver */ + ret = scif_send(m_cm->smd->scif_un_ep, &msg, len, SCIF_SEND_BLOCK); /* ??? 
assume sent without waiting on receiver */ if (ret != len) { - mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_cm_ep, ret, len); + mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d\n", m_cm->smd->scif_un_ep, ret, len); return -1; } mlog(0, " MIX_CM_DISC_IN: cm_id %d\n", msg.req_id); return 0; } -/* Active, reply received, send RTU */ +/* Post SEND message request, operation channel */ static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) { int len, ret; @@ -1987,11 +2041,12 @@ static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t); ret = scif_recv(smd->scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); if (ret != len) { - mlog(0, " ERR: ret %d, exp %d\n", ret, len); - goto err; + mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len); + return -1; } - mlog(1, " MIX_POST_SEND: q_id %d, q_ctx %p, len %d\n", - pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len); + mlog(1, " enter: q_id %d, q_ctx %p, len %d, wr_id %p, sge %d, op %x flgs %x \n", + pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, pmsg->wr.wr_id, + pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags); m_qp = (struct mcm_qp*)pmsg->qp_ctx; /* trust me !!! 
*/ @@ -1999,42 +2054,34 @@ static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg) len = pmsg->len; ret = scif_recv(smd->scif_ep, smd->m_buf, len, SCIF_RECV_BLOCK); if (ret != len) { - mlog(0, " ERR: ret %d, exp %d\n", ret, len); - goto err; + mlog(0, " ERR: scif_recv inline DATA, ret %d, exp %d\n", ret, len); + return -1; } pmsg->wr.sg_list = &sge; + pmsg->wr.num_sge = 1; sge.addr = (uint64_t)smd->m_mr->addr; sge.lkey = smd->m_mr->lkey; sge.length = len; ret = ibv_post_send(m_qp->ib_qp, &pmsg->wr, &bad_wr); if (ret) { - mlog(0, " ERR: ret %d, exp %d\n", ret, len); - goto err; - } - pmsg->hdr.status = MIX_SUCCESS; - goto resp; -err: - mlog(0, " ERR: %s\n", strerror(errno)); - pmsg->hdr.status = MIX_EINVAL; -resp: - /* send back immediate errors */ - pmsg->hdr.flags = MIX_OP_RSP; - len = sizeof(dat_mix_hdr_t); - ret = scif_send(smd->scif_ep, pmsg, len, SCIF_SEND_BLOCK); - if (ret != len) { - mlog(0, " ERR: send on scif_ep %d, ret %d, exp %d\n", smd->scif_cm_ep, ret, len); - return ret; - } + struct ibv_wc wc; + mlog(0, " ERR ibv_post_send: %s - wr_id %p\n", strerror(errno), pmsg->wr.wr_id); - /* if post fails return error */ - mlog(1, " MIX_POST_SEND: success q_id %d, q_ctx %p, len %d\n", - pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len); + wc.wr_id = pmsg->wr.wr_id; + wc.byte_len = 0; + wc.status = IBV_WC_GENERAL_ERR; + wc.opcode = IBV_WC_SEND; + wc.vendor_err = ret; + mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1); + } + mlog(1, " exit: q_id %d, q_ctx %p, len %d\n", + pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len); return 0; } -/* receive MIX operations on connected SCIF endpoint */ +/* receive data on connected SCIF endpoints, operation and unsolicited channels */ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) { char cmd[DAT_MIX_MSG_MAX]; @@ -2045,7 +2092,7 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK); if ((ret != len) || (phdr->ver != DAT_MIX_VER)) { 
mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n", - smd->scif_ep, ret, len, phdr->ver); + scif_ep, ret, len, phdr->ver); return -1; } mlog(0, " ver %d, op %d, flags %d\n", phdr->ver, phdr->op, phdr->flags); @@ -2068,6 +2115,9 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) case MIX_CQ_FREE: ret = mix_cq_destroy(smd, phdr); break; + case MIX_CQ_POLL: + ret = mix_cq_poll(smd, (dat_mix_dto_comp_t *)phdr); + break; case MIX_SEND: ret = mix_post_send(smd, (dat_mix_send_t *)phdr); break; @@ -2078,7 +2128,7 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep) ret = mix_listen_free(smd, phdr); break; case MIX_CM_REQ: - ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr); + ret = mix_cm_req(smd, (dat_mix_cm_t *)phdr); break; case MIX_CM_REP: case MIX_CM_ACCEPT: @@ -2792,7 +2842,7 @@ void mcm_cq_event(struct mcm_cq *m_cq) ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx); mlog(1," m_cq %p == cq_ctx %p \n", m_cq, cq_ctx); if (ret == 0) { - ibv_ack_cq_events(m_cq->ib_ch, 1); + ibv_ack_cq_events(m_cq->ib_cq, 1); } retry: ret = ibv_poll_cq(m_cq->ib_cq, 10, wc); @@ -2809,7 +2859,6 @@ retry: mix_dto_event(m_cq, wc, ret); goto retry; - } @@ -2868,8 +2917,8 @@ static void mpxy_server(void) m_cq = get_next_entry(&m_cq->entry, &smd->cqlist); } pthread_mutex_unlock(&smd->cqlock); + mcm_fd_set(smd->scif_un_ep, set, POLLIN); mcm_fd_set(smd->scif_ep, set, POLLIN); - mcm_fd_set(smd->scif_cm_ep, set, POLLIN); mcm_check_timers(smd, &time_ms); smd = get_next_entry(&smd->entry, &md->smd_list); } @@ -2907,14 +2956,12 @@ static void mpxy_server(void) m_cq = get_next_entry(&m_cq->entry, &smd->cqlist); } pthread_mutex_unlock(&smd->cqlock); - ret = mcm_poll(smd->scif_ep, POLLIN); /* OP */ + ret = mcm_poll(smd->scif_un_ep, POLLIN); /* unsolicited msgs */ + if (ret == POLLIN) + ret = mix_scif_recv(smd, smd->scif_un_ep); + ret = mcm_poll(smd->scif_ep, POLLIN); /* operations */ if (ret == POLLIN) ret = mix_scif_recv(smd, smd->scif_ep); - if (!ret) 
{ - ret = mcm_poll(smd->scif_cm_ep, POLLIN); /* CM */ - if (ret == POLLIN) - ret = mix_scif_recv(smd, smd->scif_cm_ep); - } next = get_next_entry(&smd->entry, &md->smd_list); if (ret) mix_close_device(md, smd); diff --git a/dat/include/dat2/dat_mic_extensions.h b/dat/include/dat2/dat_mic_extensions.h index de37860..c407b20 100755 --- a/dat/include/dat2/dat_mic_extensions.h +++ b/dat/include/dat2/dat_mic_extensions.h @@ -185,6 +185,8 @@ typedef enum dat_mix_ops MIX_QP_FREE, MIX_CQ_CREATE, MIX_CQ_FREE, + MIX_CQ_POLL, + MIX_CQ_POLL_NOTIFY, MIX_CM_REQ, MIX_CM_REP, MIX_CM_ACCEPT, @@ -196,9 +198,44 @@ typedef enum dat_mix_ops MIX_CM_EVENT, MIX_DTO_EVENT, MIX_SEND, + MIX_WRITE, } dat_mix_ops_t; +static inline char * mix_op_str(IN int op) +{ + static char *mix_ops[] = { + "", + "", + "IA_OPEN", + "IA_CLOSE", + "LISTEN", + "LISTEN_FREE", + "MR_CREATE", + "MR_FREE", + "QP_CREATE", + "QP_MODIFY", + "QP_FREE", + "CQ_CREATE", + "CQ_FREE", + "CQ_POLL", + "CQ_POLL_NOTIFY", + "CM_REQ", + "CM_REP", + "CM_ACCEPT", + "CM_REJECT", + "CM_RTU", + "CM_EST", + "CM_DISC", + "CM_DREP", + "CM_EVENT", + "DTO_EVENT", + "POST_SEND", + "POST_WRITE", + }; + return ((op < 3 || op > 25) ? "Invalid OP?" 
: mix_ops[op]); +} + typedef enum dat_mix_op_flags { MIX_OP_REQ = 0x01, @@ -374,22 +411,24 @@ typedef struct dat_mix_cm_event } dat_mix_cm_event_t; -typedef struct dat_mix_dto_event +#define DAT_MIX_WC_MAX 4 +typedef struct dat_mix_dto_comp { dat_mix_hdr_t hdr; uint64_t cq_ctx; uint32_t cq_id; - struct ibv_wc wc; + uint32_t wc_cnt; + struct ibv_wc wc[DAT_MIX_WC_MAX]; -} dat_mix_dto_event_t; +} dat_mix_dto_comp_t; #define DAT_MIX_SGE_MAX 7 typedef struct dat_mix_send { dat_mix_hdr_t hdr; + uint64_t qp_ctx; uint32_t qp_id; uint32_t len; - uint64_t qp_ctx; struct ibv_send_wr wr; struct ibv_sge sge[DAT_MIX_SGE_MAX]; diff --git a/test/dtest/dtest.c b/test/dtest/dtest.c index 7a658ba..fa9725f 100755 --- a/test/dtest/dtest.c +++ b/test/dtest/dtest.c @@ -1144,12 +1144,6 @@ no_resolution: } else LOGPRINTF("%d send_msg completed\n", getpid()); - if (collect_event(h_dto_req_evd, - &event, - DTO_TIMEOUT, - &poll_count) != DAT_SUCCESS) - return (DAT_ABORT); - printf("%d send event completed, waiting for received message \n", getpid()); -- 2.41.0