From 574a863607d1bad07aa4b97b1f35ce2b13a53df7 Mon Sep 17 00:00:00 2001
From: Arlin Davis
Date: Mon, 21 Jul 2014 10:58:37 -0700
Subject: [PATCH] add new dapl MIC provider (MCM) to support MIC RDMA proxy services

The provider supports all modes of connectivity and will set up data
paths based on endpoint locality and platform constraints. It provides
transparent DAT API support for RDMA writes, RDMA writes with immediate
data, sends, and receives. There is no RDMA read or atomic support.

To use the MCM provider, an application can use the new ofa-v2-mcm
device definitions in dat.conf. Intel MPSS is required for MCM provider
build and usage.

The following shows connectivity modes and data paths:

 HST -> HST to HCA
 MSS -> MIC to HCA same socket
 MXS -> MIC to HCA cross socket

1. HST->HST: Xeon->HCA->fabric->HCA->Xeon (direct->direct)
   HST<-HST: Xeon<-HCA<-fabric<-HCA<-Xeon (direct<-direct)

2. MSS->MSS: KNC->Xeon->HCA->fabric->HCA->KNC (proxy->direct)
   MSS<-MSS: KNC<-HCA<-fabric<-HCA<-Xeon<-KNC (direct<-proxy)

3. MXS->MXS: KNC->Xeon->HCA->fabric->HCA->Xeon->KNC (proxy->proxy)
   MXS<-MXS: KNC<-Xeon<-HCA<-fabric<-HCA<-Xeon<-KNC (proxy<-proxy)

4. MSS->MXS: KNC->Xeon->HCA->fabric->HCA->Xeon->KNC (proxy->proxy)
   MSS<-MXS: KNC<-HCA<-fabric<-HCA<-Xeon<-KNC (direct<-proxy)

5. MSS->HST: KNC->Xeon->HCA->fabric->HCA->Xeon (proxy->direct)
   MSS<-HST: KNC<-HCA<-fabric<-HCA<-Xeon (direct<-direct)

6. MXS->HST: KNC->Xeon->HCA->fabric->HCA->Xeon (proxy->direct)
   MXS<-HST: KNC<-Xeon<-HCA<-fabric<-HCA<-Xeon (proxy<-direct)

Signed-off-by: Arlin Davis
---
 dapl/openib_mcm/cm.c               | 2325 ++++++++++++++++++++++++++++
 dapl/openib_mcm/dapl_ib_util.h     |  206 +++
 dapl/openib_mcm/device.c           |  620 ++++++++
 dapl/openib_mcm/linux/openib_osd.h |   37 +
 dapl/openib_mcm/mix.c              | 1293 ++++++++++++++++
 dapl/openib_mcm/proxy.c            |  501 ++++++
 6 files changed, 4982 insertions(+)
 create mode 100644 dapl/openib_mcm/cm.c
 create mode 100644 dapl/openib_mcm/dapl_ib_util.h
 create mode 100644 dapl/openib_mcm/device.c
 create mode 100644 dapl/openib_mcm/linux/openib_osd.h
 create mode 100644 dapl/openib_mcm/mix.c
 create mode 100644 dapl/openib_mcm/proxy.c

diff --git a/dapl/openib_mcm/cm.c b/dapl/openib_mcm/cm.c
new file mode 100644
index 0000000..4c6778a
--- /dev/null
+++ b/dapl/openib_mcm/cm.c
@@ -0,0 +1,2325 @@
+/*
+ * Copyright (c) 2009-2014 Intel Corporation. All rights reserved.
+ *
+ * This Software is licensed under one of the following licenses:
+ *
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ *    available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/cpl.php.
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ *    available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
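+ *
+ * Usage note: the provider is selected through a dat.conf entry in the
+ * standard uDAPL format. A minimal sketch follows; the device string
+ * "mlx4_0 1" and the library name libdaplomcm.so.2 are illustrative
+ * assumptions, not taken from this patch:
+ *
+ *   ofa-v2-mcm u2.0 nonthreadsafe default libdaplomcm.so.2 dapl.2.0 "mlx4_0 1" ""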
+ */ + +#include "dapl.h" +#include "dapl_adapter_util.h" +#include "dapl_evd_util.h" +#include "dapl_cr_util.h" +#include "dapl_name_service.h" +#include "dapl_ib_util.h" +#include "dapl_ep_util.h" +#include "dapl_osd.h" + +extern char gid_str[INET6_ADDRSTRLEN]; + +enum DAPL_FD_EVENTS { + DAPL_FD_READ = POLLIN, + DAPL_FD_WRITE = POLLOUT, + DAPL_FD_ERROR = POLLERR +}; + +struct dapl_fd_set { + int index; + struct pollfd set[DAPL_FD_SETSIZE]; +}; + +static struct dapl_fd_set *dapl_alloc_fd_set(void) +{ + return dapl_os_alloc(sizeof(struct dapl_fd_set)); +} + +static void dapl_fd_zero(struct dapl_fd_set *set) +{ + set->index = 0; +} + +static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set, + enum DAPL_FD_EVENTS event) +{ + if (!s) + return 0; + + if (set->index == DAPL_FD_SETSIZE - 1) { + dapl_log(DAPL_DBG_TYPE_ERR, + " ERR: cm_thread exceeded FD_SETSIZE %d\n", + set->index + 1); + return -1; + } + + set->set[set->index].fd = s; + set->set[set->index].revents = 0; + set->set[set->index++].events = event; + return 0; +} + +static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event) +{ + struct pollfd fds; + int ret; + + if (!s) + return 0; + + fds.fd = s; + fds.events = event; + fds.revents = 0; + ret = poll(&fds, 1, 0); + dapl_log(DAPL_DBG_TYPE_THREAD, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n", + s, ret, fds.revents); + if (ret == 0) + return 0; + else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL)) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " dapl_poll: ERR: fd=%d ret=%d, revent=0x%x\n", + s,ret,fds.revents); + return DAPL_FD_ERROR; + } else + return fds.revents; +} + +static int dapl_select(struct dapl_fd_set *set, int time_ms) +{ + int ret; + + dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: sleep, fds=%d\n", set->index); + ret = poll(set->set, set->index, time_ms); + dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: wakeup, ret=0x%x\n", ret); + return ret; +} + +/* forward declarations */ +static int mcm_reply(dp_ib_cm_handle_t cm); +static void mcm_accept(ib_cm_srvc_handle_t cm, dat_mcm_msg_t *msg); +static void mcm_accept_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg); +static int mcm_send(ib_hca_transport_t *tp, dat_mcm_msg_t *msg, DAT_PVOID p_data, DAT_COUNT p_size); +DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm); +DAT_RETURN dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm); +static void mcm_log_addrs(int lvl, struct dat_mcm_msg *msg, int state, int in); + +/* Service ids - port space */ +static uint16_t mcm_get_port(ib_hca_transport_t *tp, uint16_t port) +{ + int i = 0; + + dapl_os_lock(&tp->plock); + /* get specific ID */ + if (port) { + if (tp->sid[port] == 0) { + tp->sid[port] = 1; + i = port; + } + goto done; + } + + /* get any free ID */ + for (i = 0xffff; i > 0; i--) { + if (tp->sid[i] == 0) { + tp->sid[i] = 1; + break; + } + } +done: + dapl_os_unlock(&tp->plock); + return i; +} + +static void mcm_free_port(ib_hca_transport_t *tp, uint16_t port) +{ + dapl_os_lock(&tp->plock); + tp->sid[port] = 0; + dapl_os_unlock(&tp->plock); +} + +static void mcm_check_timers(dp_ib_cm_handle_t cm, int *timer) +{ + DAPL_OS_TIMEVAL time; + + if (cm->tp->scif_ep) /* CM timers running on MPXYD */ + return; + + dapl_os_lock(&cm->lock); + dapl_os_get_time(&time); + switch (cm->state) { + case MCM_REP_PENDING: + *timer = cm->hca->ib_trans.cm_timer; + /* wait longer each retry */ + if ((time - cm->timer)/1000 > + (cm->hca->ib_trans.rep_time << cm->retries)) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " CM_REQ retry %p %d [lid, port, cqp, iqp]:" + " %x %x %x %x -> %x %x %x 
%x Time(ms) %d > %d\n", + cm, cm->retries+1, + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr1.qpn), + (time - cm->timer)/1000, + cm->hca->ib_trans.rep_time << cm->retries); + cm->retries++; + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_REQ_RETRY); + dapl_os_unlock(&cm->lock); + dapli_cm_connect(cm->ep, cm); + return; + } + break; + case MCM_RTU_PENDING: + *timer = cm->hca->ib_trans.cm_timer; + if ((time - cm->timer)/1000 > + (cm->hca->ib_trans.rtu_time << cm->retries)) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " CM_REPLY retry %d %s [lid, port, cqp, iqp]:" + " %x %x %x %x -> %x %x %x %x r_pid %x Time(ms) %d > %d\n", + cm->retries+1, + dapl_cm_op_str(ntohs(cm->msg.op)), + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id), + (time - cm->timer)/1000, + cm->hca->ib_trans.rtu_time << cm->retries); + cm->retries++; + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_REP_RETRY); + dapl_os_unlock(&cm->lock); + mcm_reply(cm); + return; + } + break; + case MCM_DISC_PENDING: + *timer = cm->hca->ib_trans.cm_timer; + /* wait longer each retry */ + if ((time - cm->timer)/1000 > + (cm->hca->ib_trans.rtu_time << cm->retries)) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " CM_DREQ retry %d [lid, port, cqp, iqp]:" + " %x %x %x %x -> %x %x %x %x r_pid %x Time(ms) %d > %d\n", + cm->retries+1, + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id), + (time - cm->timer)/1000, + cm->hca->ib_trans.rtu_time << cm->retries); + cm->retries++; + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREQ_RETRY); + dapl_os_unlock(&cm->lock); + dapli_cm_disconnect(cm); + return; + } + break; + default: + break; + } + dapl_os_unlock(&cm->lock); +} + +/* SEND CM MESSAGE PROCESSING */ + +/* Get CM UD message from send queue, called with s_lock held */ +static dat_mcm_msg_t *mcm_get_smsg(ib_hca_transport_t *tp) +{ + dat_mcm_msg_t *msg = NULL; + int ret, polled = 1, hd = tp->s_hd; + + hd++; + + if (hd == tp->qpe) + hd = 0; +retry: + if (hd == tp->s_tl) { + msg = NULL; + if (polled % 1000000 == 0) + dapl_log(DAPL_DBG_TYPE_WARN, + " mcm_get_smsg: FULLq hd %d == tl %d," + " completions stalled, polls=%d\n", + hd, tp->s_tl, polled); + } + else { + msg = &tp->sbuf[hd]; + tp->s_hd = hd; /* new hd */ + } + + /* if empty, process some completions */ + if (msg == NULL) { + struct ibv_wc wc; + + /* process completions, based on mcm_TX_BURST */ + ret = ibv_poll_cq(tp->scq, 1, &wc); + if (ret < 0) { + dapl_log(DAPL_DBG_TYPE_WARN, + " get_smsg: cq %p %s\n", + tp->scq, strerror(errno)); + return NULL; + } + /* free up completed sends, update tail */ + if (ret > 0) + tp->s_tl = (int)wc.wr_id; + + polled++; + goto retry; + } + DAPL_CNTR_DATA(((DAPL_IA *)dapl_llist_peek_head(&tp->hca->ia_list_head)), DCNT_IA_CM_ERR_REQ_FULLQ, polled > 1 ? 
1:0); + DAPL_CNTR_DATA(((DAPL_IA *)dapl_llist_peek_head(&tp->hca->ia_list_head)), DCNT_IA_CM_REQ_FULLQ_POLL, polled - 1); + return msg; +} + +/* RECEIVE CM MESSAGE PROCESSING */ + +static int mcm_post_rmsg(ib_hca_transport_t *tp, dat_mcm_msg_t *msg) +{ + struct ibv_recv_wr recv_wr, *recv_err; + struct ibv_sge sge; + + recv_wr.next = NULL; + recv_wr.sg_list = &sge; + recv_wr.num_sge = 1; + recv_wr.wr_id = (uint64_t)(uintptr_t) msg; + sge.length = sizeof(dat_mcm_msg_t) + sizeof(struct ibv_grh); + sge.lkey = tp->mr_rbuf->lkey; + sge.addr = (uintptr_t)((char *)msg - sizeof(struct ibv_grh)); + + return (ibv_post_recv(tp->qp, &recv_wr, &recv_err)); +} + +static int mcm_reject(ib_hca_transport_t *tp, dat_mcm_msg_t *msg) +{ + dat_mcm_msg_t smsg; + + /* setup op, rearrange the src, dst cm and addr info */ + (void)dapl_os_memzero(&smsg, sizeof(smsg)); + smsg.ver = htons(DAT_MCM_VER); + smsg.op = htons(MCM_REJ_CM); + smsg.dport = msg->sport; + smsg.dqpn = msg->sqpn; + smsg.sport = msg->dport; + smsg.sqpn = msg->dqpn; + + dapl_os_memcpy(&smsg.daddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); + + /* no dst_addr IB info in REQ, init lid, gid, get type from saddr1 */ + smsg.saddr1.lid = tp->addr.lid; + smsg.saddr1.qp_type = msg->saddr1.qp_type; + dapl_os_memcpy(&smsg.saddr1.gid[0], + &tp->addr.gid, 16); + + dapl_os_memcpy(&smsg.saddr1, &msg->daddr1, sizeof(dat_mcm_addr_t)); + + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " CM reject -> LID %x, QPN %x PORT %x\n", + ntohs(smsg.daddr1.lid), + ntohl(smsg.dqpn), ntohs(smsg.dport)); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&tp->hca->ia_list_head)), DCNT_IA_CM_ERR_REJ_TX); + return (mcm_send(tp, &smsg, NULL, 0)); +} + +void mcm_process_recv(ib_hca_transport_t *tp, + dat_mcm_msg_t *msg, + dp_ib_cm_handle_t cm) +{ + dapl_os_lock(&cm->lock); + switch (cm->state) { + case MCM_LISTEN: /* passive */ + dapl_os_unlock(&cm->lock); + mcm_accept(cm, msg); + break; + case MCM_RTU_PENDING: /* passive */ + dapl_os_unlock(&cm->lock); + mcm_accept_rtu(cm, msg); + break; + case MCM_REP_PENDING: /* active */ + dapl_os_unlock(&cm->lock); + mcm_connect_rtu(cm, msg); + break; + case MCM_CONNECTED: /* active and passive */ + /* DREQ, change state and process */ + cm->retries = 2; + if (ntohs(msg->op) == MCM_DREQ) { + cm->state = MCM_DISC_RECV; + dapl_os_unlock(&cm->lock); + dapli_cm_disconnect(cm); + break; + } + /* active: RTU was dropped, resend */ + if (ntohs(msg->op) == MCM_REP) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " RESEND RTU: op %s st %s [lid, port, cqp, iqp]:" + " %x %x %x %x -> %x %x %x %x r_pid %x\n", + dapl_cm_op_str(ntohs(cm->msg.op)), + dapl_cm_state_str(cm->state), + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id)); + + cm->msg.op = htons(MCM_RTU); + mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_RTU_RETRY); + } + dapl_os_unlock(&cm->lock); + break; + case MCM_DISC_PENDING: /* active and passive */ + /* DREQ or DREP, finalize */ + dapl_os_unlock(&cm->lock); + mcm_disconnect_final(cm); + break; + case MCM_DISCONNECTED: + case MCM_FREE: + /* DREQ dropped, resend */ + if (ntohs(msg->op) == MCM_DREQ) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " RESEND DREP: op %s st %s [lid, port, qpn]:" + " %x %x %x -> %x %x %x\n", + dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->saddr1.lid), + 
ntohs(msg->sport), + ntohl(msg->saddr1.qpn), + ntohs(msg->daddr1.lid), + ntohs(msg->dport), + ntohl(msg->daddr1.qpn)); + cm->msg.op = htons(MCM_DREP); + mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREP_RETRY); + + } else if (ntohs(msg->op) != MCM_DREP){ + /* DREP ok to ignore, any other print warning */ + dapl_log(DAPL_DBG_TYPE_WARN, + " mcm_recv: UNEXPECTED MSG on cm %p" + " <- op %s, st %s spsp %x sqpn %x\n", + cm, dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->sport), ntohl(msg->sqpn)); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_UNEXPECTED); + } + dapl_os_unlock(&cm->lock); + break; + case MCM_REJECTED: + if (ntohs(msg->op) == MCM_REJ_USER) { + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_USER_REJ_RX); + dapl_os_unlock(&cm->lock); + break; + } + default: + dapl_log(DAPL_DBG_TYPE_WARN, + " mcm_recv: Warning, UNKNOWN state" + " <- op %s, %s spsp %x sqpn %x slid %x\n", + dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->sport), ntohl(msg->sqpn), + ntohs(msg->saddr1.lid)); + dapl_os_unlock(&cm->lock); + break; + } +} + +/* Find matching CM object for this receive message, return CM reference, timer */ +dp_ib_cm_handle_t mcm_cm_find(ib_hca_transport_t *tp, dat_mcm_msg_t *msg) +{ + dp_ib_cm_handle_t cm = NULL, next, found = NULL; + struct dapl_llist_entry **list; + DAPL_OS_LOCK *lock; + int listenq = 0; + + /* conn list first, duplicate requests for MCM_REQ */ + list = &tp->list; + lock = &tp->lock; + +retry_listenq: + dapl_os_lock(lock); + if (!dapl_llist_is_empty(list)) + next = dapl_llist_peek_head(list); + else + next = NULL; + + while (next) { + cm = next; + next = dapl_llist_next_entry(list, + (DAPL_LLIST_ENTRY *)&cm->local_entry); + if (cm->state == MCM_DESTROY || cm->state == MCM_FREE) + continue; + + /* CM sPORT + QPN, match is good enough for listenq */ + if (listenq && + cm->msg.sport == msg->dport && + cm->msg.sqpn == msg->dqpn) { + found = cm; + break; + } + /* connectq, check src and dst plus id's, check duplicate conn_reqs */ + if (!listenq && + cm->msg.sport == msg->dport && cm->msg.sqpn == msg->dqpn && + cm->msg.dport == msg->sport && cm->msg.dqpn == msg->sqpn && + cm->msg.daddr1.lid == msg->saddr1.lid) { + if (ntohs(msg->op) != MCM_REQ) { + found = cm; + break; + } else { + /* duplicate; bail and throw away */ + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " DUPLICATE: cm %p op %s (%s) st %s" + " [lid, port, cqp, iqp]:" + " %x %x %x %x <- (%x %x %x %x :" + " %x %x %x %x) -> %x %x %x %x\n", + cm, dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_op_str(ntohs(cm->msg.op)), + dapl_cm_state_str(cm->state), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr1.qpn), + ntohs(msg->saddr1.lid), ntohs(msg->sport), + ntohl(msg->sqpn), ntohl(msg->saddr1.qpn), + ntohs(msg->daddr1.lid), ntohs(msg->dport), + ntohl(msg->dqpn), ntohl(msg->daddr1.qpn), + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr1.qpn)); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), + DCNT_IA_CM_ERR_REQ_DUP); + + dapl_os_unlock(lock); + return NULL; + } + } + } + dapl_os_unlock(lock); + /* no duplicate request on connq, check listenq for new request */ + if (ntohs(msg->op) == MCM_REQ && !listenq && !found) { + listenq = 1; + list = &tp->llist; + lock = &tp->llock; + goto retry_listenq; + } + + /* not 
match on listenq for valid request, send reject */ + if (ntohs(msg->op) == MCM_REQ && !found) { + dapl_log(DAPL_DBG_TYPE_WARN, + " mcm_recv: NO LISTENER for %s %x %x i%x c%x" + " < %x %x %x, sending reject\n", + dapl_cm_op_str(ntohs(msg->op)), + ntohs(msg->daddr1.lid), ntohs(msg->dport), + ntohl(msg->daddr1.qpn), ntohl(msg->sqpn), + ntohs(msg->saddr1.lid), ntohs(msg->sport), + ntohl(msg->saddr1.qpn)); + + mcm_reject(tp, msg); + } + + if (!found) { + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " NO MATCH: op %s [lid, port, cqp, iqp, pid]:" + " %x %x %x %x %x <- %x %x %x %x l_pid %x r_pid %x\n", + dapl_cm_op_str(ntohs(msg->op)), + ntohs(msg->daddr1.lid), ntohs(msg->dport), + ntohl(msg->dqpn), ntohl(msg->daddr1.qpn), + ntohl(msg->d_id), ntohs(msg->saddr1.lid), + ntohs(msg->sport), ntohl(msg->sqpn), + ntohl(msg->saddr1.qpn), ntohl(msg->s_id), + ntohl(msg->d_id)); + + if (ntohs(msg->op) == MCM_DREP) { + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&tp->hca->ia_list_head)), DCNT_IA_CM_ERR_DREP_DUP); + } + } + + return found; +} + +/* Get rmsgs from CM completion queue, 10 at a time */ +static void mcm_recv(ib_hca_transport_t *tp) +{ + struct ibv_wc wc[10]; + dat_mcm_msg_t *msg; + dp_ib_cm_handle_t cm; + int i, ret, notify = 0; + struct ibv_cq *ibv_cq = NULL; + DAPL_HCA *hca; + + /* POLLIN on channel FD */ + ret = ibv_get_cq_event(tp->rch, &ibv_cq, (void *)&hca); + if (ret == 0) { + ibv_ack_cq_events(ibv_cq, 1); + } +retry: + ret = ibv_poll_cq(tp->rcq, 10, wc); + if (ret <= 0) { + if (!ret && !notify) { + ibv_req_notify_cq(tp->rcq, 0); + notify = 1; + goto retry; + } + return; + } else + notify = 0; + + for (i = 0; i < ret; i++) { + msg = (dat_mcm_msg_t*) (uintptr_t) wc[i].wr_id; + + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " mcm_recv: stat=%d op=%s ln=%d id=%p qp2=%x\n", + wc[i].status, dapl_cm_op_str(ntohs(msg->op)), + wc[i].byte_len, + (void*)wc[i].wr_id, wc[i].src_qp); + + /* validate CM message, version */ + if (ntohs(msg->ver) != DAT_MCM_VER) { + dapl_log(DAPL_DBG_TYPE_WARN, + " mcm_recv: UNKNOWN msg %p, ver %d\n", + msg, msg->ver); + mcm_post_rmsg(tp, msg); + continue; + } + if (!(cm = mcm_cm_find(tp, msg))) { + mcm_post_rmsg(tp, msg); + continue; + } + + /* match, process it */ + mcm_process_recv(tp, msg, cm); + mcm_post_rmsg(tp, msg); + } + + /* finished this batch of WC's, poll and rearm */ + goto retry; +} + +/* ACTIVE/PASSIVE: build and send CM message out of CM object */ +static int mcm_send(ib_hca_transport_t *tp, dat_mcm_msg_t *msg, DAT_PVOID p_data, DAT_COUNT p_size) +{ + dat_mcm_msg_t *smsg = NULL; + struct ibv_send_wr wr, *bad_wr; + struct ibv_sge sge; + int len, ret = -1; + uint16_t dlid = ntohs(msg->daddr1.lid); + + /* Get message from send queue, copy data, and send */ + dapl_os_lock(&tp->slock); + if ((smsg = mcm_get_smsg(tp)) == NULL) { + dapl_log(DAPL_DBG_TYPE_ERR, + " mcm_send ERR: get_smsg(hd=%d,tl=%d) \n", + tp->s_hd, tp->s_tl); + goto bail; + } + + len = sizeof(dat_mcm_msg_t); + dapl_os_memcpy(smsg, msg, len); + if (p_size) { + smsg->p_size = ntohs(p_size); + dapl_os_memcpy(&smsg->p_data, p_data, p_size); + } else + smsg->p_size = 0; + + wr.next = NULL; + wr.sg_list = &sge; + wr.num_sge = 1; + wr.opcode = IBV_WR_SEND; + wr.wr_id = (unsigned long)tp->s_hd; + wr.send_flags = (wr.wr_id % tp->burst) ? 
0 : IBV_SEND_SIGNALED; + if (len <= tp->max_inline_send) + wr.send_flags |= IBV_SEND_INLINE; + + sge.length = len; + sge.lkey = tp->mr_sbuf->lkey; + sge.addr = (uintptr_t)smsg; + + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " mcm_send: op %s ln %d lid %x c_qpn %x rport %x\n", + dapl_cm_op_str(ntohs(smsg->op)), + sge.length, htons(smsg->daddr1.lid), + htonl(smsg->dqpn), htons(smsg->dport)); + + /* empty slot, then create AH */ + if (!tp->ah[dlid]) { + tp->ah[dlid] = + dapls_create_ah(tp->hca, tp->pd, tp->qp, + htons(dlid), NULL); + if (!tp->ah[dlid]) + goto bail; + } + + wr.wr.ud.ah = tp->ah[dlid]; + wr.wr.ud.remote_qpn = ntohl(smsg->dqpn); + wr.wr.ud.remote_qkey = DAT_MCM_UD_QKEY; + + ret = ibv_post_send(tp->qp, &wr, &bad_wr); + if (ret) { + dapl_log(DAPL_DBG_TYPE_ERR, + " mcm_send ERR: post_send() %s\n", + strerror(errno) ); + } + +bail: + dapl_os_unlock(&tp->slock); + return ret; +} + +/* ACTIVE/PASSIVE: CM objects */ +static void dapli_cm_dealloc(dp_ib_cm_handle_t cm) { + + dapl_os_assert(!cm->ref_count); + dapl_os_lock_destroy(&cm->lock); + dapl_os_wait_object_destroy(&cm->d_event); + dapl_os_wait_object_destroy(&cm->f_event); + dapl_os_free(cm, sizeof(*cm)); +} + +void dapls_cm_acquire(dp_ib_cm_handle_t cm) +{ + dapl_os_lock(&cm->lock); + cm->ref_count++; + dapl_os_unlock(&cm->lock); +} + +void dapls_cm_release(dp_ib_cm_handle_t cm) +{ + dapl_os_lock(&cm->lock); + cm->ref_count--; + + if (cm->ref_count) { + if (cm->ref_count == 1) + dapl_os_wait_object_wakeup(&cm->f_event); + dapl_os_unlock(&cm->lock); + return; + } + + /* active, release local conn id port, if exists on client */ + if (!cm->sp && cm->msg.sport && cm->tp->sid) + mcm_free_port(cm->tp, ntohs(cm->msg.sport)); + + /* clean up any UD address handles */ + if (cm->ah) { + ibv_destroy_ah(cm->ah); + cm->ah = NULL; + } + dapl_os_unlock(&cm->lock); + dapli_cm_dealloc(cm); +} + +dp_ib_cm_handle_t dapls_cm_create(DAPL_HCA *hca, DAPL_EP *ep) +{ + dp_ib_cm_handle_t cm; + + /* Allocate CM, init lock, and initialize */ + if ((cm = dapl_os_alloc(sizeof(*cm))) == NULL) + return NULL; + + (void)dapl_os_memzero(cm, sizeof(*cm)); + if (dapl_os_lock_init(&cm->lock)) + goto bail; + + if (dapl_os_wait_object_init(&cm->f_event)) { + dapl_os_lock_destroy(&cm->lock); + goto bail; + } + if (dapl_os_wait_object_init(&cm->d_event)) { + dapl_os_lock_destroy(&cm->lock); + dapl_os_wait_object_destroy(&cm->f_event); + goto bail; + } + dapls_cm_acquire(cm); + cm->hca = hca; + cm->tp = &hca->ib_trans; + cm->msg.ver = htons(DAT_MCM_VER); + cm->msg.s_id = htonl(dapl_os_getpid()); /* process id for src id */ + cm->msg.sys_guid = hca->ib_trans.sys_guid; + + /* ACTIVE: init source address QP info from local EP */ + if (ep) { + if (!hca->ib_trans.scif_ep) { /* CM service local and not on MPXYD */ + + cm->msg.sport = htons(mcm_get_port(&hca->ib_trans, 0)); + if (!cm->msg.sport) { + dapl_os_wait_object_destroy(&cm->f_event); + dapl_os_wait_object_destroy(&cm->d_event); + dapl_os_lock_destroy(&cm->lock); + goto bail; + } + cm->msg.sqpn = htonl(hca->ib_trans.qp->qp_num); /* ucm */ + cm->msg.saddr2.qpn = htonl(ep->qp_handle->qp2->qp_num); /* QPt */ + cm->msg.saddr2.qp_type = ep->qp_handle->qp->qp_type; + cm->msg.saddr2.lid = hca->ib_trans.addr.lid; + cm->msg.saddr2.ep_map = hca->ib_trans.addr.ep_map; + dapl_os_memcpy(&cm->msg.saddr2.gid[0], + &hca->ib_trans.addr.gid, 16); + + } + /* QPr is on proxy when xsocket from device */ + if (!MXS_EP(&hca->ib_trans.addr)) { + cm->msg.saddr1.qpn = htonl(ep->qp_handle->qp->qp_num); /* QPr local*/ + cm->msg.saddr1.qp_type = 
ep->qp_handle->qp->qp_type; + cm->msg.saddr1.lid = hca->ib_trans.addr.lid; + cm->msg.saddr1.ep_map = hca->ib_trans.addr.ep_map; + dapl_os_memcpy(&cm->msg.saddr1.gid[0], + &hca->ib_trans.addr.gid, 16); + } + + /* link CM object to EP */ + dapl_ep_link_cm(ep, cm); + cm->ep = ep; + } + return cm; +bail: + dapl_os_free(cm, sizeof(*cm)); + return NULL; +} + +/* schedule destruction of CM object */ +void dapli_cm_free(dp_ib_cm_handle_t cm) +{ + dapl_os_lock(&cm->lock); + cm->state = MCM_FREE; + dapls_thread_signal(&cm->hca->ib_trans.signal); + dapl_os_unlock(&cm->lock); +} + +/* Blocking, ONLY called from dat_ep_free */ +void dapls_cm_free(dp_ib_cm_handle_t cm) +{ + /* free from internal workq, wait until EP is last ref */ + dapl_os_lock(&cm->lock); + cm->state = MCM_FREE; + if (cm->ref_count != 1) { + dapl_os_unlock(&cm->lock); + dapls_thread_signal(&cm->hca->ib_trans.signal); + dapl_os_wait_object_wait(&cm->f_event, DAT_TIMEOUT_INFINITE); + dapl_os_lock(&cm->lock); + } + dapl_os_unlock(&cm->lock); + + /* unlink, dequeue from EP. Final ref so release will destroy */ + dapl_ep_unlink_cm(cm->ep, cm); +} + +/* ACTIVE/PASSIVE: queue up connection object on CM list */ +void dapli_queue_conn(dp_ib_cm_handle_t cm) +{ + /* add to work queue, list, for cm thread processing */ + dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm->local_entry); + dapl_os_lock(&cm->hca->ib_trans.lock); + dapls_cm_acquire(cm); + dapl_llist_add_tail(&cm->hca->ib_trans.list, + (DAPL_LLIST_ENTRY *)&cm->local_entry, cm); + dapl_os_unlock(&cm->hca->ib_trans.lock); + if (!cm->hca->ib_trans.scif_ep) + dapls_thread_signal(&cm->hca->ib_trans.signal); +} + +/* PASSIVE: queue up listen object on listen list */ +static void dapli_queue_listen(dp_ib_cm_handle_t cm) +{ + /* add to work queue, llist, for cm thread processing */ + dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm->local_entry); + dapl_os_lock(&cm->hca->ib_trans.llock); + dapls_cm_acquire(cm); + dapl_llist_add_tail(&cm->hca->ib_trans.llist, + (DAPL_LLIST_ENTRY *)&cm->local_entry, cm); + dapl_os_unlock(&cm->hca->ib_trans.llock); +} + +static void dapli_dequeue_listen(dp_ib_cm_handle_t cm) +{ + DAPL_HCA *hca = cm->hca; + + dapl_os_lock(&hca->ib_trans.llock); + dapl_llist_remove_entry(&hca->ib_trans.llist, + (DAPL_LLIST_ENTRY *)&cm->local_entry); + dapls_cm_release(cm); + dapl_os_unlock(&hca->ib_trans.llock); +} + +/* called with local LIST and CM object lock */ +void dapli_cm_dequeue(dp_ib_cm_handle_t cm) +{ + /* Remove from work queue, cr thread processing */ + dapl_llist_remove_entry(&cm->hca->ib_trans.list, + (DAPL_LLIST_ENTRY *)&cm->local_entry); + dapls_cm_release(cm); +} + +void mcm_disconnect_final(dp_ib_cm_handle_t cm) +{ + /* no EP attachment or not RC, nothing to process */ + if (cm->ep == NULL || + cm->ep->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC) + return; + + dapl_os_lock(&cm->lock); + if ((cm->state == MCM_DISCONNECTED) || (cm->state == MCM_FREE)) { + dapl_os_unlock(&cm->lock); + return; + } + + cm->state = MCM_DISCONNECTED; + dapl_os_unlock(&cm->lock); + + if (cm->sp) + dapls_cr_callback(cm, IB_CME_DISCONNECTED, NULL, 0, cm->sp); + else + dapl_evd_connection_callback(cm, IB_CME_DISCONNECTED, NULL, 0, cm->ep); + + dapl_os_wait_object_wakeup(&cm->d_event); + +} + +/* + * called from consumer thread via ep_disconnect/ep_free or + * from cm_thread when receiving DREQ + */ +DAT_RETURN dapli_cm_disconnect(dp_ib_cm_handle_t cm) +{ + int finalize = 1; + int wakeup = 0; + + dapl_os_lock(&cm->lock); + switch (cm->state) { + case MCM_CONNECTED: + /* CONSUMER: move to err 
state to flush, if not UD */ + if (cm->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) + dapls_modify_qp_state(cm->ep->qp_handle->qp, IBV_QPS_ERR,0,0,0); + + /* send DREQ, event after DREP or DREQ timeout */ + cm->state = MCM_DISC_PENDING; + cm->msg.op = htons(MCM_DREQ); + finalize = 0; /* wait for DREP, wakeup timer after DREQ sent */ + wakeup = 1; + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_DREQ_TX); + break; + case MCM_DISC_PENDING: + /* DREQ timeout, resend until retries exhausted */ + cm->msg.op = htons(MCM_DREQ); + if (cm->retries >= cm->hca->ib_trans.retries) { + dapl_log(DAPL_DBG_TYPE_ERR, + " CM_DREQ: RETRIES EXHAUSTED:" + " %x %x %x -> %x %x %x\n", + htons(cm->msg.saddr1.lid), + htonl(cm->msg.saddr1.qpn), + htons(cm->msg.sport), + htons(cm->msg.daddr1.lid), + htonl(cm->msg.dqpn), + htons(cm->msg.dport)); + finalize = 1; + } + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_DREQ_RETRY); + break; + case MCM_DISC_RECV: + /* CM_THREAD: move to err state to flush, if not UD */ + if (cm->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) + dapls_modify_qp_state(cm->ep->qp_handle->qp, IBV_QPS_ERR,0,0,0); + + /* DREQ received, send DREP and schedule event, finalize */ + cm->msg.op = htons(MCM_DREP); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_DREP_TX); + break; + case MCM_DISCONNECTED: + dapl_os_unlock(&cm->lock); + return DAT_SUCCESS; + default: + dapl_log(DAPL_DBG_TYPE_EP, + " disconnect UNKNOWN state: ep %p cm %p %s %s" + " %x %x %x %s %x %x %x r_id %x l_id %x\n", + cm->ep, cm, + cm->msg.saddr1.qp_type == IBV_QPT_RC ? "RC" : "UD", + dapl_cm_state_str(cm->state), + ntohs(cm->msg.saddr1.lid), + ntohs(cm->msg.sport), + ntohl(cm->msg.saddr1.qpn), + cm->sp ? "<-" : "->", + ntohs(cm->msg.daddr1.lid), + ntohs(cm->msg.dport), + ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id), + ntohl(cm->msg.s_id)); + + dapl_os_unlock(&cm->lock); + return DAT_SUCCESS; + } + + dapl_os_get_time(&cm->timer); /* reply expected */ + mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0); + dapl_os_unlock(&cm->lock); + + if (wakeup) + dapls_thread_signal(&cm->hca->ib_trans.signal); + + if (finalize) + mcm_disconnect_final(cm); + + return DAT_SUCCESS; +} + +/* + * ACTIVE: get remote CM SID server info from r_addr. 
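+ *
+ * Retry pacing, a worked example with illustrative values: the REQ is
+ * re-sent by mcm_check_timers() whenever (now - cm->timer)/1000 exceeds
+ * rep_time << retries, so with rep_time = 4 ms and ib_trans.retries = 8
+ * the waits grow 4, 8, 16, ..., 512 ms before this routine gives up and
+ * reports IB_CME_DESTINATION_UNREACHABLE.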
+ * send, or resend CM msg via UD CM QP + */ +DAT_RETURN +dapli_cm_connect(DAPL_EP *ep, dp_ib_cm_handle_t cm) +{ + dapl_log(DAPL_DBG_TYPE_CM, + " MCM connect: lid %x QPr %x QPt %x lport %x p_sz=%d -> " + " lid %x c_qpn %x rport %x ep_map %d %s -> %d %s, retries=%d\n", + htons(cm->tp->addr.lid), htonl(cm->msg.saddr1.qpn), + htonl(cm->msg.saddr2.qpn), + htons(cm->msg.sport), htons(cm->msg.p_size), + htons(cm->msg.daddr1.lid), htonl(cm->msg.dqpn), + htons(cm->msg.dport), + cm->tp->addr.ep_map, mcm_map_str(cm->tp->addr.ep_map), + cm->msg.daddr1.ep_map, mcm_map_str(cm->msg.daddr1.ep_map), + cm->tp->retries); + + dapl_os_lock(&cm->lock); + if (cm->state != MCM_INIT && cm->state != MCM_REP_PENDING) { + dapl_os_unlock(&cm->lock); + return DAT_INVALID_STATE; + } + + if (cm->retries == cm->hca->ib_trans.retries) { + dapl_log(DAPL_DBG_TYPE_ERR, + " CM_REQ: RETRIES (%d) EXHAUSTED:" + " 0x%x %x 0x%x -> 0x%x %x 0x%x\n", + cm->retries, htons(cm->msg.saddr1.lid), + htonl(cm->msg.saddr1.qpn), + htons(cm->msg.sport), + htons(cm->msg.daddr1.lid), + htonl(cm->msg.dqpn), + htons(cm->msg.dport)); + + dapl_os_unlock(&cm->lock); + +#ifdef DAPL_COUNTERS + /* called from check_timers in cm_thread, cm lock held */ + if (g_dapl_dbg_type & DAPL_DBG_TYPE_CM_LIST) { + dapl_os_unlock(&cm->hca->ib_trans.lock); + dapls_print_cm_list(ep->header.owner_ia); + dapl_os_lock(&cm->hca->ib_trans.lock); + } +#endif + dapl_evd_connection_callback(cm, + IB_CME_DESTINATION_UNREACHABLE, + NULL, 0, ep); + + return DAT_ERROR(DAT_INVALID_ADDRESS, + DAT_INVALID_ADDRESS_UNREACHABLE); + } + + cm->state = MCM_REP_PENDING; + cm->msg.op = htons(MCM_REQ); + dapl_os_get_time(&cm->timer); /* reset reply timer */ + + if (cm->tp->scif_ep) { /* MIC: proxy CR to MPXYD */ + if (dapli_mix_cm_req_out(cm, ep->qp_handle)) + goto bail; + } else { + if (mcm_send(&cm->hca->ib_trans, &cm->msg, + &cm->msg.p_data, ntohs(cm->msg.p_size))) + goto bail; + } + dapl_os_unlock(&cm->lock); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), + ep->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC ? 
+ DCNT_IA_CM_AH_REQ_TX : DCNT_IA_CM_REQ_TX); + + return DAT_SUCCESS; + +bail: + dapl_os_unlock(&cm->lock); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR); + dapl_log(DAPL_DBG_TYPE_WARN, + " connect: snd ERR -> cm_lid %x cm_qpn %x r_psp %x p_sz=%d\n", + htons(cm->msg.daddr1.lid), + htonl(cm->msg.dqpn), htons(cm->msg.dport), + htons(cm->msg.p_size)); + + dapli_cm_free(cm); + return DAT_INSUFFICIENT_RESOURCES; +} + +/* + * ACTIVE: exchange QP information, called from CR thread + */ +void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg) +{ + DAPL_EP *ep = cm->ep; + ib_cm_events_t event = IB_CME_CONNECTED; + DAT_RETURN ret; + + dapl_os_lock(&cm->lock); + if (cm->state != MCM_REP_PENDING) { + dapl_log(DAPL_DBG_TYPE_WARN, + " CONN_RTU: UNEXPECTED state:" + " op %s, st %s <- lid %x sqpn %x sport %x\n", + dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->saddr1.lid), ntohl(msg->saddr1.qpn), + ntohs(msg->sport)); + dapl_os_unlock(&cm->lock); + return; + } + + /* CM_REP: save remote address information to EP and CM */ + cm->msg.d_id = msg->s_id; + dapl_os_memcpy(&ep->remote_ia_address, &msg->saddr2, sizeof(dat_mcm_addr_t)); + dapl_os_memcpy(&cm->msg.daddr2, &msg->saddr2, sizeof(dat_mcm_addr_t)); + dapl_os_memcpy(&cm->msg.daddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); + dapl_os_memcpy(&cm->msg.p_proxy, &msg->p_proxy, DAT_MCM_PROXY_DATA); + + /* validate private data size, and copy if necessary */ + if (msg->p_size) { + if (ntohs(msg->p_size) > DAT_MCM_PDATA_SIZE) { + dapl_log(DAPL_DBG_TYPE_WARN, + " CONN_RTU: invalid p_size %d:" + " st %s <- lid %x sqpn %x s2qpn %x spsp %x\n", + ntohs(msg->p_size), + dapl_cm_state_str(cm->state), + ntohs(msg->saddr1.lid), + ntohl(msg->saddr1.qpn), + ntohl(msg->saddr2.qpn), + ntohs(msg->sport)); + dapl_os_unlock(&cm->lock); + goto bail; + } + dapl_os_memcpy(cm->msg.p_data, msg->p_data, ntohs(msg->p_size)); + } + + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " CONN_RTU: DST lid=%x, QPr=%x, QPt=%x qp_type=%d, port=%x psize=%d\n", + ntohs(cm->msg.daddr1.lid), ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.daddr2.qpn), cm->msg.daddr1.qp_type, + ntohs(msg->sport), ntohs(msg->p_size)); + + if (ntohs(msg->op) == MCM_REP) + event = IB_CME_CONNECTED; + else if (ntohs(msg->op) == MCM_REJ_USER) + event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA; + else { + dapl_log(DAPL_DBG_TYPE_WARN, + " Warning, non-user CR REJECT:" + " cm %p op %s, st %s dlid %x iqp %x iqp2 %xport %x <-" + " slid %x iqp %x port %x\n", cm, + dapl_cm_op_str(ntohs(msg->op)), dapl_cm_state_str(cm->state), + ntohs(msg->daddr1.lid), ntohl(msg->daddr1.qpn),ntohl(msg->daddr2.qpn), + ntohs(msg->dport), ntohs(msg->saddr1.lid), ntohl(msg->saddr1.qpn), + ntohs(msg->sport)); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR_REJ_RX); + event = IB_CME_DESTINATION_REJECT; + } + if (event != IB_CME_CONNECTED) { + dapl_log(DAPL_DBG_TYPE_CM, + " ACTIVE: CM_REQ REJECTED:" + " cm %p op %s, st %s dlid %x iqp %x port %x <-" + " slid %x iqp %x port %x\n", cm, + dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->daddr1.lid), ntohl(msg->daddr1.qpn), + ntohs(msg->dport), ntohs(msg->saddr1.lid), + ntohl(msg->saddr1.qpn), ntohs(msg->sport)); + + cm->state = MCM_REJECTED; + dapl_os_unlock(&cm->lock); + goto bail; + } + dapl_os_unlock(&cm->lock); + + /* QP to RTR-RTS with remote QPt (daddr2) info */ + dapl_os_lock(&cm->ep->header.lock); + + if (!MXS_EP(&cm->hca->ib_trans.addr)) { + ret = 
dapls_modify_qp_rtu(cm->ep->qp_handle->qp, + cm->msg.daddr2.qpn, + cm->msg.daddr2.lid, + (ib_gid_handle_t)cm->msg.daddr2.gid); + if (ret != DAT_SUCCESS) { + dapl_os_unlock(&cm->ep->header.lock); + event = IB_CME_LOCAL_FAILURE; + goto bail; + } + } + + /* QP to RTR-RTS with remote QPr (daddr1) info */ + if (!cm->tp->scif_ep) { /* NON-MIC, qp2 is local and not on MPXYD */ + ret = dapls_modify_qp_rtu( + cm->ep->qp_handle->qp2, + cm->msg.daddr1.qpn, + cm->msg.daddr1.lid, + (ib_gid_handle_t)cm->msg.daddr1.gid); + if (ret != DAT_SUCCESS) { + dapl_os_unlock(&cm->ep->header.lock); + event = IB_CME_LOCAL_FAILURE; + goto bail; + } + /* MXS peer: setup PI WC and save peer WR queue info */ + if (MXS_EP(&cm->msg.daddr1)) { + /* save PI WR info, create local WC_q, send back WC info */ + mcm_ntoh_wrc(&ep->qp_handle->wrc_rem, (mcm_wrc_info_t*)cm->msg.p_proxy); + mcm_create_wc_q(ep->qp_handle, ep->qp_handle->wrc_rem.wr_end + 1); + mcm_hton_wrc((mcm_wrc_info_t*)cm->msg.p_proxy, &ep->qp_handle->wrc); + ep->qp_handle->ep_map = cm->msg.daddr1.ep_map; + + /* post 0-byte rcv for inbound WC's via RW_imm */ + if (mcm_post_rcv_wc(ep->qp_handle, MCM_WRC_QLEN)) + goto bail; + + dapl_log(DAPL_DBG_TYPE_CM, "CONN_RTU: WR_rem %p sz %d, WC %p sz %d\n", + ep->qp_handle->wrc_rem.wr_addr, + ep->qp_handle->wrc_rem.wr_end+1, + ep->qp_handle->wrc.wc_addr, + ep->qp_handle->wrc.wc_end+1); + } + } + dapl_os_unlock(&cm->ep->header.lock); + + /* Send RTU, no private data */ + cm->msg.op = htons(MCM_RTU); + + dapl_os_lock(&cm->lock); + cm->state = MCM_CONNECTED; + if (cm->tp->scif_ep) { /* MPXYD */ + dapli_mix_cm_rtu_out(cm); + } else { + if (mcm_send(&cm->hca->ib_trans, &cm->msg, NULL, 0)) { + dapl_os_unlock(&cm->lock); + goto bail; + } + } + dapl_os_unlock(&cm->lock); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_RTU_TX); + + /* init cm_handle and post the event with private data */ + dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n"); + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ACTIVE_EST); + dapl_evd_connection_callback(cm, + IB_CME_CONNECTED, + cm->msg.p_data, ntohs(cm->msg.p_size), cm->ep); + + dapl_log(DAPL_DBG_TYPE_CM_EST, + " mcm_ACTIVE_CONN %p %d [lid port qpn] %x %x %x -> %x %x %x %s\n", + cm->hca, cm->retries, ntohs(cm->msg.saddr1.lid), + ntohs(cm->msg.sport), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), mcm_map_str(cm->msg.daddr1.ep_map)); + + mcm_log_addrs(DAPL_DBG_TYPE_CM_EST, &cm->msg, cm->state, 0); + + return; +bail: + dapl_evd_connection_callback(NULL, event, cm->msg.p_data, ntohs(cm->msg.p_size), cm->ep); + dapli_cm_free(cm); +} + +/* + * PASSIVE: Accept on listen CM PSP. 
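+ *
+ * Passive handshake sketch (as implemented below): an inbound REQ on the
+ * listen CM posts IB_CME_CONNECTION_REQUEST_PENDING; the consumer's
+ * dat_cr_accept() reaches dapli_accept_usr(), which sends the REP and
+ * waits in MCM_RTU_PENDING; the peer's RTU then drives mcm_accept_rtu()
+ * to MCM_CONNECTED and posts IB_CME_CONNECTED. This routine handles the
+ * first step: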
+ * create new CM object for this CR, + * receive peer QP information, private data, + * and post cr_event + */ +static void mcm_accept(ib_cm_srvc_handle_t cm, dat_mcm_msg_t *msg) +{ + dp_ib_cm_handle_t acm; + + /* Allocate accept CM and setup passive references */ + if ((acm = dapls_cm_create(cm->hca, NULL)) == NULL) { + dapl_log(DAPL_DBG_TYPE_WARN, " accept: ERR cm_create\n"); + return; + } + + /* dest CM info from CR msg, source CM info from listen */ + acm->sp = cm->sp; + acm->hca = cm->hca; + acm->tp = cm->tp; + acm->msg.op = msg->op; + acm->msg.dport = msg->sport; + acm->msg.dqpn = msg->sqpn; + acm->msg.sport = cm->msg.sport; + acm->msg.sqpn = cm->msg.sqpn; + acm->msg.p_size = msg->p_size; + acm->msg.d_id = msg->s_id; + acm->msg.rd_in = msg->rd_in; + + /* CR saddr1 is CM daddr1 info, need EP for local saddr1 */ + dapl_os_memcpy(&acm->msg.daddr1, &msg->saddr1, sizeof(dat_mcm_addr_t)); + dapl_os_memcpy(&acm->msg.daddr2, &msg->saddr2, sizeof(dat_mcm_addr_t)); + dapl_os_memcpy(&acm->msg.p_proxy, &msg->p_proxy, DAT_MCM_PROXY_DATA); + + dapl_log(DAPL_DBG_TYPE_CM, + " accept: DST port=%x lid=%x, iqp=%x, iqp2=%x, psize=%d\n", + ntohs(acm->msg.dport), ntohs(acm->msg.daddr1.lid), + htonl(acm->msg.daddr1.qpn), htonl(acm->msg.daddr2.qpn), htons(acm->msg.p_size)); + + /* validate private data size before reading */ + if (ntohs(msg->p_size) > DAT_MCM_PDATA_SIZE) { + dapl_log(DAPL_DBG_TYPE_WARN, " accept: psize (%d) wrong\n", + ntohs(msg->p_size)); + goto bail; + } + + /* read private data into cm_handle if any present */ + if (msg->p_size) + dapl_os_memcpy(acm->msg.p_data, + msg->p_data, ntohs(msg->p_size)); + + acm->state = MCM_ACCEPTING; + dapli_queue_conn(acm); + + /* trigger CR event and return SUCCESS */ + dapls_cr_callback(acm, + IB_CME_CONNECTION_REQUEST_PENDING, + acm->msg.p_data, ntohs(msg->p_size), acm->sp); + return; +bail: + /* schedule work thread cleanup */ + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR); + dapli_cm_free(acm); + return; +} + +/* + * PASSIVE: read RTU from active peer, post CONN event + */ +static void mcm_accept_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg) +{ + dapl_os_lock(&cm->lock); + if ((ntohs(msg->op) != MCM_RTU) || (cm->state != MCM_RTU_PENDING)) { + dapl_log(DAPL_DBG_TYPE_WARN, + " accept_rtu: UNEXPECTED op, state:" + " op %s, st %s <- lid %x iqp %x iqp2 %x sport %x\n", + dapl_cm_op_str(ntohs(msg->op)), + dapl_cm_state_str(cm->state), + ntohs(msg->saddr1.lid), ntohl(msg->saddr1.qpn), + ntohl(msg->saddr1.qpn), ntohs(msg->sport)); + dapl_os_unlock(&cm->lock); + goto bail; + } + cm->state = MCM_CONNECTED; + dapl_os_unlock(&cm->lock); + + /* final data exchange if remote QP state is good to go */ + dapl_dbg_log(DAPL_DBG_TYPE_CM, " PASSIVE: connected!\n"); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_PASSIVE_EST); + + dapls_cr_callback(cm, IB_CME_CONNECTED, NULL, 0, cm->sp); + + dapl_log(DAPL_DBG_TYPE_CM_EST, + " PASSIVE_CONN %p %d [lid port qpn] %x %x %x <- %x %x %x %s\n", + cm->hca, cm->retries, ntohs(cm->msg.saddr1.lid), + ntohs(cm->msg.sport), ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.daddr1.lid), ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), mcm_map_str(cm->msg.daddr1.ep_map)); + + mcm_log_addrs(DAPL_DBG_TYPE_CM_EST, &cm->msg, cm->state, 1); + return; +bail: + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), DCNT_IA_CM_ERR); + dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE, NULL, 0, cm->sp); + dapli_cm_free(cm); +} + +/* + * PASSIVE: user accepted, check and re-send 
reply message, called from cm_thread. + */ +static int mcm_reply(dp_ib_cm_handle_t cm) +{ + dapl_os_lock(&cm->lock); + if (cm->state != MCM_RTU_PENDING) { + dapl_log(DAPL_DBG_TYPE_ERR, + " CM_REPLY: wrong state ep %p cm %p %s refs=%d" + " %x %x i_%x i2 %x -> %x %x i_%x i_2 %x l_pid %x r_pid %x\n", + cm->ep, cm, dapl_cm_state_str(cm->state), + cm->ref_count, + htons(cm->msg.saddr1.lid), htons(cm->msg.sport), + htonl(cm->msg.saddr1.qpn), htonl(cm->msg.saddr2.qpn), + htons(cm->msg.daddr1.lid), htons(cm->msg.dport), + htonl(cm->msg.daddr1.qpn), htonl(cm->msg.daddr2.qpn), + ntohl(cm->msg.s_id), ntohl(cm->msg.d_id)); + dapl_os_unlock(&cm->lock); + return -1; + } + + if (cm->retries == cm->hca->ib_trans.retries) { + dapl_log(DAPL_DBG_TYPE_ERR, + " CM_REPLY: RETRIES EXHAUSTED (lid port qpn)" + " %x %x %x %x -> %x %x %x %x \n", + htons(cm->msg.saddr1.lid), htons(cm->msg.sport), + htonl(cm->msg.saddr1.qpn), htonl(cm->msg.saddr2.qpn), + htons(cm->msg.daddr1.lid), htons(cm->msg.dport), + htonl(cm->msg.daddr1.qpn), htonl(cm->msg.daddr2.qpn)); + + dapl_os_unlock(&cm->lock); + +#ifdef DAPL_COUNTERS + if (g_dapl_dbg_type & DAPL_DBG_TYPE_CM_LIST) { + dapl_os_unlock(&cm->hca->ib_trans.lock); + dapls_print_cm_list(dapl_llist_peek_head(&cm->hca->ia_list_head)); + dapl_os_lock(&cm->hca->ib_trans.lock); + } +#endif + + dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE, NULL, 0, cm->sp); + return -1; + } + + dapl_os_get_time(&cm->timer); /* RTU expected */ + if (mcm_send(&cm->hca->ib_trans, &cm->msg, cm->p_data, cm->p_size)) { + dapl_log(DAPL_DBG_TYPE_ERR," accept ERR: ucm reply send()\n"); + dapl_os_unlock(&cm->lock); + return -1; + } + dapl_os_unlock(&cm->lock); + return 0; +} + + +/* + * PASSIVE: consumer accept, send local QP information, private data, + * queue on work thread to receive RTU information to avoid blocking + * user thread. + */ +DAT_RETURN +dapli_accept_usr(DAPL_EP *ep, DAPL_CR *cr, DAT_COUNT p_size, DAT_PVOID p_data) +{ + DAPL_IA *ia = ep->header.owner_ia; + dp_ib_cm_handle_t cm = cr->ib_cm_handle; + int ret; + + dapl_log(DAPL_DBG_TYPE_CM, + " MCM_ACCEPT_USR: ep %p cm %p QPt %p QPr %p p_data %p p_size %d\n", + ep, cm, ep->qp_handle->qp2, ep->qp_handle->qp, p_data, p_size); + + dapl_log(DAPL_DBG_TYPE_CM, " MCM_ACCEPT_USR: ep %p cm %p %s refs=%d" + " %x %x i_%x i2_%x %s <- %x %x i1_%x i2_%x l_pid %x r_pid %x %s\n", + ep, cm, dapl_cm_state_str(cm->state), cm->ref_count, + htons(cm->hca->ib_trans.addr.lid), htons(cm->msg.sport), + ep->qp_handle->qp ? ep->qp_handle->qp->qp_num:0, + ep->qp_handle->qp2 ? ep->qp_handle->qp2->qp_num:0, + mcm_map_str(cm->hca->ib_trans.addr.ep_map), + htons(cm->msg.daddr1.lid), htons(cm->msg.dport), + htonl(cm->msg.daddr1.qpn), htonl(cm->msg.daddr2.qpn), + ntohl(cm->msg.s_id), ntohl(cm->msg.d_id), + mcm_map_str(cm->msg.daddr1.ep_map)); + + if (p_size > DAT_MCM_PDATA_SIZE) + return DAT_LENGTH_ERROR; + + dapl_os_lock(&cm->lock); + if (cm->state != MCM_ACCEPTING) { + dapl_log(DAPL_DBG_TYPE_ERR, + " CM_ACCEPT_USR: wrong state ep %p cm %p %s refs=%d" + " %x %x i_%x i2_ %x <- %x %x i_%x i2_%x l_pid %x r_pid %x\n", + cm->ep, cm, dapl_cm_state_str(cm->state), cm->ref_count, + htons(cm->hca->ib_trans.addr.lid), htons(cm->msg.sport), + ep->qp_handle->qp ? ep->qp_handle->qp->qp_num:0, + ep->qp_handle->qp2 ? 
ep->qp_handle->qp2->qp_num:0, + htons(cm->msg.daddr1.lid), htons(cm->msg.dport), + htonl(cm->msg.daddr1.qpn), htonl(cm->msg.daddr2.qpn), + ntohl(cm->msg.s_id), ntohl(cm->msg.d_id)); + dapl_os_unlock(&cm->lock); + return DAT_INVALID_STATE; + } + dapl_os_unlock(&cm->lock); + + dapl_dbg_log(DAPL_DBG_TYPE_CM," ACCEPT_USR: rlid=%x iqp=%x type %d, psize=%d\n", + ntohs(cm->msg.daddr1.lid), ntohl(cm->msg.daddr1.qpn), + cm->msg.daddr1.qp_type, p_size); + + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " ACCEPT_USR: remote GID subnet %s\n", + inet_ntop(AF_INET6, cm->msg.daddr1.gid, + gid_str, sizeof(gid_str))); + + /* rdma_out, initiator, cannot exceed remote rdma_in max */ + ep->param.ep_attr.max_rdma_read_out = + DAPL_MIN(ep->param.ep_attr.max_rdma_read_out, cm->msg.rd_in); + + /* modify QPr to RTR and then to RTS, QPr (qp) to remote QPt (daddr2), !xsocket */ + dapl_os_lock(&ep->header.lock); + if (!MXS_EP(&cm->hca->ib_trans.addr)) { + ret = dapls_modify_qp_rtu(ep->qp_handle->qp, + cm->msg.daddr2.qpn, + cm->msg.daddr2.lid, + (ib_gid_handle_t)cm->msg.daddr2.gid); + if (ret) { + dapl_log(DAPL_DBG_TYPE_ERR, + " ACCEPT_USR: QPS_RTR ERR %s -> lid %x qpn %x\n", + strerror(errno), ntohs(cm->msg.daddr1.lid), + ntohl(cm->msg.daddr1.qpn)); + dapl_os_unlock(&ep->header.lock); + goto bail; + } + } + /* modify QPt to RTR and then to RTS, QPt (qp2) to remote QPr (daddr1) */ + if (!cm->tp->scif_ep) { /* NON-MIC, qp2 is local and not on MPXYD */ + ret = dapls_modify_qp_rtu(ep->qp_handle->qp2, + cm->msg.daddr1.qpn, + cm->msg.daddr1.lid, + (ib_gid_handle_t)cm->msg.daddr1.gid); + if (ret) { + dapl_log(DAPL_DBG_TYPE_ERR, + " ACCEPT_USR: QPS_RTS ERR %s -> lid %x qpn %x\n", + strerror(errno), ntohs(cm->msg.daddr1.lid), + ntohl(cm->msg.daddr1.qpn)); + dapl_os_unlock(&ep->header.lock); + goto bail; + } + cm->msg.saddr2.qpn = htonl(ep->qp_handle->qp2->qp_num); + cm->msg.saddr2.lid = cm->hca->ib_trans.addr.lid; + cm->msg.saddr2.qp_type = ep->qp_handle->qp->qp_type; + cm->msg.saddr2.ep_map = cm->hca->ib_trans.addr.ep_map; + dapl_os_memcpy(&cm->msg.saddr2.gid[0], + &cm->hca->ib_trans.addr.gid, 16); + + /* MXS peer: setup PI WC and save peer WR queue info */ + if (MXS_EP(&cm->msg.daddr1)) { + /* save PI WR info, create local WC_q, send back WC info */ + mcm_ntoh_wrc(&ep->qp_handle->wrc_rem, (mcm_wrc_info_t*)cm->msg.p_proxy); + mcm_create_wc_q(ep->qp_handle, ep->qp_handle->wrc_rem.wr_end + 1); + mcm_hton_wrc((mcm_wrc_info_t*)cm->msg.p_proxy, &ep->qp_handle->wrc); + ep->qp_handle->ep_map = cm->msg.daddr1.ep_map; + + /* post 0-byte rcv for inbound WC's via RW_imm */ + if (mcm_post_rcv_wc(ep->qp_handle, MCM_WRC_QLEN)) + goto bail; + + dapl_log(DAPL_DBG_TYPE_CM, + "ACCEPT_USR: WR_rem %p rkey %x sz %d, WC %p rkey %x sz %d\n", + ep->qp_handle->wrc_rem.wr_addr, + ep->qp_handle->wrc_rem.wr_rkey, + ep->qp_handle->wrc_rem.wr_end+1, + ep->qp_handle->wrc.wc_addr, + ep->qp_handle->wrc.wc_rkey, + ep->qp_handle->wrc.wc_end+1); + } + } + dapl_os_unlock(&ep->header.lock); + + /* save remote address information, QPr */ + dapl_os_memcpy(&ep->remote_ia_address, + &cm->msg.daddr1, sizeof(dat_mcm_addr_t)); + + /* setup local QPr info (if !KR) and type from EP, copy pdata, for reply */ + cm->msg.op = htons(MCM_REP); + cm->msg.rd_in = ep->param.ep_attr.max_rdma_read_in; + + if (!MXS_EP(&cm->hca->ib_trans.addr)) { + cm->msg.saddr1.qpn = htonl(ep->qp_handle->qp->qp_num); + cm->msg.saddr1.qp_type = ep->qp_handle->qp->qp_type; + cm->msg.saddr1.lid = cm->hca->ib_trans.addr.lid; + cm->msg.saddr1.ep_map = cm->hca->ib_trans.addr.ep_map; + 
dapl_os_memcpy(&cm->msg.saddr1.gid[0], + &cm->hca->ib_trans.addr.gid, 16); + } + + /* + * UD: deliver p_data with REQ and EST event, keep REQ p_data in + * cm->msg.p_data and save REPLY accept data in cm->p_data for retries + */ + cm->p_size = p_size; + dapl_os_memcpy(&cm->p_data, p_data, p_size); + + if (cm->tp->scif_ep) { + dapl_ep_link_cm(ep, cm); + cm->ep = ep; + return (dapli_mix_cm_rep_out(cm, p_size, p_data)); + } + + /* save state and setup valid reference to EP, HCA. !PSP !RSP */ + if (!cm->sp->ep_handle && !cm->sp->psp_flags) + dapl_ep_link_cm(ep, cm); + cm->ep = ep; + cm->hca = ia->hca_ptr; + + /* Send RTU and change state under CM lock */ + dapl_os_lock(&cm->lock); + cm->state = MCM_RTU_PENDING; + dapl_os_get_time(&cm->timer); /* RTU expected */ + if (mcm_send(&cm->hca->ib_trans, &cm->msg, cm->p_data, cm->p_size)) { + dapl_log(DAPL_DBG_TYPE_ERR," accept ERR: ucm reply send()\n"); + dapl_os_unlock(&cm->lock); + dapl_ep_unlink_cm(ep, cm); + goto bail; + } + dapl_os_unlock(&cm->lock); + + DAPL_CNTR(ia, DCNT_IA_CM_REP_TX); + dapl_dbg_log(DAPL_DBG_TYPE_CM, " PASSIVE: accepted!\n"); + dapls_thread_signal(&cm->hca->ib_trans.signal); + return DAT_SUCCESS; +bail: + DAPL_CNTR(ia, DCNT_IA_CM_ERR); + dapli_cm_free(cm); + return DAT_INTERNAL_ERROR; +} + + +/* + * dapls_ib_connect + * + * Initiate a connection with the passive listener on another node + * + * Input: + * ep_handle, + * remote_ia_address, + * remote_conn_qual, + * prd_size size of private data and structure + * prd_prt pointer to private data structure + * + * Output: + * none + * + * Returns: + * DAT_SUCCESS + * DAT_INSUFFICIENT_RESOURCES + * DAT_INVALID_PARAMETER + * + */ +DAT_RETURN +dapls_ib_connect(IN DAT_EP_HANDLE ep_handle, + IN DAT_IA_ADDRESS_PTR r_addr, + IN DAT_CONN_QUAL r_psp, + IN DAT_COUNT p_size, IN void *p_data) +{ + DAPL_EP *ep = (DAPL_EP *)ep_handle; + DAPL_HCA *hca = ep->header.owner_ia->hca_ptr; + struct dat_mcm_addr *mcm_ia = (struct dat_mcm_addr *)r_addr; + dp_ib_cm_handle_t cm; + + dapl_log(DAPL_DBG_TYPE_CM, " MCM connect -> AF %d LID 0x%x QPN 0x%x GID %s" + " port %d ep_map %s sl %d qt %d\n", + mcm_ia->family, ntohs(mcm_ia->lid), ntohl(mcm_ia->qpn), + inet_ntop(AF_INET6, &mcm_ia->gid, gid_str, sizeof(gid_str)), + mcm_ia->port, mcm_map_str(mcm_ia->ep_map), + mcm_ia->sl, mcm_ia->qp_type); + + /* create CM object, initialize SRC info from EP */ + cm = dapls_cm_create(hca, ep); + if (cm == NULL) + return DAT_INSUFFICIENT_RESOURCES; + + /* remote hca and port: lid, gid, network order */ + dapl_os_memcpy(&cm->msg.daddr1, r_addr, sizeof(struct dat_mcm_addr)); + dapl_os_memcpy(&cm->msg.daddr2, r_addr, sizeof(struct dat_mcm_addr)); + + /* validate port and ep_map range */ + if ((mcm_ia->port > 2) || (mcm_ia->ep_map > 3)) + cm->msg.daddr1.ep_map = 0; + + /* remote uCM information, comes from consumer provider r_addr */ + cm->msg.dport = htons((uint16_t)r_psp); + cm->msg.dqpn = cm->msg.daddr1.qpn; + cm->msg.daddr1.qpn = 0; /* don't have a remote qpn until reply */ + + /* set max rdma inbound requests */ + cm->msg.rd_in = ep->param.ep_attr.max_rdma_read_in; + + if (p_size) { + cm->msg.p_size = htons(p_size); + dapl_os_memcpy(&cm->msg.p_data, p_data, p_size); + } + cm->state = MCM_INIT; + + /* link EP and CM, put on work queue */ + dapli_queue_conn(cm); + + /* build connect request, send to remote CM based on r_addr info */ + return (dapli_cm_connect(ep, cm)); +} + +/* + * dapls_ib_disconnect + * + * Disconnect an EP + * + * Input: + * ep_handle, + * disconnect_flags + * + * Output: + * none + * + * Returns: + * 
DAT_SUCCESS + */ +DAT_RETURN +dapls_ib_disconnect(IN DAPL_EP *ep_ptr, IN DAT_CLOSE_FLAGS close_flags) +{ + dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr); + + dapl_os_lock(&ep_ptr->header.lock); + if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED || + ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC || + cm_ptr == NULL) { + dapl_os_unlock(&ep_ptr->header.lock); + return DAT_SUCCESS; + } + dapl_os_unlock(&ep_ptr->header.lock); + + if (cm_ptr->tp->scif_ep) { /* QPt on MPXYD, QPr local or on MPXYD */ + dapli_mix_cm_dreq_out(cm_ptr); + if (ep_ptr->qp_handle->qp) + dapls_modify_qp_state(ep_ptr->qp_handle->qp, IBV_QPS_ERR,0,0,0); + } else { /* QPt and QPr local */ + dapli_cm_disconnect(cm_ptr); + dapls_modify_qp_state(ep_ptr->qp_handle->qp2, IBV_QPS_ERR,0,0,0); + } + + return DAT_SUCCESS; +} + +/* + * dapls_ib_disconnect_clean + * + * Clean up outstanding connection data. This routine is invoked + * after the final disconnect callback has occurred. Only on the + * ACTIVE side of a connection. It is also called if dat_ep_connect + * times out using the consumer supplied timeout value. + * + * Input: + * ep_ptr DAPL_EP + * active Indicates active side of connection + * + * Output: + * none + * + * Returns: + * void + * + */ +void +dapls_ib_disconnect_clean(IN DAPL_EP *ep, + IN DAT_BOOLEAN active, + IN const ib_cm_events_t ib_cm_event) +{ + if (ib_cm_event == IB_CME_TIMEOUT) { + dp_ib_cm_handle_t cm_ptr; + + if ((cm_ptr = dapl_get_cm_from_ep(ep)) == NULL) + return; + + dapl_log(DAPL_DBG_TYPE_WARN, + "dapls_ib_disc_clean: CONN_TIMEOUT ep %p cm %p %s\n", + ep, cm_ptr, dapl_cm_state_str(cm_ptr->state)); + + /* schedule release of socket and local resources */ + dapli_cm_free(cm_ptr); + } +} + +/* + * dapl_ib_setup_conn_listener + * + * Have the CM set up a connection listener. 
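+ *
+ * Consumer view, a sketch with placeholder handles and service ID
+ * (dat_psp_create() is the DAT call that resolves to this routine):
+ *
+ *   DAT_PSP_HANDLE psp;
+ *   DAT_RETURN ret;
+ *
+ *   ret = dat_psp_create(ia_handle, 0x1234, cr_evd,
+ *                        DAT_PSP_CONSUMER_FLAG, &psp);
+ *
+ * On MIC the listen is proxied to MPXYD via dapli_mix_listen();
+ * otherwise the service ID is reserved in the local port space with
+ * mcm_get_port().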
+ * + * Input: + * ibm_hca_handle HCA handle + * qp_handle QP handle + * + * Output: + * none + * + * Returns: + * DAT_SUCCESS + * DAT_INSUFFICIENT_RESOURCES + * DAT_INTERNAL_ERROR + * DAT_CONN_QUAL_UNAVAILBLE + * DAT_CONN_QUAL_IN_USE + * + */ +DAT_RETURN +dapls_ib_setup_conn_listener(IN DAPL_IA *ia, + IN DAT_UINT64 sid, + IN DAPL_SP *sp) +{ + dp_ib_cm_handle_t cm = NULL; + int ret; + + dapl_dbg_log(DAPL_DBG_TYPE_EP, + " listen(ia %p ServiceID %x sp %p)\n", + ia, sid, sp); + + /* cm_create will setup saddr1 for listen server */ + if ((cm = dapls_cm_create(ia->hca_ptr, NULL)) == NULL) + return DAT_INSUFFICIENT_RESOURCES; + + /* LISTEN: init DST address and QP info to local CM server info */ + cm->sp = sp; + cm->hca = ia->hca_ptr; + + /* save cm_handle reference in service point */ + sp->cm_srvc_handle = cm; + + /* proxy CM service: send listen over to MPXYD */ + if (ia->hca_ptr->ib_trans.scif_ep) { + ret = dapli_mix_listen(cm, sid); + if (ret) { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " listen: MIX_ERROR %d on conn_qual %x\n", + ret, sid); + dapli_cm_free(cm); + if (ret == MIX_EADDRINUSE) + return DAT_CONN_QUAL_IN_USE; + else + return DAT_INSUFFICIENT_RESOURCES; + } + } else { + /* local CM service, reserve local port and setup addr info */ + if (!mcm_get_port(&ia->hca_ptr->ib_trans, (uint16_t)sid)) { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " listen: ERROR %s on conn_qual %x\n", + strerror(errno), sid); + dapli_cm_free(cm); + return DAT_CONN_QUAL_IN_USE; + } + cm->msg.sport = htons((uint16_t)sid); + cm->msg.sqpn = htonl(ia->hca_ptr->ib_trans.qp->qp_num); + cm->msg.saddr1.qp_type = IBV_QPT_UD; + cm->msg.saddr1.lid = ia->hca_ptr->ib_trans.addr.lid; + dapl_os_memcpy(&cm->msg.saddr1.gid[0], + &cm->hca->ib_trans.addr.gid, 16); + } + + /* queue up listen socket to process inbound CR's */ + cm->state = MCM_LISTEN; + dapli_queue_listen(cm); + + DAPL_CNTR(ia, DCNT_IA_CM_LISTEN); + + return DAT_SUCCESS; +} + + +/* + * dapl_ib_remove_conn_listener + * + * Have the CM remove a connection listener. 
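+ *
+ * Consumer view, mirroring the listen sketch above (placeholder handle):
+ *
+ *   ret = dat_psp_free(psp);
+ *
+ * The CM is dequeued from the listen list, the MPXYD listen
+ * (dapli_mix_listen_free) or the local port (mcm_free_port) is
+ * released, and the final dapls_cm_release() frees the object.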
+ * + * Input: + * ia_handle IA handle + * ServiceID IB Channel Service ID + * + * Output: + * none + * + * Returns: + * DAT_SUCCESS + * DAT_INVALID_STATE + * + */ +DAT_RETURN +dapls_ib_remove_conn_listener(IN DAPL_IA *ia, IN DAPL_SP *sp) +{ + dp_ib_cm_handle_t cm = sp->cm_srvc_handle; + + /* free cm_srvc_handle and port, and mark CM for cleanup */ + if (cm) { + dapl_dbg_log(DAPL_DBG_TYPE_EP, + " remove_listener(ia %p sp %p cm %p psp=%x)\n", + ia, sp, cm, ntohs(cm->msg.sport)); + + sp->cm_srvc_handle = NULL; + dapli_dequeue_listen(cm); + + /* clean up proxy listen, otherwise local port space */ + if (cm->hca->ib_trans.scif_ep) + dapli_mix_listen_free(cm); + else + mcm_free_port(&cm->hca->ib_trans, ntohs(cm->msg.sport)); + + dapls_cm_release(cm); /* last ref, dealloc */ + } + return DAT_SUCCESS; +} + +/* + * dapls_ib_accept_connection + * + * Perform necessary steps to accept a connection + * + * Input: + * cr_handle + * ep_handle + * private_data_size + * private_data + * + * Output: + * none + * + * Returns: + * DAT_SUCCESS + * DAT_INSUFFICIENT_RESOURCES + * DAT_INTERNAL_ERROR + * + */ +DAT_RETURN +dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle, + IN DAT_EP_HANDLE ep_handle, + IN DAT_COUNT p_size, + IN const DAT_PVOID p_data) +{ + DAPL_CR *cr = (DAPL_CR *)cr_handle; + DAPL_EP *ep = (DAPL_EP *)ep_handle; + + dapl_dbg_log(DAPL_DBG_TYPE_EP, + " accept_connection(cr %p cm %p ep %p prd %p,%d)\n", + cr, cr->ib_cm_handle, ep, p_data, p_size); + + /* allocate and attach a QP if necessary */ + if (ep->qp_state == DAPL_QP_STATE_UNATTACHED) { + DAT_RETURN status; + status = dapls_ib_qp_alloc(ep->header.owner_ia, + ep, ep); + if (status != DAT_SUCCESS) + return status; + } + + return (dapli_accept_usr(ep, cr, p_size, p_data)); +} + +/* + * dapls_ib_reject_connection + * + * Reject a connection + * + * Input: + * cr_handle + * + * Output: + * none + * + * Returns: + * DAT_SUCCESS + * DAT_INTERNAL_ERROR + * + */ +DAT_RETURN +dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm, + IN int reason, + IN DAT_COUNT p_size, IN const DAT_PVOID p_data) +{ + dapl_dbg_log(DAPL_DBG_TYPE_EP, + " reject(cm %p reason %x, p_data %p, p_size %d)\n", + cm, reason, p_data, p_size); + + if (p_size > DAT_MCM_PDATA_SIZE) + return DAT_LENGTH_ERROR; + + if (cm->tp->scif_ep) + return (dapli_mix_cm_rej_out(cm, p_size, p_data, reason)); + + /* cr_thread will destroy CR, update saddr1 lid, gid, qp_type info */ + dapl_os_lock(&cm->lock); + dapl_log(DAPL_DBG_TYPE_CM, + " PASSIVE: REJECTING CM_REQ:" + " cm %p op %s, st %s slid %x iqp %x port %x ->" + " dlid %x iqp %x port %x\n", cm, + dapl_cm_op_str(ntohs(cm->msg.op)), + dapl_cm_state_str(cm->state), + ntohs(cm->hca->ib_trans.addr.lid), + ntohl(cm->msg.saddr1.qpn), + ntohs(cm->msg.sport), ntohs(cm->msg.daddr1.lid), + ntohl(cm->msg.daddr1.qpn), ntohs(cm->msg.dport)); + + cm->state = MCM_REJECTED; + cm->msg.saddr1.lid = cm->hca->ib_trans.addr.lid; + cm->msg.saddr1.qp_type = cm->msg.daddr1.qp_type; + dapl_os_memcpy(&cm->msg.saddr1.gid[0], + &cm->hca->ib_trans.addr.gid, 16); + + if (reason == IB_CM_REJ_REASON_CONSUMER_REJ) + cm->msg.op = htons(MCM_REJ_USER); + else + cm->msg.op = htons(MCM_REJ_CM); + + DAPL_CNTR(((DAPL_IA *)dapl_llist_peek_head(&cm->hca->ia_list_head)), + reason == IB_CM_REJ_REASON_CONSUMER_REJ ? 
+ DCNT_IA_CM_USER_REJ_TX : DCNT_IA_CM_ERR_REJ_TX); + + if (mcm_send(&cm->hca->ib_trans, &cm->msg, p_data, p_size)) { + dapl_log(DAPL_DBG_TYPE_WARN, + " cm_reject: send ERR: %s\n", strerror(errno)); + dapl_os_unlock(&cm->lock); + return DAT_INTERNAL_ERROR; + } + dapl_os_unlock(&cm->lock); + dapli_cm_free(cm); + return DAT_SUCCESS; +} + +/* + * dapls_ib_cm_remote_addr + * + * Obtain the remote IP address given a connection + * + * Input: + * cr_handle + * + * Output: + * remote_ia_address: where to place the remote address + * + * Returns: + * DAT_SUCCESS + * DAT_INVALID_HANDLE + * + */ +DAT_RETURN +dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle, + OUT DAT_SOCK_ADDR6 * remote_ia_address) +{ + DAPL_HEADER *header; + dp_ib_cm_handle_t cm; + + dapl_dbg_log(DAPL_DBG_TYPE_EP, + "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n", + dat_handle); + + header = (DAPL_HEADER *) dat_handle; + + if (header->magic == DAPL_MAGIC_EP) + cm = dapl_get_cm_from_ep((DAPL_EP *) dat_handle); + else if (header->magic == DAPL_MAGIC_CR) + cm = ((DAPL_CR *) dat_handle)->ib_cm_handle; + else + return DAT_INVALID_HANDLE; + + dapl_os_memcpy(remote_ia_address, + &cm->msg.daddr1, + sizeof(DAT_SOCK_ADDR6)); + + return DAT_SUCCESS; +} + +int dapls_ib_private_data_size( + IN DAPL_HCA *hca_ptr) +{ + return DAT_MCM_PDATA_SIZE; +} + +void cm_thread(void *arg) +{ + struct dapl_hca *hca = arg; + dp_ib_cm_handle_t cm, next; + ib_cq_handle_t m_cq; + struct dapl_fd_set *set; + char rbuf[2]; + int time_ms, ret; + + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread: ENTER hca %p\n", hca); + set = dapl_alloc_fd_set(); + if (!set) + goto out; + + dapl_os_lock(&hca->ib_trans.lock); + hca->ib_trans.cm_state = IB_THREAD_RUN; + + while (1) { + time_ms = -1; /* reset to blocking */ + dapl_fd_zero(set); + dapl_fd_set(hca->ib_trans.signal.scm[0], set, DAPL_FD_READ); + dapl_fd_set(hca->ib_hca_handle->async_fd, set, DAPL_FD_READ); + dapl_fd_set(hca->ib_trans.rch_fd, set, DAPL_FD_READ); + dapl_fd_set(hca->ib_trans.scif_ev_ep, set, DAPL_FD_READ); + dapl_fd_set(hca->ib_trans.ib_cq->fd, set, DAPL_FD_READ); + + dapl_os_lock(&hca->ib_trans.cqlock); /* CQt for HST->MXS */ + if (!dapl_llist_is_empty(&hca->ib_trans.cqlist)) + m_cq = dapl_llist_peek_head(&hca->ib_trans.cqlist); + else + m_cq = NULL; + + while (m_cq) { + dapl_fd_set(m_cq->cq->channel->fd, set, DAPL_FD_READ); + dapl_log(DAPL_DBG_TYPE_CM, " cm_thread: mcm_pio_event(%p)\n", m_cq); + mcm_dto_event(m_cq); + m_cq = dapl_llist_next_entry( + &hca->ib_trans.cqlist, + (DAPL_LLIST_ENTRY *)&m_cq->entry); + } + dapl_os_unlock(&hca->ib_trans.cqlock); + + if (!dapl_llist_is_empty(&hca->ib_trans.list)) + next = dapl_llist_peek_head(&hca->ib_trans.list); + else + next = NULL; + + while (next) { + cm = next; + next = dapl_llist_next_entry( + &hca->ib_trans.list, + (DAPL_LLIST_ENTRY *)&cm->local_entry); + dapls_cm_acquire(cm); /* hold thread ref */ + dapl_os_lock(&cm->lock); + if (cm->state == MCM_FREE || + hca->ib_trans.cm_state != IB_THREAD_RUN) { + dapl_os_unlock(&cm->lock); + dapl_log(DAPL_DBG_TYPE_CM, + " CM destroy: cm %p ep %p st=%s refs=%d\n", + cm, cm->ep, mcm_state_str(cm->state), + cm->ref_count); + + dapls_cm_release(cm); /* release alloc ref */ + dapli_cm_dequeue(cm); /* release workq ref */ + dapls_cm_release(cm); /* release thread ref */ + continue; + } + dapl_os_unlock(&cm->lock); + mcm_check_timers(cm, &time_ms); + dapls_cm_release(cm); /* release thread ref */ + } + + /* set to exit and all resources destroyed */ + if ((hca->ib_trans.cm_state != IB_THREAD_RUN) && + 
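/* editor's comment (illustrative, not in the original patch): exit
+ * test - the thread quits only after close_hca sets cm_state to
+ * something other than IB_THREAD_RUN and the connect list above has
+ * fully drained; the identical test repeats after the poll wakeup
+ * below so a dapls_thread_signal() can finish the teardown */
+ 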
(dapl_llist_is_empty(&hca->ib_trans.list))) + break; + + dapl_os_unlock(&hca->ib_trans.lock); + dapl_select(set, time_ms); + + if (dapl_poll(hca->ib_trans.rch_fd, + DAPL_FD_READ) == DAPL_FD_READ) { + mcm_recv(&hca->ib_trans); + } + ret = dapl_poll(hca->ib_trans.scif_ev_ep, DAPL_FD_READ); + if (ret == DAPL_FD_READ) + dapli_mix_recv(hca, hca->ib_trans.scif_ev_ep); + else if (ret == DAPL_FD_ERROR) { + struct ibv_async_event event; + + dapl_log(1, " cm_thread: dev_id %d scif_ev_ep %d ERR\n", + hca->ib_trans.dev_id, hca->ib_trans.scif_ev_ep); + + event.event_type = IBV_EVENT_DEVICE_FATAL; + dapl_evd_un_async_error_callback(hca->ib_hca_handle, + &event, + hca->ib_trans.async_un_ctx); + dapl_os_lock(&hca->ib_trans.lock); + hca->ib_trans.cm_state = IB_THREAD_CANCEL; + continue; + } + if (dapl_poll(hca->ib_hca_handle->async_fd, + DAPL_FD_READ) == DAPL_FD_READ) { + dapli_async_event_cb(&hca->ib_trans); + } + if (dapl_poll(hca->ib_trans.ib_cq->fd, + DAPL_FD_READ) == DAPL_FD_READ) { + dapli_cq_event_cb(&hca->ib_trans); + } + while (dapl_poll(hca->ib_trans.signal.scm[0], + DAPL_FD_READ) == DAPL_FD_READ) { + recv(hca->ib_trans.signal.scm[0], rbuf, 2, 0); + } + dapl_os_lock(&hca->ib_trans.lock); + + /* set to exit and all resources destroyed */ + if ((hca->ib_trans.cm_state != IB_THREAD_RUN) && + (dapl_llist_is_empty(&hca->ib_trans.list))) + break; + } + + dapl_os_unlock(&hca->ib_trans.lock); + free(set); +out: + hca->ib_trans.cm_state = IB_THREAD_EXIT; + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cm_thread(hca %p) exit\n", hca); +} + +static void mcm_log_addrs(int lvl, struct dat_mcm_msg *msg, int state, int in) +{ + if (in) { + if (MXS_EP(&msg->daddr1) && MXS_EP(&msg->saddr1)) { + dapl_log(lvl, " QPr_t addr2: %s 0x%x %x 0x%x %s <- QPt_r addr2: 0x%x %x 0x%x %s\n", + mcm_state_str(state), htons(msg->daddr2.lid), + htonl(msg->daddr2.qpn), htons(msg->dport), + mcm_map_str(msg->daddr2.ep_map), + htons(msg->saddr2.lid), htonl(msg->saddr2.qpn), + htons(msg->sport), mcm_map_str(msg->saddr2.ep_map)); + } else { + dapl_log(lvl, " QPr addr1: %s 0x%x %x 0x%x %s <- QPt addr2: 0x%x %x 0x%x %s\n", + mcm_state_str(state), htons(msg->daddr1.lid), + htonl(msg->daddr1.qpn), htons(msg->dport), + mcm_map_str(msg->daddr1.ep_map), + htons(msg->saddr2.lid), htonl(msg->saddr2.qpn), + htons(msg->sport), mcm_map_str(msg->saddr2.ep_map)); + dapl_log(lvl, " QPt addr2: %s 0x%x %x 0x%x %s <- QPr addr1: 0x%x %x 0x%x %s\n", + mcm_state_str(state),htons(msg->daddr2.lid), + htonl(msg->daddr2.qpn), htons(msg->dport), + mcm_map_str(msg->daddr2.ep_map), + htons(msg->saddr1.lid), htonl(msg->saddr1.qpn), + htons(msg->sport), mcm_map_str(msg->saddr1.ep_map)); + } + } else { + if (MXS_EP(&msg->saddr1) && MXS_EP(&msg->daddr1)) { + dapl_log(lvl, " QPr_t addr2: %s 0x%x %x 0x%x %s -> QPt_r addr2: 0x%x %x 0x%x %s\n", + mcm_state_str(state), htons(msg->saddr2.lid), + htonl(msg->saddr2.qpn), htons(msg->sport), + mcm_map_str(msg->saddr2.ep_map), + htons(msg->daddr2.lid), htonl(msg->daddr2.qpn), + htons(msg->dport), mcm_map_str(msg->daddr2.ep_map)); + } else { + dapl_log(lvl, " QPr addr1: %s 0x%x %x 0x%x %s -> QPt addr2: 0x%x %x 0x%x %s\n", + mcm_state_str(state), htons(msg->saddr1.lid), + htonl(msg->saddr1.qpn), htons(msg->sport), + mcm_map_str(msg->saddr1.ep_map), + htons(msg->daddr2.lid), htonl(msg->daddr2.qpn), + htons(msg->dport), mcm_map_str(msg->daddr2.ep_map)); + dapl_log(lvl, " QPt addr2: %s 0x%x %x 0x%x %s -> QPr addr1: 0x%x %x 0x%x %s\n", + mcm_state_str(state), htons(msg->saddr2.lid), + htonl(msg->saddr2.qpn), htons(msg->sport), + 
mcm_map_str(msg->saddr2.ep_map), + htons(msg->daddr1.lid), htonl(msg->daddr1.qpn), + htons(msg->dport), mcm_map_str(msg->daddr1.ep_map)); + } + } +} + +#ifdef DAPL_COUNTERS +static char _ctr_host_[128]; +/* Debug aid: List all Connections in process and state */ +void dapls_print_cm_list(IN DAPL_IA *ia_ptr) +{ + /* Print in process CM's for this IA, if debug type set */ + int i = 0; + dp_ib_cm_handle_t cm, next_cm; + struct dapl_llist_entry **list; + DAPL_OS_LOCK *lock; + + /* LISTEN LIST */ + list = &ia_ptr->hca_ptr->ib_trans.llist; + lock = &ia_ptr->hca_ptr->ib_trans.llock; + + dapl_os_lock(lock); + if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)list)) + next_cm = dapl_llist_peek_head((DAPL_LLIST_HEAD*)list); + else + next_cm = NULL; + + gethostname(_ctr_host_, sizeof(_ctr_host_)); + printf("\n [%s:%x] DAPL IA LISTEN/CONNECTIONS IN PROCESS:\n", + _ctr_host_ , dapl_os_getpid()); + + while (next_cm) { + cm = next_cm; + next_cm = dapl_llist_next_entry((DAPL_LLIST_HEAD*)list, + (DAPL_LLIST_ENTRY*)&cm->local_entry); + + printf( " LISTEN[%d]: sp %p %s uCM_QP: %x %x c_%x l_pid %x \n", + i, cm->sp, dapl_cm_state_str(cm->state), + ntohs(cm->msg.saddr1.lid), ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), + ntohl(cm->msg.s_id)); + i++; + } + dapl_os_unlock(lock); + + /* CONNECTION LIST */ + list = &ia_ptr->hca_ptr->ib_trans.list; + lock = &ia_ptr->hca_ptr->ib_trans.lock; + + dapl_os_lock(lock); + if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)list)) + next_cm = dapl_llist_peek_head((DAPL_LLIST_HEAD*)list); + else + next_cm = NULL; + + while (next_cm) { + cm = next_cm; + next_cm = dapl_llist_next_entry((DAPL_LLIST_HEAD*)list, + (DAPL_LLIST_ENTRY*)&cm->local_entry); + + printf( " CONN[%d]: ep %p cm %p %s %s" + " %x %x c_%x i_%x %s %x %x c_%x i_%x r_pid %x\n", + i, cm->ep, cm, + cm->msg.saddr1.qp_type == IBV_QPT_RC ? "RC" : "UD", + dapl_cm_state_str(cm->state), + ntohs(cm->msg.saddr1.lid), + ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), + ntohl(cm->msg.saddr1.qpn), + cm->sp ? "<-" : "->", + ntohs(cm->msg.daddr1.lid), + ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), + ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id)); + i++; + } + printf("\n"); + dapl_os_unlock(lock); +} + +void dapls_print_cm_free_list(IN DAPL_IA *ia_ptr) +{ + DAPL_EP *ep, *next_ep; + dp_ib_cm_handle_t cm, next_cm; + int i = 0; + + gethostname(_ctr_host_, sizeof(_ctr_host_)); + printf("\n [%s:%x] DAPL EP CM FREE LIST:\n", + _ctr_host_ , dapl_os_getpid()); + + dapl_os_lock(&ia_ptr->header.lock); + ep = (dapl_llist_is_empty(&ia_ptr->ep_list_head) ? + NULL : dapl_llist_peek_head(&ia_ptr->ep_list_head)); + while (ep != NULL) { + next_ep = dapl_llist_next_entry(&ia_ptr->ep_list_head, + &ep->header.ia_list_entry); + dapl_os_lock(&ep->header.lock); + cm = (dapl_llist_is_empty(&ep->cm_list_head) ? + NULL : dapl_llist_peek_head(&ep->cm_list_head)); + while (cm) { + dapl_os_lock(&cm->lock); + next_cm = dapl_llist_next_entry(&ep->cm_list_head, + &cm->list_entry); + if (cm->state == DCM_FREE) { + printf( " CONN[%d]: ep %p cm %p %s %s" + " %x %x c_%x i_%x l_pid %x %s" + " %x %x c_%x i_%x r_pid %x\n", + i, cm->ep, cm, + cm->msg.saddr1.qp_type == IBV_QPT_RC ? "RC" : "UD", + dapl_cm_state_str(cm->state), + ntohs(cm->msg.saddr1.lid), + ntohs(cm->msg.sport), + ntohl(cm->msg.sqpn), + ntohl(cm->msg.saddr1.qpn), + ntohl(cm->msg.s_id), + cm->sp ? 
"<-" : "->", + ntohs(cm->msg.daddr1.lid), + ntohs(cm->msg.dport), + ntohl(cm->msg.dqpn), + ntohl(cm->msg.daddr1.qpn), + ntohl(cm->msg.d_id)); + i++; + } + dapl_os_unlock(&cm->lock); + cm = next_cm; + } + dapl_os_unlock(&ep->header.lock); + ep = next_ep; + } + dapl_os_unlock(&ia_ptr->header.lock); +} +#endif + diff --git a/dapl/openib_mcm/dapl_ib_util.h b/dapl/openib_mcm/dapl_ib_util.h new file mode 100644 index 0000000..a5e1c1b --- /dev/null +++ b/dapl/openib_mcm/dapl_ib_util.h @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2009-2014 Intel Corporation. All rights reserved. + * + * This Software is licensed under one of the following licenses: + * + * 1) under the terms of the "Common Public License 1.0" a copy of which is + * available from the Open Source Initiative, see + * http://www.opensource.org/licenses/cpl.php. + * + * 2) under the terms of the "The BSD License" a copy of which is + * available from the Open Source Initiative, see + * http://www.opensource.org/licenses/bsd-license.php. + * + * 3) under the terms of the "GNU General Public License (GPL) Version 2" a + * copy of which is available from the Open Source Initiative, see + * http://www.opensource.org/licenses/gpl-license.php. + * + * Licensee has the right to choose one of the above licenses. + * + * Redistributions of source code must retain the above copyright + * notice and one of the license notices. + * + * Redistributions in binary form must reproduce both the above copyright + * notice, one of the license notices in the documentation + * and/or other materials provided with the distribution. + */ + +#ifndef _DAPL_IB_UTIL_H_ +#define _DAPL_IB_UTIL_H_ +#define _OPENIB_MCM_ + +#include +#include +#include "openib_osd.h" +#include "dapl_mic_common.h" +#include "dapl_ib_common.h" + +#define MCM_RETRY_CNT 10 +#define MCM_REP_TIME 4000 /* reply timeout in m_secs */ +#define MCM_RTU_TIME 2000 /* rtu timeout in m_secs */ + +/* DAPL CM objects MUST include list_entry, ref_count, event for EP linking */ +struct ib_cm_handle +{ + struct dapl_llist_entry list_entry; + struct dapl_llist_entry local_entry; + DAPL_OS_WAIT_OBJECT d_event; + DAPL_OS_WAIT_OBJECT f_event; + DAPL_OS_LOCK lock; + DAPL_OS_TIMEVAL timer; + uint32_t cm_id; /* local id */ + uint32_t scm_id; /* shadow id */ + uint64_t cm_ctx; /* local context */ + uint64_t scm_ctx; /* shadow context */ + int ref_count; + int state; + int retries; + struct _ib_hca_transport *tp; + struct dapl_hca *hca; + struct dapl_sp *sp; + struct dapl_ep *ep; + struct ibv_ah *ah; + uint16_t p_size; /* accept p_data, for retries */ + uint8_t p_data[DAT_MCM_PDATA_SIZE]; + dat_mcm_msg_t msg; +}; + +typedef struct ib_cm_handle *dp_ib_cm_handle_t; +typedef dp_ib_cm_handle_t ib_cm_srvc_handle_t; + +/* Definitions */ +#define IB_INVALID_HANDLE NULL + +/* ib_hca_transport_t, specific to this implementation */ +typedef struct _ib_hca_transport +{ + struct ibv_device *ib_dev; + struct dapl_hca *hca; + struct ibv_context *ib_ctx; + struct ibv_comp_channel *ib_cq; + ib_cq_handle_t ib_cq_empty; + int destroy; + int cm_state; + DAPL_OS_THREAD thread; + DAPL_OS_LOCK lock; /* connect list */ + struct dapl_llist_entry *list; + DAPL_OS_LOCK llock; /* listen list */ + struct dapl_llist_entry *llist; + DAPL_OS_LOCK cqlock; /* CQ list for PI WC's */ + struct dapl_llist_entry *cqlist; + ib_async_handler_t async_unafiliated; + void *async_un_ctx; + ib_async_cq_handler_t async_cq_error; + ib_async_dto_handler_t async_cq; + ib_async_qp_handler_t async_qp_error; + struct dat_mcm_addr addr; /* lid, port, qp_num, gid */ + 
+ struct dapl_thread_signal signal;
+ /* dat_mix_dev_attr_t */
+ uint8_t ack_timer;
+ uint8_t ack_retry;
+ uint8_t rnr_timer;
+ uint8_t rnr_retry;
+ uint8_t global;
+ uint8_t hop_limit;
+ uint8_t tclass;
+ uint8_t sl;
+ uint8_t mtu;
+ uint8_t rd_atom_in;
+ uint8_t rd_atom_out;
+ uint8_t pkey_idx;
+ uint16_t pkey;
+ uint16_t max_inline_send;
+ /* dat_mix_dev_attr_t */
+ int cqe;
+ int qpe;
+ int burst;
+ int retries;
+ int cm_timer;
+ int rep_time;
+ int rtu_time;
+ DAPL_OS_LOCK slock;
+ int s_hd;
+ int s_tl;
+ struct ibv_pd *pd;
+ struct ibv_cq *scq;
+ struct ibv_cq *rcq;
+ struct ibv_qp *qp;
+ struct ibv_mr *mr_rbuf;
+ struct ibv_mr *mr_sbuf;
+ dat_mcm_msg_t *sbuf;
+ dat_mcm_msg_t *rbuf;
+ struct ibv_comp_channel *rch;
+ int rch_fd;
+ struct ibv_ah **ah;
+ DAPL_OS_LOCK plock;
+ uint16_t lid;
+ uint8_t *sid; /* Service IDs, port space, bitarray? */
+
+ /* SCIF MIC indirect, EP to MPXYD services, if running on MIC */
+ uint32_t dev_id; /* proxy device id */
+ struct scif_portID self;
+ scif_epd_t scif_ep; /* FD operation and CM processing */
+ scif_epd_t scif_ev_ep; /* unsolicited events processing */
+ scif_epd_t scif_tx_ep; /* FD data path processing */
+ struct scif_portID peer; /* MPXYD op EP proxy addr info */
+ struct scif_portID peer_ev; /* MPXYD event EP proxy addr info */
+ struct scif_portID peer_tx; /* MPXYD data EP proxy addr info */
+ uint64_t sys_guid; /* system image guid, network order */
+ uint64_t guid; /* host order */
+ char guid_str[32];
+ ib_named_attr_t na;
+
+} ib_hca_transport_t;
+
+/* prototypes */
+void cm_thread(void *arg);
+void dapli_queue_conn(dp_ib_cm_handle_t cm);
+void dapli_dequeue_conn(dp_ib_cm_handle_t cm);
+void mcm_connect_rtu(dp_ib_cm_handle_t cm, dat_mcm_msg_t *msg);
+void mcm_disconnect_final(dp_ib_cm_handle_t cm);
+void dapli_async_event_cb(struct _ib_hca_transport *tp);
+void dapli_cq_event_cb(struct _ib_hca_transport *tp);
+void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr);
+void dapls_cm_release(dp_ib_cm_handle_t cm_ptr);
+void dapls_cm_free(dp_ib_cm_handle_t cm_ptr);
+dp_ib_cm_handle_t dapls_cm_create(DAPL_HCA *hca, DAPL_EP *ep);
+DAT_RETURN dapls_modify_qp_rtu(struct ibv_qp *qp, uint32_t qpn, uint16_t lid, ib_gid_handle_t gid);
+
+/* HST->MXS (MIC xsocket) remote PI communication, proxy.c */
+int mcm_send_pi(ib_qp_handle_t m_qp, int len, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr);
+int mcm_post_rcv_wc(struct dcm_ib_qp *m_qp, int cnt);
+void mcm_dto_event(struct dcm_ib_cq *m_cq);
+int mcm_create_wc_q(struct dcm_ib_qp *m_qp, int entries);
+void mcm_destroy_wc_q(struct dcm_ib_qp *m_qp);
+int mcm_create_pi_cq(struct dcm_ib_qp *m_qp, int len);
+void mcm_destroy_pi_cq(struct dcm_ib_qp *m_qp);
+
+/* MIC eXchange (MIX) operations, mix.c */
+int dapli_mix_open(ib_hca_transport_t *tp, char *name, int port, int query);
+void dapli_mix_close(ib_hca_transport_t *tp);
+int dapli_mix_listen(dp_ib_cm_handle_t cm, uint16_t sid);
+int dapli_mix_listen_free(dp_ib_cm_handle_t cm);
+int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr,
+ ib_cq_handle_t req_cq, ib_cq_handle_t rcv_cq);
+int dapli_mix_qp_free(ib_qp_handle_t m_qp);
+int dapli_mix_cq_create(ib_cq_handle_t m_cq, int cq_len);
+int dapli_mix_cq_free(ib_cq_handle_t m_cq);
+int dapli_mix_cq_wait(ib_cq_handle_t m_cq, int time);
+int dapli_mix_cq_poll(ib_cq_handle_t m_cq, struct ibv_wc *wc);
+int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp);
+int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm);
+void dapli_mix_cm_dreq_out(dp_ib_cm_handle_t m_cm);
+int
dapli_mix_cm_rep_out(dp_ib_cm_handle_t m_cm, int p_size, void *p_data); +int dapli_mix_cm_rej_out(dp_ib_cm_handle_t m_cm, int p_size, void *p_data, int reason); +int dapli_mix_post_send(ib_qp_handle_t m_qp, int len, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr); +int dapli_mix_post_recv(ib_qp_handle_t m_qp, int len, struct ibv_recv_wr *wr, struct ibv_recv_wr **bad_wr); +int dapli_mix_recv(DAPL_HCA *hca, int scif_ep); +int dapli_mix_mr_create(ib_hca_transport_t *tp, DAPL_LMR * lmr); +int dapli_mix_mr_free(ib_hca_transport_t *tp, DAPL_LMR * lmr); + +#ifdef DAPL_COUNTERS +void dapls_print_cm_list(IN DAPL_IA *ia_ptr); +#endif + +#endif /* _DAPL_IB_UTIL_H_ */ + diff --git a/dapl/openib_mcm/device.c b/dapl/openib_mcm/device.c new file mode 100644 index 0000000..b60ba1c --- /dev/null +++ b/dapl/openib_mcm/device.c @@ -0,0 +1,620 @@ +/* + * Copyright (c) 2009-2014 Intel Corporation. All rights reserved. + * + * This Software is licensed under one of the following licenses: + * + * 1) under the terms of the "Common Public License 1.0" a copy of which is + * available from the Open Source Initiative, see + * http://www.opensource.org/licenses/cpl.php. + * + * 2) under the terms of the "The BSD License" a copy of which is + * available from the Open Source Initiative, see + * http://www.opensource.org/licenses/bsd-license.php. + * + * 3) under the terms of the "GNU General Public License (GPL) Version 2" a + * copy of which is available from the Open Source Initiative, see + * http://www.opensource.org/licenses/gpl-license.php. + * + * Licensee has the right to choose one of the above licenses. + * + * Redistributions of source code must retain the above copyright + * notice and one of the license notices. + * + * Redistributions in binary form must reproduce both the above copyright + * notice, one of the license notices in the documentation + * and/or other materials provided with the distribution. 
+ */ + +#include "openib_osd.h" +#include "dapl.h" +#include "dapl_adapter_util.h" +#include "dapl_ib_util.h" +#include "dapl_osd.h" +#include + +char gid_str[INET6_ADDRSTRLEN]; + +static void mcm_service_destroy(IN DAPL_HCA *hca); +static int mcm_service_create(IN DAPL_HCA *hca); + +static int32_t create_os_signal(IN DAPL_HCA * hca_ptr) +{ + DAPL_SOCKET listen_socket; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int ret; + + listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listen_socket == DAPL_INVALID_SOCKET) + return 1; + + memset(&addr, 0, sizeof addr); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(0x7f000001); + ret = bind(listen_socket, (struct sockaddr *)&addr, sizeof addr); + if (ret) + goto err1; + + ret = getsockname(listen_socket, (struct sockaddr *)&addr, &addrlen); + if (ret) + goto err1; + + ret = listen(listen_socket, 0); + if (ret) + goto err1; + + hca_ptr->ib_trans.signal.scm[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (hca_ptr->ib_trans.signal.scm[1] == DAPL_INVALID_SOCKET) + goto err1; + + ret = connect(hca_ptr->ib_trans.signal.scm[1], + (struct sockaddr *)&addr, sizeof(addr)); + if (ret) + goto err2; + + hca_ptr->ib_trans.signal.scm[0] = accept(listen_socket, NULL, NULL); + if (hca_ptr->ib_trans.signal.scm[0] == DAPL_INVALID_SOCKET) + goto err2; + + closesocket(listen_socket); + return 0; + + err2: + closesocket(hca_ptr->ib_trans.signal.scm[1]); + err1: + closesocket(listen_socket); + return 1; +} + +static void destroy_os_signal(IN DAPL_HCA * hca_ptr) +{ + closesocket(hca_ptr->ib_trans.signal.scm[0]); + closesocket(hca_ptr->ib_trans.signal.scm[1]); +} + +static int dapls_config_fd(int fd) +{ + int opts; + + opts = fcntl(fd, F_GETFL); + if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) { + dapl_log(DAPL_DBG_TYPE_ERR, + " dapls_config_fd: fcntl on fd %d ERR %d %s\n", + fd, opts, strerror(errno)); + return errno; + } + return 0; +} + +static int dapls_config_verbs(struct ibv_context *verbs) +{ + return dapls_config_fd(verbs->async_fd); +} + +static int dapls_config_comp_channel(struct ibv_comp_channel *channel) +{ + return dapls_config_fd(channel->fd); +} + +/* Need CQ for shadow QP's with one half usage */ +static ib_cq_handle_t dapls_create_empty_cq(struct ibv_context *ib_ctx) +{ + struct dcm_ib_cq *empty_cq; + + empty_cq = dapl_os_alloc(sizeof(struct dcm_ib_cq)); + if (!empty_cq) + return NULL; + dapl_os_memzero(empty_cq, sizeof(struct dcm_ib_cq)); + + empty_cq->cq = ibv_create_cq(ib_ctx, 1, NULL, NULL, 0); + if (!empty_cq->cq) { + dapl_os_free(empty_cq, sizeof(struct dcm_ib_cq)); + return NULL; + } + return empty_cq; +} + +/* + * dapls_ib_init, dapls_ib_release + * + * Initialize Verb related items for device open + * + * Input: + * none + * + * Output: + * none + * + * Returns: + * 0 success, -1 error + * + */ +int32_t dapls_ib_init(void) +{ + return 0; +} + +int32_t dapls_ib_release(void) +{ + return 0; +} + +/* + * dapls_ib_open_hca + * + * Open HCA + * + * Input: + * *hca_name pointer to provider device name + * *ib_hca_handle_p pointer to provide HCA handle + * + * Output: + * none + * + * Return: + * DAT_SUCCESS + * dapl_convert_errno + * + */ +DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, + IN DAPL_HCA * hca_ptr, + IN DAPL_OPEN_FLAGS flags) +{ + struct ibv_device **dev_list; + struct dat_mcm_addr *mcm_ia = (struct dat_mcm_addr *) &hca_ptr->hca_address; + struct ibv_port_attr port_attr; + int i, nd = 0; + DAT_RETURN dat_status = DAT_INTERNAL_ERROR; + + dapl_log(DAPL_DBG_TYPE_UTIL, " 
open_hca: %s %s - %p in %s\n",
+ PROVIDER_NAME, hca_name, hca_ptr,
+ flags & DAPL_OPEN_QUERY ? "QUERY MODE":"STD MODE");
+
+ /* Get list of all IB devices, find match, open */
+ dev_list = ibv_get_device_list(&nd);
+ if (!dev_list) {
+ dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ibv_get_device_list() failed for %s\n",
+ hca_name);
+ return DAT_INTERNAL_ERROR;
+ }
+ dapl_log(DAPL_DBG_TYPE_UTIL, " open_hca %p: %d devices found\n", hca_ptr, nd);
+ for (i = 0; i < nd; ++i) {
+ if (!strcmp(dev_list[i]->name, hca_name)) {
+ hca_ptr->ib_trans.ib_dev = dev_list[i];
+ goto found;
+ }
+ }
+
+ dapl_log(DAPL_DBG_TYPE_ERR, " open_hca: device %s not found\n", hca_name);
+ dat_status = DAT_PROVIDER_NOT_FOUND;
+ goto err;
+
+found:
+ hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
+ if (!hca_ptr->ib_hca_handle) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: dev open failed for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
+ goto err;
+ }
+ hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle;
+ dapls_config_verbs(hca_ptr->ib_hca_handle);
+
+ /* get lid for this hca-port, network order */
+ if (ibv_query_port(hca_ptr->ib_hca_handle,
+ (uint8_t)hca_ptr->port_num, &port_attr)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: get lid ERR for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
+ dat_status = DAT_INVALID_ADDRESS;
+ goto bail;
+ } else {
+ if (port_attr.state != IBV_PORT_ACTIVE) {
+ dat_status = DAT_INVALID_ADDRESS;
+ goto bail;
+ }
+ hca_ptr->ib_trans.addr.lid = htons(port_attr.lid);
+ hca_ptr->ib_trans.lid = htons(port_attr.lid);
+ }
+
+ /* get gid for this hca-port, network order */
+ if (ibv_query_gid(hca_ptr->ib_hca_handle,
+ (uint8_t) hca_ptr->port_num, 0,
+ (union ibv_gid *)&hca_ptr->ib_trans.addr.gid)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: query GID ERR for %s, err=%s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ strerror(errno));
+ dat_status = DAT_INVALID_ADDRESS;
+ goto bail;
+ }
+
+ /* set RC tunables via environment or default */
+ hca_ptr->ib_trans.max_inline_send =
+ dapl_os_get_env_val("DAPL_MAX_INLINE", INLINE_SEND_IB_DEFAULT);
+ hca_ptr->ib_trans.ack_retry =
+ dapl_os_get_env_val("DAPL_ACK_RETRY", DCM_ACK_RETRY);
+ hca_ptr->ib_trans.ack_timer =
+ dapl_os_get_env_val("DAPL_ACK_TIMER", DCM_ACK_TIMER);
+ hca_ptr->ib_trans.rnr_retry =
+ dapl_os_get_env_val("DAPL_RNR_RETRY", DCM_RNR_RETRY);
+ hca_ptr->ib_trans.rnr_timer =
+ dapl_os_get_env_val("DAPL_RNR_TIMER", DCM_RNR_TIMER);
+ hca_ptr->ib_trans.global =
+ dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", DCM_GLOBAL);
+ hca_ptr->ib_trans.hop_limit =
+ dapl_os_get_env_val("DAPL_HOP_LIMIT", DCM_HOP_LIMIT);
+ hca_ptr->ib_trans.tclass =
+ dapl_os_get_env_val("DAPL_TCLASS", DCM_TCLASS);
+ hca_ptr->ib_trans.mtu =
+ dapl_ib_mtu(dapl_os_get_env_val("DAPL_IB_MTU", DCM_IB_MTU));
+
+ if (dapli_mix_open(&hca_ptr->ib_trans, hca_name,
+ hca_ptr->port_num, flags & DAPL_OPEN_QUERY)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: SCIF init ERR for %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev));
+ goto bail;
+ }
+
+ if (flags & DAPL_OPEN_QUERY)
+ goto done;
+
+ /* initialize CM list, LISTEN, SND queue, PSP array, locks */
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.lock)) != DAT_SUCCESS)
+ goto bail;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.llock)) != DAT_SUCCESS)
+ goto bail;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.slock)) != DAT_SUCCESS)
+ goto bail;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.plock)) != DAT_SUCCESS)
+ goto bail;
+
+ if ((dapl_os_lock_init(&hca_ptr->ib_trans.cqlock)) != DAT_SUCCESS)
+ goto bail;
+
+ /* EVD events without direct CQ channels, CNO support */
+ hca_ptr->ib_trans.ib_cq =
+ ibv_create_comp_channel(hca_ptr->ib_hca_handle);
+ if (hca_ptr->ib_trans.ib_cq == NULL) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ibv_create_comp_channel ERR %s\n",
+ strerror(errno));
+ goto bail;
+ }
+ dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq);
+
+ /* EVD to indirect CQ's, need empty CQ for half QP that is not used */
+ hca_ptr->ib_trans.ib_cq_empty = dapls_create_empty_cq(hca_ptr->ib_hca_handle);
+ if (hca_ptr->ib_trans.ib_cq_empty == NULL) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: ERR: create_empty_cq = %s\n",
+ strerror(errno));
+ goto bail;
+ }
+
+ /* initialize CM and listen lists on this HCA uCM QP */
+ dapl_llist_init_head(&hca_ptr->ib_trans.list);
+ dapl_llist_init_head(&hca_ptr->ib_trans.llist);
+ dapl_llist_init_head(&hca_ptr->ib_trans.cqlist);
+
+ /* create uCM qp services */
+ if (mcm_service_create(hca_ptr))
+ goto bail;
+
+ if (create_os_signal(hca_ptr)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to init cr pipe - %s\n",
+ strerror(errno));
+ goto bail;
+ }
+
+ /* create thread to process inbound connect request */
+ hca_ptr->ib_trans.cm_state = IB_THREAD_INIT;
+ dat_status = dapl_os_thread_create(cm_thread,
+ (void *)hca_ptr,
+ &hca_ptr->ib_trans.thread);
+ if (dat_status != DAT_SUCCESS) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " open_hca: failed to create thread\n");
+ goto bail;
+ }
+
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " open_hca: MCM devname %s port %d, dev_IP %s ep_map %s\n",
+ ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
+ hca_ptr->port_num,
+ inet_ntoa(((struct sockaddr_in *)
+ &hca_ptr->hca_address)->sin_addr),
+ mcm_map_str(hca_ptr->ib_trans.addr.ep_map));
+
+ /* wait for cm_thread */
+ while (hca_ptr->ib_trans.cm_state != IB_THREAD_RUN)
+ dapl_os_sleep_usec(1000);
+
+done:
+ /* save LID, GID, QPN, PORT address information, for ia_queries */
+ /* Set AF_INET6 to insure callee address storage of 28 bytes */
+ hca_ptr->ib_trans.hca = hca_ptr;
+ hca_ptr->ib_trans.addr.family = AF_INET6;
+ hca_ptr->ib_trans.addr.qp_type = IBV_QPT_UD;
+ memcpy(&hca_ptr->hca_address,
+ &hca_ptr->ib_trans.addr,
+ sizeof(struct dat_mcm_addr));
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ "%s open: dev %s port %d, GID %s, LID %x qpn %x sl %d %s\n",
+ PROVIDER_NAME, hca_name, hca_ptr->port_num,
+ inet_ntop(AF_INET6, &mcm_ia->gid, gid_str, sizeof(gid_str)),
+ ntohs(mcm_ia->lid), ntohl(mcm_ia->qpn),
+ mcm_ia->sl, mcm_map_str(mcm_ia->ep_map));
+
+ ibv_free_device_list(dev_list);
+ return DAT_SUCCESS;
+bail:
+ mcm_service_destroy(hca_ptr);
+ ibv_close_device(hca_ptr->ib_hca_handle);
+ hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+ hca_ptr->ib_trans.ib_ctx = NULL;
+ hca_ptr->ib_trans.ib_dev = NULL;
+
+err:
+ ibv_free_device_list(dev_list);
+ return dat_status;
+}
+
+/*
+ * dapls_ib_close_hca
+ *
+ * Close HCA
+ *
+ * Input:
+ * hca_ptr HCA handle
+ *
+ * Output:
+ * none
+ *
+ * Return:
+ * DAT_SUCCESS
+ * dapl_convert_errno
+ *
+ */
+DAT_RETURN dapls_ib_close_hca(IN DAPL_HCA * hca_ptr)
+{
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " close_hca: %p\n", hca_ptr);
+
+ if (!hca_ptr->ib_trans.cm_state) /* thread never started */
+ goto done;
+
+ if (hca_ptr->ib_trans.cm_state == IB_THREAD_RUN) {
+ hca_ptr->ib_trans.cm_state = IB_THREAD_CANCEL;
+ dapls_thread_signal(&hca_ptr->ib_trans.signal);
+ while (hca_ptr->ib_trans.cm_state != IB_THREAD_EXIT) {
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " close_hca: waiting for cm_thread\n");
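+ /* editor's comment (illustrative, not in the original patch):
+ * wake-and-wait handshake - IB_THREAD_CANCEL was set above; each
+ * pass re-signals the poll loop until cm_thread finishes freeing
+ * its CM list and marks IB_THREAD_EXIT */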
+ dapls_thread_signal(&hca_ptr->ib_trans.signal);
+ dapl_os_sleep_usec(1000);
+ }
+ }
+
+ dapli_mix_close(&hca_ptr->ib_trans);
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.llock);
+ dapl_os_lock_destroy(&hca_ptr->ib_trans.cqlock);
+ destroy_os_signal(hca_ptr);
+ mcm_service_destroy(hca_ptr);
+done:
+ if (hca_ptr->ib_trans.ib_cq)
+ ibv_destroy_comp_channel(hca_ptr->ib_trans.ib_cq);
+
+ if (hca_ptr->ib_trans.ib_cq_empty) {
+ struct ibv_comp_channel *channel;
+ channel = hca_ptr->ib_trans.ib_cq_empty->cq->channel;
+ ibv_destroy_cq(hca_ptr->ib_trans.ib_cq_empty->cq);
+ if (channel)
+ ibv_destroy_comp_channel(channel);
+ }
+
+ if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
+ if (ibv_close_device(hca_ptr->ib_hca_handle))
+ return (dapl_convert_errno(errno, "ib_close_device"));
+ hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+ }
+
+ return (DAT_SUCCESS);
+}
+
+/* Destroy uCM endpoint services, free QP resources and remote_ah's array */
+static void mcm_service_destroy(IN DAPL_HCA *hca)
+{
+ ib_hca_transport_t *tp = &hca->ib_trans;
+ int msg_size = sizeof(ib_cm_msg_t);
+
+ if (tp->mr_sbuf)
+ ibv_dereg_mr(tp->mr_sbuf);
+
+ if (tp->mr_rbuf)
+ ibv_dereg_mr(tp->mr_rbuf);
+
+ if (tp->qp)
+ ibv_destroy_qp(tp->qp);
+
+ if (tp->scq)
+ ibv_destroy_cq(tp->scq);
+
+ if (tp->rcq)
+ ibv_destroy_cq(tp->rcq);
+
+ if (tp->rch) {
+ tp->rch_fd = 0;
+ ibv_destroy_comp_channel(tp->rch);
+ }
+
+ if (tp->ah) {
+ int i;
+
+ for (i = 0; i < 0xffff; i++) {
+ if (tp->ah[i])
+ ibv_destroy_ah(tp->ah[i]);
+ }
+ dapl_os_free(tp->ah, (sizeof(*tp->ah) * 0xffff));
+ }
+
+ if (tp->pd)
+ ibv_dealloc_pd(tp->pd);
+
+ if (tp->sid)
+ dapl_os_free(tp->sid, (sizeof(*tp->sid) * 0xffff));
+
+ if (tp->rbuf)
+ dapl_os_free(tp->rbuf, (msg_size * tp->qpe));
+
+ if (tp->sbuf)
+ dapl_os_free(tp->sbuf, (msg_size * tp->qpe));
+}
+
+/* Create uCM endpoint services, allocate remote_ah's array */
+static int mcm_service_create(IN DAPL_HCA *hca)
+{
+ struct ibv_qp_init_attr qp_create;
+ ib_hca_transport_t *tp = &hca->ib_trans;
+ struct ibv_recv_wr recv_wr, *recv_err;
+ struct ibv_sge sge;
+ int i, mlen = sizeof(dat_mcm_msg_t);
+ int hlen = sizeof(struct ibv_grh); /* hdr included with UD recv */
+ char *rbuf;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " mcm_create: \n");
+
+ /* setup CM timers and queue sizes */
+ tp->retries = dapl_os_get_env_val("DAPL_UCM_RETRY", MCM_RETRY_CNT);
+ tp->rep_time = dapl_os_get_env_val("DAPL_UCM_REP_TIME", MCM_REP_TIME);
+ tp->rtu_time = dapl_os_get_env_val("DAPL_UCM_RTU_TIME", MCM_RTU_TIME);
+ tp->cm_timer = DAPL_MIN(tp->rep_time, tp->rtu_time);
+ tp->qpe = dapl_os_get_env_val("DAPL_UCM_QP_SIZE", DCM_QP_SIZE);
+ tp->cqe = dapl_os_get_env_val("DAPL_UCM_CQ_SIZE", DCM_CQ_SIZE);
+ tp->burst = dapl_os_get_env_val("DAPL_UCM_TX_BURST", DCM_TX_BURST);
+
+ /* CM service via MPXYD, no need for local IB UD CM service */
+ if (tp->scif_ep)
+ return 0;
+
+ tp->pd = ibv_alloc_pd(hca->ib_hca_handle);
+ if (!tp->pd)
+ goto bail;
+
+ dapl_log(DAPL_DBG_TYPE_UTIL,
+ " create_service: pd %p ctx %p handle 0x%x\n",
+ tp->pd, tp->pd->context, tp->pd->handle);
+
+ tp->rch = ibv_create_comp_channel(hca->ib_hca_handle);
+ if (!tp->rch)
+ goto bail;
+ dapls_config_comp_channel(tp->rch);
+ tp->rch_fd = tp->rch->fd;
+
+ tp->scq = ibv_create_cq(hca->ib_hca_handle, tp->cqe, hca, NULL, 0);
+ if (!tp->scq)
+ goto bail;
+
+ tp->rcq = ibv_create_cq(hca->ib_hca_handle, tp->cqe, hca, tp->rch, 0);
+ if (!tp->rcq)
+ goto bail;
+
+ if (ibv_req_notify_cq(tp->rcq, 0))
+ goto bail;
+
+ dapl_os_memzero((void *)&qp_create, sizeof(qp_create));
+ qp_create.qp_type = IBV_QPT_UD;
+ qp_create.send_cq = tp->scq;
+ qp_create.recv_cq = tp->rcq;
+ qp_create.cap.max_send_wr = qp_create.cap.max_recv_wr = tp->qpe;
+ qp_create.cap.max_send_sge = qp_create.cap.max_recv_sge = 1;
+ qp_create.cap.max_inline_data = tp->max_inline_send;
+ qp_create.qp_context = (void *)hca;
+
+ tp->qp = ibv_create_qp(tp->pd, &qp_create);
+ if (!tp->qp)
+ goto bail;
+
+ tp->ah = (ib_ah_handle_t*) dapl_os_alloc(sizeof(ib_ah_handle_t) * 0xffff);
+ tp->sid = (uint8_t*) dapl_os_alloc(sizeof(uint8_t) * 0xffff);
+ tp->rbuf = (void*) dapl_os_alloc((mlen + hlen) * tp->qpe);
+ tp->sbuf = (void*) dapl_os_alloc(mlen * tp->qpe);
+ tp->s_hd = tp->s_tl = 0;
+
+ if (!tp->ah || !tp->rbuf || !tp->sbuf || !tp->sid)
+ goto bail;
+
+ (void)dapl_os_memzero(tp->ah, (sizeof(ib_ah_handle_t) * 0xffff));
+ (void)dapl_os_memzero(tp->sid, (sizeof(uint8_t) * 0xffff));
+ tp->sid[0] = 1; /* resv slot 0, 0 == no ports available */
+ (void)dapl_os_memzero(tp->rbuf, ((mlen + hlen) * tp->qpe));
+ (void)dapl_os_memzero(tp->sbuf, (mlen * tp->qpe));
+
+ tp->mr_sbuf = ibv_reg_mr(tp->pd, tp->sbuf,
+ (mlen * tp->qpe),
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!tp->mr_sbuf)
+ goto bail;
+
+ tp->mr_rbuf = ibv_reg_mr(tp->pd, tp->rbuf,
+ ((mlen + hlen) * tp->qpe),
+ IBV_ACCESS_LOCAL_WRITE);
+ if (!tp->mr_rbuf)
+ goto bail;
+
+ /* modify UD QP: init, rtr, rts */
+ if ((dapls_modify_qp_ud(hca, tp->qp)) != DAT_SUCCESS)
+ goto bail;
+
+ /* post receive buffers, setup head, tail pointers */
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &sge;
+ recv_wr.num_sge = 1;
+ sge.length = mlen + hlen;
+ sge.lkey = tp->mr_rbuf->lkey;
+ rbuf = (char *) tp->rbuf;
+
+ for (i = 0; i < tp->qpe; i++) {
+ recv_wr.wr_id = (uintptr_t) (rbuf + hlen);
+ sge.addr = (uintptr_t) rbuf;
+ if (ibv_post_recv(tp->qp, &recv_wr, &recv_err))
+ goto bail;
+ rbuf += sge.length;
+ }
+
+ /* save qp_num as part of ia_address, network order */
+ tp->addr.qpn = htonl(tp->qp->qp_num);
+ return 0;
+bail:
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " mcm_service_create: ERR %s\n", strerror(errno));
+ mcm_service_destroy(hca);
+ return -1;
+}
+
diff --git a/dapl/openib_mcm/linux/openib_osd.h b/dapl/openib_mcm/linux/openib_osd.h
new file mode 100644
index 0000000..357d2e0
--- /dev/null
+++ b/dapl/openib_mcm/linux/openib_osd.h
@@ -0,0 +1,37 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+#include <endian.h>
+#include <byteswap.h>
+
+#if __BYTE_ORDER == __BIG_ENDIAN
+#define htonll(x) (x)
+#define ntohll(x) (x)
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+#define htonll(x) bswap_64(x)
+#define ntohll(x) bswap_64(x)
+#endif
+#ifndef STATIC
+#define STATIC static
+#endif /* STATIC */
+#ifndef _INLINE_
+#define _INLINE_ __inline__
+#endif /* _INLINE_ */
+
+#define DAPL_SOCKET int
+#define DAPL_INVALID_SOCKET -1
+#define DAPL_FD_SETSIZE 8192
+
+#define closesocket close
+
+struct dapl_thread_signal
+{
+ DAPL_SOCKET scm[2];
+};
+
+STATIC _INLINE_ void dapls_thread_signal(struct dapl_thread_signal *signal)
+{
+ send(signal->scm[1], "w", sizeof "w", 0);
+}
+
+#endif // OPENIB_OSD_H
diff --git a/dapl/openib_mcm/mix.c b/dapl/openib_mcm/mix.c
new file mode 100644
index 0000000..c9c9672
--- /dev/null
+++ b/dapl/openib_mcm/mix.c
@@ -0,0 +1,1293 @@
+/*
+ * Copyright (c) 2009-2014 Intel Corporation. All rights reserved.
+ *
+ * This Software is licensed under one of the following licenses:
+ *
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/cpl.php.
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+#include "dapl.h"
+#include "dapl_adapter_util.h"
+#include "dapl_evd_util.h"
+#include "dapl_cr_util.h"
+#include "dapl_name_service.h"
+#include "dapl_ib_util.h"
+#include "dapl_ep_util.h"
+#include "dapl_osd.h"
+
+/*
+ * CM proxy services, MCM on MIC to MPXYD via SCIF
+ *
+ * MIX_IA_OPEN
+ */
+int dapli_mix_open(ib_hca_transport_t *tp, char *name, int port, int query_only)
+{
+ int ret, len;
+ dat_mix_open_t msg;
+ scif_epd_t listen_ep;
+ int listen_port;
+ int always_proxy;
+ int scif_port_id;
+
+ /* make MPXY connection even not running on MIC. good for debugging */
+ always_proxy = dapl_os_get_env_val("DAPL_MCM_ALWAYS_PROXY", 0);
+ scif_port_id = dapl_os_get_env_val("DAPL_MCM_PORT_ID", SCIF_OFED_PORT_8);
+
+ ret = scif_get_nodeIDs(NULL, 0, &tp->self.node);
+ if (ret < 0) {
+ dapl_log(1, " scif_get_nodeIDs() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION,
+ " SCIF node_id: %d client req_id 0x%x\n",
+ (uint16_t)tp->self.node, dapl_os_getpid());
+
+ if (tp->self.node == 0)
+ tp->addr.ep_map = HOST_SOCK_DEV; /* non-MIC mapping */
+
+ if (query_only || (tp->self.node == 0 && !always_proxy)) {
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Not running on MIC, no MPXY connect required\n");
+ tp->scif_ep = 0;
+ return 0;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Running on MIC, MPXY connect required\n");
+
+ /* Create an endpoint for MPXYD to connect back */
+ listen_ep = scif_open();
+ if (listen_ep < 0) {
+ dapl_log(1, "scif_open() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+
+ listen_port = scif_bind(listen_ep, 0);
+ if (listen_port < 0) {
+ dapl_log(1, "scif_bind() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+
+ ret = scif_listen(listen_ep, 2);
+ if (ret < 0) {
+ dapl_log(1, "scif_listen() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+
+ /* MPXYD is running on node 0 and well-known OFED port */
+ tp->peer.node = 0;
+ tp->peer.port = scif_port_id;
+
+ tp->scif_ep = scif_open();
+ if (tp->scif_ep < 0) {
+ dapl_log(1, "scif_open() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+ ret = scif_connect(tp->scif_ep, &tp->peer);
+ if (ret < 0) {
+ dapl_log(1, "scif_connect() to port %d, failed with error %s\n",
+ scif_port_id, strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION, "Connected to node 0 for operations, ep=%d\n", tp->scif_ep);
+
+ len = sizeof(listen_port);
+ ret = scif_send(tp->scif_ep, &listen_port, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ dapl_log(1, " ERR: OPEN EP's send on %d, ret %d, exp %d, error %s\n",
+ tp->scif_ep, ret, len, strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent listen port number (%d) on SCIF EP\n", listen_port);
+
+ ret = scif_accept(listen_ep, &tp->peer_ev,
+ &tp->scif_ev_ep, SCIF_ACCEPT_SYNC);
+ if (ret < 0) {
+ dapl_log(1, "scif_accept() for ev_ep failed with error %s\n", strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Accepted Event EP (%d)\n", tp->scif_ev_ep);
+ ret = scif_accept(listen_ep, &tp->peer_tx, &tp->scif_tx_ep, SCIF_ACCEPT_SYNC);
+ if (ret < 0) {
+ dapl_log(1, "scif_accept() for tx_ep failed with error %s\n", strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Accepted TX EP (%d)\n", tp->scif_tx_ep);
+ ret = scif_close(listen_ep);
+ if (ret < 0) {
+ dapl_log(1, "scif_close() failed with error %s\n", strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION, "Connected to node 0 for DATA, tx_ep=%d \n", tp->scif_tx_ep);
+
+ /* MIX_IA_OPEN: device name and port */
+ msg.hdr.ver = DAT_MIX_VER;
+ msg.hdr.op = MIX_IA_OPEN;
+ msg.hdr.status = 0;
+ msg.hdr.flags = MIX_OP_REQ;
+ msg.hdr.req_id = dapl_os_getpid();
+ msg.port = port;
+ strcpy((char*)&msg.name, name);
+ memcpy(&msg.dev_attr, (void*)&tp->ack_timer, sizeof(dat_mix_dev_attr_t));
+
+ len = sizeof(dat_mix_open_t);
+ ret = scif_send(tp->scif_ep, &msg, len, SCIF_SEND_BLOCK);
+ if (ret != len) {
+ dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n",
+ mix_op_str(msg.hdr.op),tp->scif_ep, ret, len, strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP %d, req_id 0x%x\n",
+ mix_op_str(msg.hdr.op), tp->scif_ep, ntohl(msg.hdr.req_id));
+
+ /* MIX_IA_OPEN: reply includes addr info */
+ ret = scif_recv(tp->scif_ep, &msg, len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ dapl_log(1, " ERR: dev_open reply ep %d, ret %d, exp %d, error %s\n",
+ tp->scif_ep, ret, len, strerror(errno));
+ return -1;
+ }
+ dapl_log(DAPL_DBG_TYPE_EXTENSION," Recv'd %s reply on SCIF EP %d, dev_id %d\n",
+ mix_op_str(msg.hdr.op), tp->scif_ep, msg.hdr.req_id);
+
+ if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_IA_OPEN ||
+ msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) {
+ dapl_log(1, " ERR: dev_open ver (exp %d rcv %d), op %s, flgs %d, st %d dev_id %d\n",
+ DAT_MIX_VER, msg.hdr.ver, mix_op_str(msg.hdr.op),
+ msg.hdr.flags, msg.hdr.status, msg.hdr.req_id);
+ return -1;
+ }
+ /* save address to transport object, keeps IA queries local */
+ memcpy((void*)&tp->addr, (void*)&msg.dev_addr, sizeof(dat_mcm_addr_t));
+ tp->dev_id = msg.hdr.req_id;
+ dapl_log(DAPL_DBG_TYPE_EXTENSION,
+ " mix_open reply (msg %p, ln %d) EPs %d %d %d - dev_id %d\n",
+ &msg, len, tp->scif_ep, tp->scif_ev_ep,
+ tp->scif_tx_ep, tp->dev_id);
+ return 0;
+}
+
+/* MIX_IA_CLOSE - no operation, just shutdown endpoint(s) */
+void dapli_mix_close(ib_hca_transport_t *tp)
+{
+ dapl_log(DAPL_DBG_TYPE_EXTENSION,
+ " MIX_IA_CLOSE: tp %p scif EP's %d,%d,%d dev_id %d\n",
+ tp, tp->scif_ep, tp->scif_tx_ep, tp->scif_ev_ep, tp->dev_id);
+
+ if (tp->scif_ep) {
+ scif_close(tp->scif_ep);
+ tp->scif_ep = 0;
+ }
+ if (tp->scif_tx_ep) {
+ scif_close(tp->scif_tx_ep);
+ tp->scif_tx_ep = 0;
+ }
+ if (tp->scif_ev_ep) {
+ scif_close(tp->scif_ev_ep);
+ tp->scif_ev_ep = 0;
+ }
+}
+
+/* MIX_LISTEN */
+int dapli_mix_listen(dp_ib_cm_handle_t cm, uint16_t sid)
+{
+ dat_mix_listen_t msg;
+ scif_epd_t mix_ep = cm->hca->ib_trans.scif_ep;
+ int ret, len;
+
+ dapl_log(DAPL_DBG_TYPE_EXTENSION,
+ " MIX_LISTEN port 0x%x htons(0x%x), %d - client req_id 0x%x\n",
+ sid, htons(sid), sid, htonl(dapl_os_getpid()));
+
+ /* listen request: sid and backlog */
+ msg.hdr.ver = DAT_MIX_VER;
+ msg.hdr.op = MIX_LISTEN;
+ msg.hdr.status = 0;
+ msg.hdr.flags = MIX_OP_REQ;
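+ /* editor's note (illustrative, not in the original patch): every
+ * MIX call in this file follows the same synchronous transaction
+ * shape over the SCIF operations endpoint - fill a dat_mix_* request
+ * with ver = DAT_MIX_VER, the op code and MIX_OP_REQ, then
+ *
+ *	scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
+ *	scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK);
+ *
+ * and validate the reply for the same op with MIX_OP_RSP flags and
+ * MIX_SUCCESS status before using any returned fields */
+ 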
msg.hdr.req_id = cm->hca->ib_trans.dev_id; + msg.sp_ctx = (uint64_t)cm->sp; + msg.sid = sid; + msg.backlog = 64; + + len = sizeof(dat_mix_listen_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s msg %p send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), &msg, mix_ep, ret, len, strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP %d\n", mix_op_str(msg.hdr.op), mix_ep); + + /* listen response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", mix_ep, ret, len, strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Recv'd %s reply on SCIF EP %d for dev_id %d\n", + mix_op_str(msg.hdr.op), mix_ep, msg.hdr.req_id); + + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_LISTEN || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " ERR: MIX_LISTEN ver %d, op %s, flgs %d, st %d dev_id %d\n", + msg.hdr.ver, mix_op_str(msg.hdr.op), + msg.hdr.flags, msg.hdr.status, msg.hdr.req_id); + if (msg.hdr.status != MIX_SUCCESS) + return msg.hdr.status; + else + return -1; + } + + dapl_log(DAPL_DBG_TYPE_EXTENSION," MIX_LISTEN successful on SCIF EP %d\n", mix_ep); + return 0; +} + +/* MIX_LISTEN_FREE */ +int dapli_mix_listen_free(dp_ib_cm_handle_t cm) +{ + dat_mix_hdr_t msg; + scif_epd_t mix_ep = cm->hca->ib_trans.scif_ep; + int ret, len; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," mix_listen_free port 0x%x htons(0x%x), %d\n", + (uint16_t)cm->sp->conn_qual, htons((uint16_t)cm->sp->conn_qual), + (uint16_t)cm->sp->conn_qual); + + /* listen free request */ + msg.ver = DAT_MIX_VER; + msg.op = MIX_LISTEN_FREE; + msg.status = 0; + msg.flags = MIX_OP_REQ; + msg.req_id = (uint16_t)cm->sp->conn_qual; + + len = sizeof(dat_mix_hdr_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.op)); + + /* listen free response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.ver != DAT_MIX_VER || msg.op != MIX_LISTEN_FREE || + msg.flags != MIX_OP_RSP || msg.status != MIX_SUCCESS) { + dapl_log(1, " MIX_LISTEN_FREE: sid 0x%x, ver %d, op %d, flags %d, or stat %d ERR \n", + (uint16_t)cm->sp->conn_qual, msg.ver, msg.op, msg.flags, msg.status); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," received successful reply on SCIF EP\n"); + return 0; +} + +/* MIX_LMR_CREATE */ +int dapli_mix_mr_create(ib_hca_transport_t *tp, DAPL_LMR * lmr) +{ + dat_mix_mr_t msg; + scif_epd_t mix_ep = tp->scif_ep; + int ret, len; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," lmr create %p, addr %p %p rmr_context %x mr->rkey %x\n", + lmr, lmr->mr_handle->addr, lmr->param.registered_address, + lmr->param.rmr_context, lmr->mr_handle->rkey ); + + /* request: */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_MR_CREATE; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.hdr.req_id = tp->dev_id; + msg.mr_id = 0; + msg.mr_len = lmr->param.registered_size; + msg.sci_addr = lmr->sci_addr; + msg.sci_off = lmr->sci_off; + msg.ib_addr = (uint64_t) lmr->mr_handle->addr; + msg.ib_rkey = lmr->param.rmr_context; + msg.ctx = 
(uint64_t)lmr; + + + len = sizeof(dat_mix_mr_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + + /* response, status and mr_id */ + len = sizeof(dat_mix_mr_t); + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_MR_CREATE || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n", + msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status); + return -1; + } + + /* save the MPXYD mr_id */ + lmr->mr_id = msg.mr_id; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," lmr_created %p id = %d\n", lmr, lmr->mr_id); + return 0; +} + +/* MIX_LMR_FREE */ +int dapli_mix_mr_free(ib_hca_transport_t *tp, DAPL_LMR * lmr) +{ + dat_mix_mr_t msg; + scif_epd_t mix_ep = tp->scif_ep; + int ret, len; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," lmr free %p, id=%d\n", lmr, lmr->mr_id); + + /* request */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_MR_FREE; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.mr_id = lmr->mr_id; + + len = sizeof(dat_mix_mr_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + + /* response, status only */ + len = sizeof(dat_mix_hdr_t); + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_MR_FREE || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n", + msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," removed lmr %p, id %d\n", lmr, lmr->mr_id); + return 0; +} + + +/* MIX_QP_CREATE */ +int dapli_mix_qp_create(ib_qp_handle_t m_qp, struct ibv_qp_init_attr *attr, + ib_cq_handle_t req_cq, ib_cq_handle_t rcv_cq) +{ + dat_mix_qp_t msg; + scif_epd_t mix_ep = m_qp->tp->scif_ep; + int ret, len; + + /* request: QP_r local or shadowed, QP_t shadowed */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_QP_CREATE; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.hdr.req_id = m_qp->tp->dev_id; + + if (m_qp->qp) { /* QP_r local */ + msg.qp_r.qp_num = m_qp->qp->qp_num; + msg.qp_r.qp_type = m_qp->qp->qp_type; + msg.qp_r.state = m_qp->qp->state; + } else { /* QP_r shadowed on proxy */ + msg.qp_r.qp_num = 0; + msg.qp_r.qp_type = 0; + msg.qp_r.state = 0; + } + msg.qp_r.rcq_id = rcv_cq->cq_id; + msg.qp_r.ctx = (uint64_t)m_qp; + msg.qp_r.qp_id = 0; /* for now */ + msg.qp_r.qp_type = attr->qp_type; + msg.qp_r.max_recv_wr = attr->cap.max_recv_wr; + msg.qp_r.max_recv_sge = attr->cap.max_recv_sge; + msg.qp_r.max_send_wr = attr->cap.max_send_wr; + msg.qp_r.max_send_sge = attr->cap.max_send_sge; + + msg.qp_t.qp_type = attr->qp_type; + msg.qp_t.max_inline_data = attr->cap.max_inline_data; + msg.qp_t.max_send_wr 
= attr->cap.max_send_wr; + msg.qp_t.max_send_sge = attr->cap.max_send_sge; + msg.qp_t.max_recv_wr = attr->cap.max_recv_wr; + msg.qp_t.max_recv_sge = attr->cap.max_recv_sge; + msg.qp_t.scq_id = req_cq->cq_id; /* QP_t always shadowed on proxy */ + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_QP_CREATE: QP_r - qpn 0x%x, ctx %p, rq %d,%d sq %d,%d rcq_id %d,%p\n", + msg.qp_r.qp_num, msg.qp_r.ctx, msg.qp_r.max_recv_wr, + msg.qp_r.max_recv_sge, msg.qp_r.max_send_wr, + msg.qp_r.max_send_sge, msg.qp_r.rcq_id, rcv_cq); + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_QP_CREATE: QP_t - wr %d sge %d inline %d scq_id %d,%p\n", + msg.qp_t.max_send_wr, msg.qp_t.max_send_sge, + msg.qp_t.max_inline_data, msg.qp_t.scq_id, req_cq); + + len = sizeof(dat_mix_qp_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + return EFAULT; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + + /* wait for response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", mix_ep, ret, len, strerror(errno)); + return EFAULT; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_QP_CREATE || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " MIX msg ver %d, op %d, flags %d, or stat %d ERR \n", + msg.hdr.ver, msg.hdr.op, msg.hdr.flags, msg.hdr.status); + if (msg.hdr.status) + return msg.hdr.status; + else + return EINVAL; + } + + /* save QP_t id, QP is shadowed TX */ + m_qp->qp_id = msg.qp_t.qp_id; + m_qp->m_inline = msg.m_inline; + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_QP_CREATE: reply, proxy qp_id 0x%x\n", m_qp->qp_id); + + return 0; +} + +/* MIX_EP_FREE, fits in header */ +int dapli_mix_qp_free(ib_qp_handle_t m_qp) +{ + dat_mix_hdr_t msg; + scif_epd_t mix_ep = m_qp->tp->scif_ep; + int ret, len; + + /* request */ + msg.ver = DAT_MIX_VER; + msg.op = MIX_QP_FREE; + msg.status = 0; + msg.flags = MIX_OP_REQ; + msg.req_id = m_qp->qp_id; /* shadowed QP */ + + len = sizeof(dat_mix_hdr_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.op)); + + /* response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.ver != DAT_MIX_VER || msg.op != MIX_QP_FREE || + msg.flags != MIX_OP_RSP || msg.status != MIX_SUCCESS) { + dapl_log(1, " MIX_QP_FREE ERR: ver %d, op %d, flags %d, or stat %d len %d\n", + msg.ver, msg.op, msg.flags, msg.status, ret); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," received reply on SCIF EP\n"); + return 0; +} + +/* MIX_CQ_CREATE */ +int dapli_mix_cq_create(ib_cq_handle_t m_cq, int cq_len) +{ + dat_mix_cq_t msg; + scif_epd_t mix_ep = m_cq->tp->scif_ep; + int ret, len; + + /* request: QP_r local, QP_t shadowed */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_CQ_CREATE; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.hdr.req_id = m_cq->tp->dev_id; + msg.cq_len = cq_len; + msg.cq_ctx = (uint64_t)m_cq; + msg.cq_id = 0; + + len = sizeof(dat_mix_cq_t); + ret = scif_send(mix_ep, &msg, len, 
SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s snd on %d, ret %d, exp %d, err %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, + strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", + mix_op_str(msg.hdr.op)); + + /* wait for response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on ep %d, ret %d, exp %d, err %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_CQ_CREATE || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " ERR: %s %p ver %d, op %d, flags %d, stat %d\n", + mix_op_str(msg.hdr.op), m_cq, msg.hdr.ver, + msg.hdr.op, msg.hdr.flags, msg.hdr.status); + return -1; + } + + /* save id from proxy CQ create */ + m_cq->cq_id = msg.cq_id; + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CQ_CREATE: reply, proxy cq_id 0x%x\n", m_cq->cq_id); + return 0; +} + +/* MIX_CQ_FREE, fits in header */ +int dapli_mix_cq_free(ib_cq_handle_t m_cq) +{ + dat_mix_hdr_t msg; + scif_epd_t mix_ep = m_cq->tp->scif_ep; + int ret, len; + + /* request */ + msg.ver = DAT_MIX_VER; + msg.op = MIX_CQ_FREE; + msg.status = 0; + msg.flags = MIX_OP_REQ; + msg.req_id = m_cq->cq_id; + + len = sizeof(dat_mix_hdr_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", + mix_op_str(msg.op)); + + /* response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: rcv on new_ep %d, ret %d, exp %d, error %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.ver != DAT_MIX_VER || msg.op != MIX_CQ_FREE || + msg.flags != MIX_OP_RSP || msg.status != MIX_SUCCESS) { + dapl_log(1, " MIX_CQ_FREE ERR: ver %d, op %d, flags %d, or stat %d ln %d\n", + msg.ver, msg.op, msg.flags, msg.status, ret); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CQ_FREE: reply, proxy cq_id 0x%x\n", m_cq->cq_id); + return 0; +} + +int dapli_mix_cq_poll(ib_cq_handle_t m_cq, struct ibv_wc *wc) +{ + /* MPXYD will send event and update EVD, return empty to avoid unnecessary SCIF traffic */ + return 0; +} + +/* SCIF DMA outbound writes and inbound msg receives; translate to scif_off via LMR */ +/* TODO: faster translation for post_send? */ +static inline int mix_proxy_data(ib_qp_handle_t m_qp, dat_mix_sr_t *msg, struct ibv_sge *sglist, int txlen, int mix_ep) +{ + off_t l_off; + uint64_t addr; + struct dapl_lmr *lmr = NULL; + int i, len; + + for (i=0; i < msg->wr.num_sge ; i++) { + dapl_log(DAPL_DBG_TYPE_EXTENSION, " mix_proxy_data: post_%s: sge[%d] addr %p, len %d\n", + msg->wr.opcode == OP_RECEIVE ? 
"recv":"send", + i, sglist[i].addr, sglist[i].length); + + /* find LMR with lkey to get scif_off for scif_read_from */ + l_off = 0; + + if (!lmr || (lmr && (lmr->mr_handle->lkey != sglist[i].lkey))) + lmr = dapl_llist_peek_head(&m_qp->ep->header.owner_ia->lmr_list_head); + + while (lmr) { + if (lmr->mr_handle->lkey == sglist[i].lkey) { + len = sglist[i].length; + addr = sglist[i].addr; + l_off = lmr->sci_addr + lmr->sci_off + (addr - lmr->param.registered_address); + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " mix_proxy_data: LMR (%p) lkey %x sci_addr %p off %x l_off %p addr %p len %d\n", + lmr, lmr->mr_handle->lkey, lmr->sci_addr, lmr->sci_off, l_off, addr, len); + break; + } + lmr = dapl_llist_next_entry(&lmr->header.owner_ia->lmr_list_head, + &lmr->header.ia_list_entry); + } + if (l_off) { + msg->sge[i].length = len; + msg->sge[i].addr = l_off; + msg->sge[i].lkey = 0; + } else + return -1; /* no translation */ + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," mix_proxy_data: return \n"); + return 0; +} + +/**** speed path ****/ +int dapli_mix_post_send(ib_qp_handle_t m_qp, int txlen, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr) +{ + char cmd[DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX]; + dat_mix_sr_t *msg = (dat_mix_sr_t *)cmd; + scif_epd_t mix_ep = m_qp->tp->scif_ep; + int ret, i, offset = sizeof(dat_mix_sr_t); + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " mix_post_send: msg=%p sge=%d len=%d op=%d off=%d (%p)raddr %Lx rkey 0x%x, wr_id %LX\n", + msg, wr->num_sge, txlen, wr->opcode, offset, &wr->wr.rdma.remote_addr, + wr->wr.rdma.remote_addr, wr->wr.rdma.rkey, wr->wr_id); + + if (wr->opcode != IBV_WR_SEND && + wr->opcode != IBV_WR_RDMA_WRITE && + wr->opcode != IBV_WR_RDMA_WRITE_WITH_IMM) + return EINVAL; + + msg->hdr.ver = DAT_MIX_VER; + msg->hdr.op = MIX_SEND; + msg->hdr.status = 0; + msg->hdr.flags = MIX_OP_REQ; + msg->hdr.req_id = m_qp->tp->dev_id; + msg->len = txlen; + msg->qp_id = m_qp->qp_id; + mcm_const_mix_wr(&msg->wr, wr); + + if (txlen > m_qp->m_inline) { + if (mix_proxy_data(m_qp, msg, wr->sg_list, txlen, mix_ep)) + return EINVAL; + } else { + msg->hdr.flags |= MIX_OP_INLINE; + for (i=0; i < wr->num_sge; i++) { + memcpy(&cmd[offset], (void*)wr->sg_list[i].addr, wr->sg_list[i].length); + offset += wr->sg_list[i].length; + } + } + + ret = scif_send(mix_ep, msg, offset, SCIF_SEND_BLOCK); + if (ret != offset) { + dapl_log(1, " ERR: %s on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg->hdr.op), mix_ep, ret, + offset, strerror(errno)); + return -1; + } + + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent MIX_SEND on SCIF EP %d, mlen=%d\n", mix_ep, offset); + return 0; +} + +int dapli_mix_post_recv(ib_qp_handle_t m_qp, int len, struct ibv_recv_wr *wr, struct ibv_recv_wr **bad_wr) +{ + char cmd[DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX]; + dat_mix_sr_t *msg = (dat_mix_sr_t *)cmd; + scif_epd_t mix_ep = m_qp->tp->scif_ep; + int ret; + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " mix_post_recv: msg=%p sge=%d len=%d wr_id %LX, addr %p lkey 0x%x\n", + msg, wr->num_sge, len, wr->wr_id, wr->sg_list[0].addr, wr->sg_list[0].lkey); + + if (wr->num_sge > DAT_MIX_SGE_MAX) + return EINVAL; + + msg->hdr.ver = DAT_MIX_VER; + msg->hdr.op = MIX_RECV; + msg->hdr.status = 0; + msg->hdr.flags = MIX_OP_REQ; + msg->hdr.req_id = m_qp->tp->dev_id; + msg->len = len; + msg->qp_id = m_qp->qp_id; /* shadowed RX */ + + /* setup work request */ + memset((void*)&msg->wr, 0, sizeof(dat_mix_wr_t)); + msg->wr.opcode = OP_RECEIVE; + msg->wr.wr_id = wr->wr_id; + msg->wr.num_sge = wr->num_sge; + + if (mix_proxy_data(m_qp, msg, wr->sg_list, len, mix_ep)) 
+int dapli_mix_post_recv(ib_qp_handle_t m_qp, int len, struct ibv_recv_wr *wr, struct ibv_recv_wr **bad_wr)
+{
+	char cmd[DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX];
+	dat_mix_sr_t *msg = (dat_mix_sr_t *)cmd;
+	scif_epd_t mix_ep = m_qp->tp->scif_ep;
+	int ret;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " mix_post_recv: msg=%p sge=%d len=%d wr_id %LX, addr %p lkey 0x%x\n",
+		 msg, wr->num_sge, len, wr->wr_id, wr->sg_list[0].addr, wr->sg_list[0].lkey);
+
+	if (wr->num_sge > DAT_MIX_SGE_MAX)
+		return EINVAL;
+
+	msg->hdr.ver = DAT_MIX_VER;
+	msg->hdr.op = MIX_RECV;
+	msg->hdr.status = 0;
+	msg->hdr.flags = MIX_OP_REQ;
+	msg->hdr.req_id = m_qp->tp->dev_id;
+	msg->len = len;
+	msg->qp_id = m_qp->qp_id; /* shadowed RX */
+
+	/* setup work request */
+	memset((void*)&msg->wr, 0, sizeof(dat_mix_wr_t));
+	msg->wr.opcode = OP_RECEIVE;
+	msg->wr.wr_id = wr->wr_id;
+	msg->wr.num_sge = wr->num_sge;
+
+	if (mix_proxy_data(m_qp, msg, wr->sg_list, len, mix_ep))
+		return EINVAL;
+
+	ret = scif_send(mix_ep, msg, sizeof(dat_mix_sr_t), SCIF_SEND_BLOCK);
+	if (ret != sizeof(dat_mix_sr_t)) {
+		dapl_log(1, " ERR: %s on %d, ret %d, exp %d, error %s\n",
+			 mix_op_str(msg->hdr.op), mix_ep, ret,
+			 sizeof(dat_mix_sr_t), strerror(errno));
+		return -1;
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent MIX_RECV on SCIF EP %d, mlen=%d\n", mix_ep, sizeof(dat_mix_sr_t));
+	return 0;
+}
+
+
+/* MIX CM operations:
+ *
+ * Event/CM channel (scif_ev_ep) for events and CM messages
+ * This channel is used via CM thread context, separate from user thread context for OPs
+ * Separate EPs per thread to avoid locking overhead on SCIF streams
+ */
+
+dp_ib_cm_handle_t dapli_mix_get_cm(ib_hca_transport_t *tp, uint64_t cm_ctx)
+{
+	dp_ib_cm_handle_t cm = NULL;
+
+	dapl_os_lock(&tp->lock);
+	if (!dapl_llist_is_empty(&tp->list))
+		cm = dapl_llist_peek_head(&tp->list);
+
+	while (cm) {
+		if (cm == (void*)cm_ctx)
+			break;
+
+		cm = dapl_llist_next_entry(&tp->list, &cm->local_entry);
+	}
+	dapl_os_unlock(&tp->lock);
+	return cm;
+}
+
+/* CM_REP operation, user context, op channel */
+int dapli_mix_cm_rep_out(dp_ib_cm_handle_t m_cm, int p_size, void *p_data)
+{
+	dat_mix_cm_t msg;
+	scif_epd_t mix_ep = m_cm->tp->scif_ep; /* op channel */
+	int ret, len;
+
+	/* request: QP_r local, QP_t shadowed */
+	msg.hdr.ver = DAT_MIX_VER;
+	msg.hdr.op = MIX_CM_ACCEPT;
+	msg.hdr.status = 0;
+	msg.hdr.flags = MIX_OP_REQ;
+	msg.hdr.req_id = m_cm->tp->dev_id;
+	msg.qp_id = m_cm->ep->qp_handle->qp_id; /* QP2 shadowed TX */
+	msg.cm_id = m_cm->cm_id;
+	msg.cm_ctx = (uint64_t)m_cm->cm_ctx;
+	msg.sp_ctx = (uint64_t)m_cm; /* send back my cm_ctx */
+	memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t));
+	memcpy(msg.msg.p_data, p_data, p_size);
+	msg.msg.p_size = htons(p_size);
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " ACCEPT -> dport %x cqpn %x iqpn %x lid %x, psize %d pdata[0]=%x\n",
+		 ntohs(msg.msg.dport), ntohl(msg.msg.dqpn),
+		 ntohl(msg.msg.daddr1.qpn), ntohs(msg.msg.daddr1.lid),
+		 p_size, msg.msg.p_data[0]);
+
+	len = sizeof(dat_mix_cm_t);
+	ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
+	if (ret != len) {
+		dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n",
+			 mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno));
+		return -1;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op));
+
+	/* no reply */
+	return 0;
+}
+
+/* CM_REJ message, user or cm_thread context, locking required */
+int dapli_mix_cm_rej_out(dp_ib_cm_handle_t m_cm, int p_size, void *p_data, int reason)
+{
+	dat_mix_cm_t msg;
+	scif_epd_t mix_ep = m_cm->tp->scif_ev_ep; /* CM,EV channel */
+	int ret, len;
+
+	/* request: QP_r local, QP_t shadowed */
+	msg.hdr.ver = DAT_MIX_VER;
+	msg.hdr.op = MIX_CM_REJECT;
+	msg.hdr.status = 0;
+	msg.hdr.flags = MIX_OP_REQ;
+	msg.hdr.req_id = m_cm->tp->dev_id;
+	msg.cm_id = m_cm->cm_id;
+	msg.cm_ctx = (uint64_t)m_cm->cm_ctx;
+	memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t));
+	memcpy(msg.msg.p_data, p_data, p_size);
+	msg.msg.p_size = htons(p_size);
+
+	if (reason == IB_CM_REJ_REASON_CONSUMER_REJ) /* setup op in CM message */
+		msg.msg.op = htons(MCM_REJ_USER);
+	else
+		msg.msg.op = htons(MCM_REJ_CM);
+
+	msg.msg.saddr1.lid = m_cm->hca->ib_trans.addr.lid;
+	msg.msg.saddr1.qp_type = m_cm->msg.daddr1.qp_type;
+	dapl_os_memcpy(&msg.msg.saddr1.gid[0], &m_cm->hca->ib_trans.addr.gid, 16);
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," REJECT -> dport 0x%x, dqpn 0x%x dlid 0x%x, reason %d, psize %d\n",
+		 ntohs(msg.msg.dport), ntohl(msg.msg.dqpn),
ntohs(msg.msg.daddr1.lid), reason, p_size ); + + len = sizeof(dat_mix_cm_t); + dapl_os_lock(&m_cm->tp->lock); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + dapl_os_unlock(&m_cm->tp->lock); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + + /* no reply */ + return 0; +} + +/* MIX_CM_REQ operation, user context, op channel */ +int dapli_mix_cm_req_out(dp_ib_cm_handle_t m_cm, ib_qp_handle_t m_qp) +{ + dat_mix_cm_t msg; + scif_epd_t mix_ep = m_cm->tp->scif_ep; /* use operation channel */ + int ret, len; + + /* request: QP_r local, QP_t shadowed */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_CM_REQ; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.hdr.req_id = m_cm->tp->dev_id; + msg.qp_id = m_qp->qp_id; /* shadowed TX */ + msg.cm_id = m_cm->cm_id; + msg.cm_ctx = (uint64_t)m_cm; + memcpy(&msg.msg, &m_cm->msg, sizeof(dat_mcm_msg_t)); + + dapl_log(DAPL_DBG_TYPE_EXTENSION," -> dport 0x%x, dqpn 0x%x dlid 0x%x ep_map %s\n", + ntohs(msg.msg.dport), ntohl(msg.msg.dqpn), ntohs(msg.msg.daddr1.lid), + mcm_map_str(msg.msg.daddr1.ep_map)); + + len = sizeof(dat_mix_cm_t); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + + /* wait for response */ + ret = scif_recv(mix_ep, &msg, len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(1, " ERR: req_out rcv ep %d, ret %d, exp %d, err %s\n", + mix_ep, ret, len, strerror(errno)); + return -1; + } + if (msg.hdr.ver != DAT_MIX_VER || msg.hdr.op != MIX_CM_REQ || + msg.hdr.flags != MIX_OP_RSP || msg.hdr.status != MIX_SUCCESS) { + dapl_log(1, " MIX msg ver %d, op %s, flags %d, or stat %d ERR \n", + msg.hdr.ver, mix_op_str(msg.hdr.op), msg.hdr.flags, msg.hdr.status); + return -1; + } + + /* CM object linking: MIC to MPXYD */ + m_cm->scm_id = msg.cm_id; + m_cm->scm_ctx = msg.cm_ctx; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," reply on SCIF EP -> cm_id 0x%x, ctx %p\n", + m_cm->scm_id, (void*)m_cm->scm_ctx ); + + return 0; +} + +/* MIX_CM_RTU message, cm_thread context, use EV/CM channel, lock snd channel */ +int dapli_mix_cm_rtu_out(dp_ib_cm_handle_t m_cm) +{ + dat_mix_cm_t msg; + scif_epd_t mix_ep = m_cm->tp->scif_ev_ep; + int ret, len; + + /* connect RTU: QP_r local, QP_t shadowed */ + msg.hdr.ver = DAT_MIX_VER; + msg.hdr.op = MIX_CM_RTU; + msg.hdr.status = 0; + msg.hdr.flags = MIX_OP_REQ; + msg.hdr.req_id = m_cm->tp->dev_id; + msg.cm_id = m_cm->scm_id; + msg.cm_ctx = (uint64_t)m_cm; + + dapl_log(DAPL_DBG_TYPE_EXTENSION," RTU -> id 0x%x dport 0x%x, dqpn 0x%x dlid 0x%x\n", + msg.cm_id, ntohs(msg.msg.dport), ntohl(msg.msg.dqpn), ntohs(msg.msg.daddr1.lid)); + + len = sizeof(dat_mix_cm_t); + dapl_os_lock(&m_cm->tp->lock); + ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK); + dapl_os_unlock(&m_cm->tp->lock); + if (ret != len) { + dapl_log(1, " ERR: %s send on %d, ret %d, exp %d, error %s\n", + mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op)); + return 0; +} + +/* MIX_CM_DREQ operation, user context, op channel */ +void dapli_mix_cm_dreq_out(dp_ib_cm_handle_t m_cm) { + + dat_mix_cm_t 
msg;
+	scif_epd_t mix_ep = m_cm->tp->scif_ep; /* operation channel */
+	int ret, len;
+
+	/* disconnect request out */
+	msg.hdr.ver = DAT_MIX_VER;
+	msg.hdr.op = MIX_CM_DISC;
+	msg.hdr.status = 0;
+	msg.hdr.flags = MIX_OP_REQ;
+	msg.hdr.req_id = m_cm->tp->dev_id;
+	msg.cm_id = m_cm->scm_id;
+	msg.cm_ctx = (uint64_t)m_cm;
+
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," DREQ -> id 0x%x dport 0x%x, dqpn 0x%x dlid 0x%x\n",
+		 msg.cm_id, ntohs(msg.msg.dport), ntohl(msg.msg.dqpn), ntohs(msg.msg.daddr1.lid));
+
+	len = sizeof(dat_mix_cm_t);
+	ret = scif_send(mix_ep, &msg, len, SCIF_SEND_BLOCK);
+	if (ret != len) {
+		dapl_log(DAPL_DBG_TYPE_CM_WARN,
+			 " ERR: %s send on %d, ret %d, exp %d, error %s\n",
+			 mix_op_str(msg.hdr.op), mix_ep, ret, len, strerror(errno));
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION," Sent %s request on SCIF EP\n", mix_op_str(msg.hdr.op));
+}
+
+/* unsolicited CM event, scif_ep channel */
+int dapli_mix_cm_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_event_t *pmsg)
+{
+	int len, ret;
+	dp_ib_cm_handle_t cm;
+
+	/* hdr already read, get operation data */
+	len = sizeof(dat_mix_cm_event_t) - sizeof(dat_mix_hdr_t);
+	ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d, error %s\n", ret, len, strerror(errno));
+		return ret;
+	}
+	dapl_log(DAPL_DBG_TYPE_EXTENSION,
+		 " MIX_CM_EVENT <-: id %d ctx %p event 0x%x\n",
+		 pmsg->cm_id, pmsg->cm_ctx, pmsg->event);
+
+	/* Find the CM and EP for event processing */
+	cm = dapli_mix_get_cm(tp, pmsg->cm_ctx);
+	if (!cm) {
+		dapl_log(DAPL_DBG_TYPE_EXTENSION, " mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx);
+		return 0;
+	}
+
+	switch (pmsg->event) {
+	case DAT_CONNECTION_EVENT_TIMED_OUT:
+		if (cm->sp)
+			dapls_cr_callback(cm, IB_CME_LOCAL_FAILURE, NULL, 0, cm->sp);
+		else
+			dapl_evd_connection_callback(cm, IB_CME_DESTINATION_UNREACHABLE, NULL, 0, cm->ep);
+		break;
+
+	case DAT_CONNECTION_EVENT_ESTABLISHED:
+	case DAT_CONNECTION_REQUEST_EVENT:
+	case DAT_DTO_COMPLETION_EVENT:
+	case DAT_CONNECTION_EVENT_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_NON_PEER_REJECTED:
+	case DAT_CONNECTION_EVENT_ACCEPT_COMPLETION_ERROR:
+	case DAT_CONNECTION_EVENT_DISCONNECTED:
+		mcm_disconnect_final(cm);
+		break;
+	case DAT_CONNECTION_EVENT_BROKEN:
+	case DAT_CONNECTION_EVENT_UNREACHABLE:
+	default:
+		break;
+	}
+
+	return 0;
+}
+
+/* unsolicited DTO event, op channel */
+int dapli_mix_dto_event_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_dto_comp_t *pmsg)
+{
+	int len, ret, i;
+	struct dcm_ib_cq *m_cq;
+	DAPL_COOKIE *cookie;
+
+	/* hdr already read, get operation data */
+	len = sizeof(dat_mix_dto_comp_t) - sizeof(dat_mix_hdr_t);
+	ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+	if (ret != len) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d, error %s\n", ret, len, strerror(errno));
+		return ret;
+	}
+
+	/* Get cq and post DTO event with this WC entry */
+	m_cq = (void*)pmsg->cq_ctx;
+
+	for (i=0; i < pmsg->wc_cnt; i++) {
+		struct ibv_wc ib_wc;
+		/* possible segmentation on mpxyd side, update length if success */
+		if (pmsg->wc[i].status == 0) {
+			cookie = (DAPL_COOKIE *) (uintptr_t) pmsg->wc[i].wr_id;
+			if (!cookie) {
+				dapl_log(DAPL_DBG_TYPE_ERR,
+					 " ERR: mix_dto_event_in: wr_id=0 wc[%d] cnt %d\n",
+					 i, pmsg->wc_cnt);
+				return 0;
+			}
+			pmsg->wc[i].byte_len = cookie->val.dto.size;
+			dapl_log(DAPL_DBG_TYPE_EP,
+				 " mix_dto_event: MCM evd %p ep %p wr_id=%Lx ln=%d\n",
+				 m_cq->evd, cookie->ep,
pmsg->wc[i].wr_id, + cookie->val.dto.size); + } + mcm_const_ib_wc(&ib_wc, &pmsg->wc[i], 1); + dapl_os_lock(&m_cq->evd->header.lock); + dapls_evd_cqe_to_event(m_cq->evd, &ib_wc); + dapl_os_unlock(&m_cq->evd->header.lock); + } + + return 0; +} + +int dapli_mix_cm_rep_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t *pmsg) +{ + int len, ret; + dp_ib_cm_handle_t cm; + + /* hdr already read, get operation data */ + len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d, error %s\n", ret, len, strerror(errno)); + return ret; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CM_REP_IN <-: id %d ctx %p \n", pmsg->cm_id, pmsg->cm_ctx); + + /* Find the CM and EP for event processing */ + cm = dapli_mix_get_cm(tp, pmsg->cm_ctx); + if (!cm) { + dapl_log(DAPL_DBG_TYPE_ERR, " ERR: mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx); + return -1; + } + + mcm_connect_rtu(cm, &pmsg->msg); + return 0; +} + +int dapli_mix_cm_req_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t *pmsg) +{ + int len, ret; + dp_ib_cm_handle_t acm; + + /* hdr already read, get operation data */ + len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d, error %s\n", ret, len, strerror(errno)); + return ret; + } + + /* Allocate client CM and setup passive references */ + if ((acm = dapls_cm_create(tp->hca, NULL)) == NULL) { + dapl_log(DAPL_DBG_TYPE_WARN, " mix_cm_req_in: ERR cm_create\n"); + return -1; + } + + acm->sp = (DAPL_SP *)pmsg->sp_ctx; + acm->cm_id = pmsg->cm_id; + acm->cm_ctx = pmsg->cm_ctx; + memcpy(&acm->msg, &pmsg->msg, sizeof(dat_mcm_msg_t)); + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CM_REQ_IN <- DST port=%x lid=%x, QPr=%x, QPt=%x, psize=%d r_guid=%Lx\n", + ntohs(acm->msg.dport), ntohs(acm->msg.daddr1.lid), + htonl(acm->msg.daddr1.qpn), htonl(acm->msg.daddr2.qpn), + htons(acm->msg.p_size), htonll(acm->msg.sys_guid)); + + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CM_REQ_IN <-: sp %p new_cm %p pxy_id %d pxy_ctx %p psize %d pdata[0]=0x%x\n", + acm->sp, acm, pmsg->cm_id, pmsg->cm_ctx, ntohs(acm->msg.p_size), + acm->msg.p_data[0]); + + /* trigger CR event */ + acm->state = MCM_ACCEPTING; + dapli_queue_conn(acm); + dapls_cr_callback(acm, IB_CME_CONNECTION_REQUEST_PENDING, + acm->msg.p_data, ntohs(acm->msg.p_size), acm->sp); + return 0; +} + +int dapli_mix_cm_rtu_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t *pmsg) +{ + int len, ret; + dp_ib_cm_handle_t cm; + + /* hdr already read, get operation data */ + len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(DAPL_DBG_TYPE_ERR, " ERR: ret %d, exp %d, error %s\n", ret, len, strerror(errno)); + return ret; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION, + " MIX_CM_RTU_IN <-: id %d ctx %p \n", pmsg->cm_id, pmsg->cm_ctx); + + /* Find the CM and EP for event processing */ + cm = dapli_mix_get_cm(tp, pmsg->cm_ctx); + if (!cm) { + dapl_log(DAPL_DBG_TYPE_ERR, " ERR: mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx); + return -1; + } + + dapl_os_lock(&cm->lock); + cm->state = MCM_CONNECTED; + dapl_os_unlock(&cm->lock); + + dapls_cr_callback(cm, IB_CME_CONNECTED, NULL, 0, cm->sp); + return 0; +} + +int 
dapli_mix_cm_rej_in(ib_hca_transport_t *tp, scif_epd_t scif_ep, dat_mix_cm_t *pmsg) +{ + int len, ret, event; + dp_ib_cm_handle_t cm; + + /* hdr already read, get operation data */ + len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t); + ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK); + if (ret != len) { + dapl_log(DAPL_DBG_TYPE_ERR, " MCM_REJ_IN ERR: %d exp %d, scif_recv err %s\n", + ret, len, strerror(errno)); + return ret; + } + + /* Find the CM and EP for event processing */ + cm = dapli_mix_get_cm(tp, pmsg->cm_ctx); + if (!cm) { + dapl_log(1, " ERR: mcm_get_cm, ctx %p, not found\n", pmsg->cm_ctx); + return -1; + } + memcpy(&cm->msg, &pmsg->msg, sizeof(dat_mcm_msg_t)); + + dapl_log(DAPL_DBG_TYPE_CM_WARN, + " MCM_REJ_IN%s <- p_msg %p id %d cm %p (%d) ep %p %s\n", + pmsg->hdr.op == MIX_CM_REJECT_USER ? "_USER":"", + pmsg, pmsg->cm_id, pmsg->cm_ctx, cm->ref_count, cm->ep, + dapl_cm_state_str(cm->state)); + + if (pmsg->hdr.op == MIX_CM_REJECT_USER) + event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA; + else + event = IB_CME_DESTINATION_REJECT; + + dapl_os_lock(&cm->lock); + cm->state = MCM_REJECTED; + dapl_os_unlock(&cm->lock); + + if (cm->sp) + dapls_cr_callback(cm, event, NULL, 0, cm->sp); + else + dapl_evd_connection_callback(cm, event, + cm->msg.p_data, + ntohs(cm->msg.p_size), cm->ep); + return 0; +} + + +/* + * MIX recv, unsolicited messages from MPXYD, via scif_ev_ep - CM/EV endpoint + * + */ +int dapli_mix_recv(DAPL_HCA *hca, int scif_ep) +{ + char cmd[DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX]; + dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd; + ib_hca_transport_t *tp = &hca->ib_trans; + int ret, len; + + len = sizeof(dat_mix_hdr_t); + ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK); + if ((ret != len) || (phdr->ver != DAT_MIX_VER) || phdr->flags != MIX_OP_REQ) { + dapl_log(DAPL_DBG_TYPE_EXCEPTION, + " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d flgs=%d, error %s\n", + scif_ep, ret, len, phdr->ver, phdr->flags, strerror(errno)); + return -1; + } + dapl_log(DAPL_DBG_TYPE_EXTENSION, " ver %d, op %d, flags %d\n", phdr->ver, phdr->op, phdr->flags); + + switch (phdr->op) { + case MIX_DTO_EVENT: + ret = dapli_mix_dto_event_in(tp, scif_ep, (dat_mix_dto_comp_t*)phdr); + break; + case MIX_CM_EVENT: + ret = dapli_mix_cm_event_in(tp, scif_ep, (dat_mix_cm_event_t*)phdr); + break; + case MIX_CM_REQ: + ret = dapli_mix_cm_req_in(tp, scif_ep, (dat_mix_cm_t*)phdr); + break; + case MIX_CM_REP: + ret = dapli_mix_cm_rep_in(tp, scif_ep, (dat_mix_cm_t*)phdr); + break; + case MIX_CM_REJECT: + case MIX_CM_REJECT_USER: + ret = dapli_mix_cm_rej_in(tp, scif_ep, (dat_mix_cm_t*)phdr); + break; + case MIX_CM_RTU: + ret = dapli_mix_cm_rtu_in(tp, scif_ep, (dat_mix_cm_t*)phdr); + break; + case MIX_CM_EST: + case MIX_CM_DISC: + case MIX_CM_DREP: + break; + default: + dapl_log(DAPL_DBG_TYPE_ERR, " ERROR!!! unknown MIX operation: %d\n", phdr->op); + return -1; + } + return ret; +} + + + + + + + + + + + + diff --git a/dapl/openib_mcm/proxy.c b/dapl/openib_mcm/proxy.c new file mode 100644 index 0000000..e81b6f1 --- /dev/null +++ b/dapl/openib_mcm/proxy.c @@ -0,0 +1,501 @@ +/* + * Copyright (c) 2009-2014 Intel Corporation. All rights reserved. + * + * This Software is licensed under one of the following licenses: + * + * 1) under the terms of the "Common Public License 1.0" a copy of which is + * available from the Open Source Initiative, see + * http://www.opensource.org/licenses/cpl.php. 
+ *
+ * 2) under the terms of the "The BSD License" a copy of which is
+ * available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/bsd-license.php.
+ *
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ * copy of which is available from the Open Source Initiative, see
+ * http://www.opensource.org/licenses/gpl-license.php.
+ *
+ * Licensee has the right to choose one of the above licenses.
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ *
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+#include "dapl.h"
+#include "dapl_adapter_util.h"
+#include "dapl_ib_util.h"
+#include "dapl_evd_util.h"
+#include "dapl_ep_util.h"
+#include "dapl_osd.h"
+
+/*
+ * HST -> MXS - proxy-out (PO) to proxy-in (PI)
+ *
+ * A non-MIC host EP connected to a MIC cross-socket (MXS) EP needs to send
+ * its WRs to the remote PI service instead of using direct IB sends or
+ * writes.  Inbound traffic from the remote MXS will still be direct, so
+ * there is no need for a PI service on this MCM provider's host side.
+ *
+ * NOTE: Initial design with no segmentation, set frequent PI MP signal rate
+ * This will avoid creation and management of a local PO WR queue for segments
+ */
+#define MCM_MP_SIG_RATE 5
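+/*
+ * Work requests to the remote PI travel through a circular slot array whose
+ * head/tail moves are masked with wrc_rem.wr_end (entries - 1), which
+ * assumes a power-of-two ring size.  Every MCM_MP_SIG_RATE-th slot also
+ * requests a signaled completion to bound the number of unsignaled RW_imm
+ * WRs.  A sketch of the slot math used in mcm_send_pi() below, assuming a
+ * 256-entry ring (wr_end = 255):
+ *
+ *	next = (wr_hd + 1) & wr_end;		-- wraps 255 -> 0
+ *	if (next == wr_tl)
+ *		return ENOMEM;			-- ring full, one slot spare
+ *	if (!(next % MCM_MP_SIG_RATE))
+ *		wr_flags |= M_SEND_MP_SIG;	-- periodic signal
+ */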
+int mcm_send_pi(struct dcm_ib_qp *m_qp, int len, struct ibv_send_wr *wr, struct ibv_send_wr **bad_wr)
+{
+	struct ibv_send_wr wr_imm;
+	struct ibv_sge sge;
+	struct mcm_wr_rx m_wr_rx;
+	int i, ret = 0, wr_idx;
+	struct wrc_idata wrc;
+	uint32_t wr_flags, offset=0;
+
+	dapl_log(DAPL_DBG_TYPE_EP,
+		 " mcm_send_pi: ep %p qpn %x ln %d WR: tl %d hd %d end %d wr_id %Lx\n",
+		 m_qp->ep, m_qp->qp2->qp_num, len, m_qp->wr_tl,
+		 m_qp->wr_hd, m_qp->wrc_rem.wr_end, wr->wr_id);
+
+	if (wr->num_sge > DAT_MIX_SGE_MAX) {
+		ret = EINVAL;
+		goto bail;
+	}
+	/* one WR per IB sge, no additional segmentation */
+	for (i=0; i < wr->num_sge; i++) {
+		wr_flags = M_SEND_DIRECT | M_SEND_PI;
+		if (i==0)
+			wr_flags |= M_SEND_FS;
+		if (i==(wr->num_sge-1)) {
+			wr_flags |= M_SEND_LS;
+			if (wr->send_flags & IBV_SEND_SIGNALED)
+				wr_flags |= M_SEND_CN_SIG;
+		}
+		dapl_os_lock(&m_qp->lock);
+		if (((m_qp->wr_hd + 1) & m_qp->wrc_rem.wr_end) == m_qp->wr_tl) { /* full */
+			ret = ENOMEM;
+			dapl_os_unlock(&m_qp->lock);
+			goto bail;
+		}
+		m_qp->wr_hd = (m_qp->wr_hd + 1) & m_qp->wrc_rem.wr_end; /* move hd */
+		wr_idx = m_qp->wr_hd;
+		if (!(wr_idx % MCM_MP_SIG_RATE) || (wr_flags & M_SEND_CN_SIG))
+			wr_flags |= M_SEND_MP_SIG;
+		dapl_os_unlock(&m_qp->lock);
+
+		dapl_log(DAPL_DBG_TYPE_EVD,
+			 " mcm_send_pi[%d]: ln %d wr_idx %d, tl %d hd %d\n",
+			 i, wr->sg_list[i].length, wr_idx, m_qp->wr_tl, m_qp->wr_hd);
+
+		/* build local m_wr_rx for remote PI */
+		memset((void*)&m_wr_rx, 0, sizeof(struct mcm_wr_rx));
+		m_wr_rx.org_id = (uint64_t) htonll((uint64_t)wr->wr_id);
+		m_wr_rx.flags = htonl(wr_flags);
+		m_wr_rx.w_idx = htonl(m_qp->wc_tl); /* snd back wc tail */
+		m_wr_rx.wr.num_sge = htonl(wr->num_sge);
+		m_wr_rx.wr.opcode = htonl(wr->opcode);
+		m_wr_rx.wr.send_flags = htonl(wr->send_flags);
+		m_wr_rx.wr.imm_data = htonl(wr->imm_data);
+		m_wr_rx.sg[0].addr = htonll(wr->sg_list[i].addr);
+		m_wr_rx.sg[0].lkey = htonl(wr->sg_list[i].lkey);
+		m_wr_rx.sg[0].length = htonl(wr->sg_list[i].length);
+
+		if ((wr->opcode == IBV_WR_RDMA_WRITE) ||
+		    (wr->opcode == IBV_WR_RDMA_WRITE_WITH_IMM)) {
+			m_wr_rx.wr.wr.rdma.remote_addr = htonll(wr->wr.rdma.remote_addr + offset);
+			m_wr_rx.wr.wr.rdma.rkey = htonl(wr->wr.rdma.rkey);
+			offset += wr->sg_list[i].length;
+		}
+
+		/* setup imm_data for PI rcv engine */
+		wrc.id = (uint16_t)wr_idx;
+		wrc.type = M_WR_TYPE;
+		wrc.flags = 0;
+
+		/* setup local WR for wr_rx transfer - RW_imm inline */
+		wr_imm.wr_id = wr->wr_id; /* MUST be original cookie, CQ processing */
+		wr_imm.next = 0;
+		wr_imm.sg_list = &sge;
+		wr_imm.num_sge = 1;
+		wr_imm.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
+		wr_imm.send_flags = IBV_SEND_INLINE; /* m_wr_rx, 148 bytes */
+		if (wr_flags & M_SEND_MP_SIG)
+			wr_imm.send_flags |= IBV_SEND_SIGNALED;
+		wr_imm.imm_data = htonl(*(uint32_t *)&wrc);
+		wr_imm.wr.rdma.rkey = m_qp->wrc_rem.wr_rkey;
+		wr_imm.wr.rdma.remote_addr =
+			(uint64_t)(uintptr_t)
+			((struct mcm_wr_rx *) (m_qp->wrc_rem.wr_addr + (m_qp->wrc_rem.wr_sz * wr_idx)));
+
+		sge.addr = (uint64_t)(uintptr_t) &m_wr_rx;
+		sge.length = (uint32_t) sizeof(struct mcm_wr_rx); /* 160 byte WR */
+		sge.lkey = 0; /* inline data needs no registered MR */
+
+		dapl_log(DAPL_DBG_TYPE_EVD,
+			 " mcm_send_pi[%d]: WR_RX wr_id %Lx qn %x op %d flgs 0x%x"
+			 " imm %x raddr %p rkey %x ln %d\n",
+			 i, wr_imm.wr_id, m_qp->qp2->qp_num, wr_imm.opcode,
+			 wr_flags, ntohl(wr_imm.imm_data),
+			 wr_imm.wr.rdma.remote_addr, wr_imm.wr.rdma.rkey,
+			 sizeof(struct mcm_wr_rx));
+		dapl_log(DAPL_DBG_TYPE_EVD,
+			 " mcm_send_pi[%d]: WR wr_id %Lx qn %x op %d flgs %x"
+			 " imm %x raddr %p rkey %x ln %d tl %d me %d hd %d\n",
+			 i, wr->wr_id, m_qp->qp2->qp_num, wr->opcode,
+			 wr->send_flags, wr->imm_data, wr->wr.rdma.remote_addr,
+			 wr->wr.rdma.rkey, wr->sg_list[i].length,
+			 m_qp->wr_tl, wr_idx, m_qp->wr_hd);
+
+		ret = ibv_post_send(m_qp->qp2, &wr_imm, bad_wr); /* QP2: QPtx - QPrx PI */
+		if (ret) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " mcm_send_pi ERR: m_wr %p idx %d laddr=%p ln=%d lkey=%x flgs %x"
+				 " tl %d hd %d\n",
+				 &m_wr_rx, wr_idx, wr->sg_list[0].addr,
+				 wr->sg_list[0].length, wr->sg_list[0].lkey,
+				 m_wr_rx.flags, m_qp->wr_tl, m_qp->wr_hd);
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " mcm_send_pi ERR: wr_id %Lx %p sglist %p sge %d op %d flgs %x"
+				 " idata 0x%x raddr %p rkey %x\n",
+				 m_wr_rx.wr.wr_id, wr->sg_list,
+				 m_wr_rx.wr.num_sge, m_wr_rx.wr.opcode,
+				 m_wr_rx.wr.send_flags, m_wr_rx.wr.imm_data,
+				 m_wr_rx.wr.wr.rdma.remote_addr,
+				 m_wr_rx.wr.wr.rdma.rkey);
+			goto bail;
+		}
+	}
+bail:
+	return ret;
+}
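+/*
+ * The 32-bit immediate built above carries a struct wrc_idata so the peer
+ * PI can locate the pushed mcm_wr_rx: id selects the ring slot, type says
+ * whether the slot holds a WR (M_WR_TYPE) or a WC (M_WC_TYPE).  The receive
+ * side unpacks it with the WRC_*_DATA() macros, roughly:
+ *
+ *	uint32_t idata = ntohl(wc->imm_data);
+ *	uint16_t slot  = WRC_ID_DATA(idata);
+ *	uint8_t  type  = WRC_TYPE_DATA(idata);
+ *
+ * (see mcm_dto_rcv() below for the real consumer of this encoding).
+ */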
+/* TX - RW_imm work request data to remote PI or consumer TX data direct to peer */
+static inline void mcm_dto_req(struct dcm_ib_cq *m_cq, struct ibv_wc *wc)
+{
+	DAPL_COOKIE *cookie;
+	struct dcm_ib_qp *m_qp;
+
+	cookie = (DAPL_COOKIE *)(uintptr_t)wc->wr_id;
+	m_qp = cookie->ep->qp_handle;
+
+	if (!m_qp->tp->scif_ep && MXS_EP(m_qp) &&
+	    (wc->opcode == (uint32_t)IBV_WR_RDMA_WRITE_WITH_IMM)) {
+		dapl_log(DAPL_DBG_TYPE_EP,
+			 " mcm_dto_req: RW_imm -> WR, wr_id %Lx\n", wc->wr_id);
+		return; /* post_send -> RW_imm to peer PI */
+	}
+
+	dapl_log(DAPL_DBG_TYPE_EP,
+		 " mcm_dto_req: SIG evd %p ep %p WR tl %d hd %d WC tl %d wr_id %p\n",
+		 m_qp->req_cq ? m_qp->req_cq->evd:0, m_qp->ep, m_qp->wr_tl, m_qp->wr_hd,
+		 m_qp->wc_tl, cookie);
+
+	dapl_os_lock(&m_qp->req_cq->evd->header.lock);
+	dapls_evd_cqe_to_event(m_qp->req_cq->evd, wc);
+	dapl_os_unlock(&m_qp->req_cq->evd->header.lock);
+}
+
+/* RX work completion of RW data to remote PI, remote RR completion */
+static inline void mcm_dto_rcv(struct dcm_ib_cq *m_cq, struct ibv_wc *wc)
+{
+	struct mcm_wc_rx *m_wc;
+	struct dcm_ib_qp *m_qp = (struct dcm_ib_qp *)wc->wr_id;
+	struct wrc_idata wrc;
+
+	wrc.id = WRC_ID_DATA(ntohl(wc->imm_data));
+	wrc.type = WRC_TYPE_DATA(ntohl(wc->imm_data));
+	wrc.flags = WRC_FLAGS_DATA(ntohl(wc->imm_data));
+
+	if (wrc.type != M_WC_TYPE) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 "mcm_dto_rcv: ERR imm WC type ?= 0x%x\n", wrc.type);
+		goto bail;
+	}
+
+	if (wrc.id > m_qp->wrc.wc_end) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " mcm_dto_rcv: ERR WC id out of range %x > %x\n",
+			 wrc.id, m_qp->wrc.wc_end);
+		goto bail;
+	}
+	m_wc = (struct mcm_wc_rx *)(m_qp->wrc.wc_addr + (m_qp->wrc.wc_sz * wrc.id));
+	mcm_ntoh_wc_rx(m_wc); /* convert WC contents, pushed via wire */
+
+	dapl_log(DAPL_DBG_TYPE_EP,
+		 " mcm_dto_rcv: MCM evd %p ep %p id %d wc %p wr_id %Lx flgs 0x%x %s\n",
+		 m_qp->req_cq->evd, m_qp->ep, wrc.id, m_wc, m_wc->wc.wr_id,
+		 m_wc->flags, m_wc->flags & M_SEND_CN_SIG ? "SIG":"NO_SIG");
+
+	dapl_os_lock(&m_qp->lock);
+	m_qp->wr_tl = m_wc->wr_tl;
+	m_qp->wc_tl = wrc.id; /* move wc_tl, for wc_tl_rem on peer PI service */
+	dapl_os_unlock(&m_qp->lock);
+	if (m_wc->flags & M_SEND_CN_SIG) {
+		struct ibv_wc ib_wc;
+		DAPL_COOKIE *cookie = (DAPL_COOKIE *)(uintptr_t) m_wc->wc.wr_id;
+
+		dapl_log(DAPL_DBG_TYPE_EP,
+			 " mcm_dto_rcv: MCM SIG evd %p ep %p WR tl %d hd %d WC tl %d wr_id %p\n",
+			 m_qp->req_cq ? m_qp->req_cq->evd:0, m_qp->ep, m_qp->wr_tl, m_qp->wr_hd,
+			 m_qp->wc_tl, cookie);
+
+		mcm_const_ib_wc(&ib_wc, &m_wc->wc, 1);
+		dapl_os_lock(&m_qp->req_cq->evd->header.lock);
+		dapls_evd_cqe_to_event(m_qp->req_cq->evd, &ib_wc);
+		dapl_os_unlock(&m_qp->req_cq->evd->header.lock);
+	}
+bail:
+	if (mcm_post_rcv_wc(m_qp, 1))
+		dapl_log(DAPL_DBG_TYPE_ERR,"mcm_dto_rcv: recv wc repost failed\n");
+}
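+/*
+ * Receives posted for PI traffic carry no scatter-gather list: the payload
+ * (an mcm_wr_rx or mcm_wc_rx) is RDMA-written by the peer directly into the
+ * registered ring, and only the immediate data is consumed here.  Each
+ * incoming RW_imm therefore needs just a zero-length recv WR whose wr_id
+ * points back at the QP, which is what the repost helper below provides.
+ */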
+int mcm_post_rcv_wc(struct dcm_ib_qp *m_qp, int cnt)
+{
+	struct ibv_recv_wr r_wr, *r_err;
+	int err, i;
+
+	r_wr.next = NULL; /* re-post message */
+	r_wr.sg_list = NULL;
+	r_wr.num_sge = 0;
+	r_wr.wr_id = (uint64_t)(uintptr_t) m_qp;
+	errno = 0;
+
+	for (i=0; i < cnt; i++) {
+		err = ibv_post_recv(m_qp->qp2, &r_wr, &r_err);
+		if (err) {
+			dapl_log(DAPL_DBG_TYPE_ERR,"ERR: qp %p (QP2) qpn %x "
+				 "ibv_post_recv ret = %d %s\n",
+				 m_qp, m_qp->qp2 ? m_qp->qp2->qp_num:0,
+				 err, strerror(errno));
+			return errno;
+		}
+	}
+	dapl_log(DAPL_DBG_TYPE_EP, "mcm_post_rcv_wc: qp %p qpn 0x%x posted %d\n",
+		 m_qp, m_qp->qp2->qp_num, cnt);
+	return 0;
+}
+
+/* Proxy-in service - called from CM-RX thread
+ *
+ * This processes both TX and RX events
+ * rcv_cq is PI only service
+ * req_cq is PO-PI RW_imm or HST->Direct RW if CQ shared across QP's
+ *
+ * <- Work completion in (RW_imm - WC idata), local initiated RW
+ * -> RW_imm work requests out PO-PI
+ * -> RW direct from consumer post HST->Direct (remote is HST or MSS)
+ *
+ */
+void mcm_dto_event(struct dcm_ib_cq *m_cq)
+{
+	struct ibv_wc wc[5];
+	struct ibv_cq *ib_cq;
+	void *cq_ctx = NULL;
+	int i, wc_cnt, ret, err, notify;
+
+	dapl_log(DAPL_DBG_TYPE_THREAD," PI event: enter\n");
+
+	ret = ibv_get_cq_event(m_cq->cq->channel, &ib_cq, (void *)&cq_ctx);
+	if (ret == 0)
+		ibv_ack_cq_events(ib_cq, 1);
+
+	wc_cnt = err = notify = 0;
+retry:
+	ret = ibv_poll_cq(m_cq->cq, 5, wc);
+	if (ret <= 0) {
+		if (!ret && !notify) {
+			ibv_req_notify_cq(m_cq->cq, 0);
+			notify = 1;
+			goto retry;
+		}
+		dapl_log(DAPL_DBG_TYPE_THREAD," PI event: empty, return\n");
+		return;
+	} else
+		notify = 0;
+
+	wc_cnt += ret;
+	for (i=0; i < ret; i++) {
+		if (wc[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM)
+			mcm_dto_rcv(m_cq, &wc[i]);
+		else
+			mcm_dto_req(m_cq, &wc[i]);
+	}
+	goto retry;
+}
+
+void mcm_destroy_wc_q(struct dcm_ib_qp *m_qp)
+{
+	dapl_log(DAPL_DBG_TYPE_EP,
+		 "mcm_destroy_wc_q: QP %p PI WC_q %p\n",
+		 m_qp, m_qp->wrc.wc_addr);
+
+	if (m_qp->wc_mr) {
+		ibv_dereg_mr(m_qp->wc_mr);
+		m_qp->wc_mr = NULL;
+	}
+	if (m_qp->wrc.wc_addr) {
+		free((void*)m_qp->wrc.wc_addr);
+		m_qp->wrc.wc_addr = 0;
+	}
+}
+
+int mcm_create_wc_q(struct dcm_ib_qp *m_qp, int entries)
+{
+	struct ibv_pd *ib_pd = ((DAPL_PZ *)m_qp->ep->param.pz_handle)->pd_handle;
+
+	dapl_log(DAPL_DBG_TYPE_EP,
+		 "mcm_create_wc_q: QP %p entries %d\n", m_qp, entries);
+
+	/* RDMA proxy WC pool, register with SCIF and IB, set pool and segm size with parameters */
+	m_qp->wrc.wc_sz = ALIGN_64(sizeof(struct mcm_wc_rx));
+	m_qp->wrc.wc_len = m_qp->wrc.wc_sz * entries; /* 64 byte aligned for signal_fence */
+	m_qp->wrc.wc_end = entries - 1;
+
+	if (posix_memalign((void **)&m_qp->wrc.wc_addr, 4096, ALIGN_PAGE(m_qp->wrc.wc_len))) {
+		dapl_log(DAPL_DBG_TYPE_EP, "failed to allocate wc_rbuf,"
+			 " m_qp=%p, wc_len=%d, entries=%d\n",
+			 m_qp, m_qp->wrc.wc_len, entries);
+		return -1;
+	}
+	memset((void*)m_qp->wrc.wc_addr, 0, ALIGN_PAGE(m_qp->wrc.wc_len));
+
+	dapl_log(DAPL_DBG_TYPE_EP, " WC rbuf pool %p, LEN req=%d, act=%d\n",
+		 m_qp->wrc.wc_addr, m_qp->wrc.wc_len, ALIGN_PAGE(m_qp->wrc.wc_len));
+
+	m_qp->wc_mr = ibv_reg_mr(ib_pd, (void*)m_qp->wrc.wc_addr, m_qp->wrc.wc_len,
+				 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+	if (!m_qp->wc_mr) {
+		dapl_log(DAPL_DBG_TYPE_ERR, " IB_register addr=%p,%d failed %s\n",
+			 m_qp->wrc.wc_addr, ALIGN_PAGE(m_qp->wrc.wc_len), strerror(errno));
+		return -1;
+	}
+	m_qp->wrc.wc_addr = (uint64_t)(uintptr_t)m_qp->wc_mr->addr;
+	m_qp->wrc.wc_rkey = m_qp->wc_mr->rkey;
+
+	dapl_log(DAPL_DBG_TYPE_EP, " IB_mr for wc_buf addr %p, mr 0x%llx, len %d, entries %d rkey %x lkey %x\n",
+		 m_qp->wrc.wc_addr, m_qp->wc_mr->addr, ALIGN_PAGE(m_qp->wrc.wc_len),
+		 entries, m_qp->wc_mr->rkey, m_qp->wc_mr->lkey);
+
+	/* Put QP's req and rcv CQ on device PI cqlist, mark CQ for indirect signaling */
+	dapl_os_lock(&m_qp->tp->cqlock);
+	m_qp->req_cq->flags |= DCM_CQ_TX_INDIRECT;
+	if (!m_qp->req_cq->entry.list_head)
+		dapl_llist_add_tail(&m_qp->tp->cqlist, &m_qp->req_cq->entry, m_qp->req_cq);
+	if (!m_qp->rcv_cq->entry.list_head)
+		dapl_llist_add_tail(&m_qp->tp->cqlist, &m_qp->rcv_cq->entry, m_qp->rcv_cq);
+	dapl_os_unlock(&m_qp->tp->cqlock);
+	dapls_thread_signal(&m_qp->tp->signal); /* CM thread will process PI */
+
+	return 0;
+}
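+/*
+ * Sizing example for the WC ring above (hedged, the numbers depend on the
+ * actual struct layout): if sizeof(struct mcm_wc_rx) were 160 bytes, wc_sz
+ * rounds up to 192 via ALIGN_64 so each slot stays 64-byte aligned for the
+ * signal fence; 256 entries then give wc_len = 192 * 256 = 49152 bytes, and
+ * ALIGN_PAGE rounds the posix_memalign() allocation up to a 4 KB multiple.
+ */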
+void mcm_destroy_pi_cq(struct dcm_ib_qp *m_qp)
+{
+	if (!m_qp->rcv_cq)
+		return;
+
+	dapl_log(DAPL_DBG_TYPE_EP, "mcm_destroy_pi_cq: QP %p CQ %p\n",
+		 m_qp, m_qp->rcv_cq);
+
+	/* remove from device PI processing list */
+	dapl_os_lock(&m_qp->tp->cqlock);
+	if (m_qp->rcv_cq->entry.list_head)
+		dapl_llist_remove_entry(&m_qp->tp->cqlist,
+					&m_qp->rcv_cq->entry);
+	dapl_os_unlock(&m_qp->tp->cqlock);
+
+	if (m_qp->rcv_cq->cq) {
+		struct ibv_comp_channel *channel = m_qp->rcv_cq->cq->channel;
+
+		ibv_destroy_cq(m_qp->rcv_cq->cq);
+		m_qp->rcv_cq->cq = NULL;
+		if (channel)
+			ibv_destroy_comp_channel(channel);
+	}
+	dapl_os_free(m_qp->rcv_cq, sizeof(struct dcm_ib_cq));
+	m_qp->rcv_cq = NULL;
+}
+
+int mcm_create_pi_cq(struct dcm_ib_qp *m_qp, int len)
+{
+	struct ibv_comp_channel *channel = NULL;
+	int cqlen = len;
+	int opts, ret = ENOMEM;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     "mcm_create_pi_cq: qp = %p cqlen=%d\n", m_qp, cqlen);
+
+	/* create CQ object */
+	m_qp->rcv_cq = dapl_os_alloc(sizeof(struct dcm_ib_cq));
+	if (!m_qp->rcv_cq)
+		goto err;
+
+	dapl_os_memzero(m_qp->rcv_cq, sizeof(struct dcm_ib_cq));
+	m_qp->rcv_cq->tp = m_qp->tp;
+	dapl_llist_init_entry(&m_qp->rcv_cq->entry);
+
+	errno = 0;
+	channel = ibv_create_comp_channel(m_qp->tp->hca->ib_hca_handle);
+	if (!channel)
+		goto err;
+
+	/* move channel FD to non-blocking */
+	opts = fcntl(channel->fd, F_GETFL);
+	if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " dapls_config_fd: fcntl on channel->fd %d ERR %d %s\n",
+			 channel->fd, opts, strerror(errno));
+		goto err;
+	}
+	m_qp->rcv_cq->cq = ibv_create_cq(m_qp->tp->hca->ib_hca_handle,
+					 cqlen, m_qp, channel, 0);
+	if (!m_qp->rcv_cq->cq)
+		goto err;
+
+	/* arm cq for events */
+	ibv_req_notify_cq(m_qp->rcv_cq->cq, 0);
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     "mcm_create_pi_cq: new_cq %p cqlen=%d\n",
+		     m_qp->rcv_cq, cqlen);
+
+	dapl_log(DAPL_DBG_TYPE_EVD,
+		 "mcm_create_pi_cq: new_cq %p ib_cq %p cqlen %d,%d\n",
+		 m_qp->rcv_cq, m_qp->rcv_cq->cq, len, cqlen);
+
+	return 0;
+
+err:
+	dapl_log(DAPL_DBG_TYPE_ERR,
+		 "mcm_create_pi_cq: ERR new_cq %p cqlen %d,%d ret %s\n",
+		 m_qp->rcv_cq, len, cqlen, strerror(errno));
+
+	if (m_qp->rcv_cq) {
+		dapl_os_free(m_qp->rcv_cq, sizeof(struct dcm_ib_cq));
+		m_qp->rcv_cq = NULL;
+	}
+	if (channel)
+		ibv_destroy_comp_channel(channel);
+
+	return dapl_convert_errno(ret, "create_pi_cq");
+}
+
+
+
--
2.46.0