/* scif-rdma cmd and data channel parameters */
static int mix_align = 64;
static int mix_buffer_mb = 128;
-static int mix_buffer_sg = 1048576;
+static int mix_buffer_sg = 131072;
+static int mix_buffer_sg_cnt = 100;
static int mix_cmd_depth = 50;
static int mix_cmd_size = 256;
static int mix_shared_buffer = 1;
static int mix_max_msg_mb = 64;
+static int mix_inline_threshold = 256;
+static int mix_eager_completion = 0;
+static int mcm_ib_inline = 128;
+static int mcm_ib_signal_rate = 20;
/* cm parameters */
static int mcm_depth = 500;
} mcm_ib_dev_t;
+enum mcm_send_flags {
+ M_SEND_POSTED = 1 << 0, /* m_wr already posted */
+ M_SEND_CN_SIG = 1 << 1, /* m_wr consumer signaled, IB completion */
+ M_SEND_CN_EAGER_SIG = 1 << 2, /* m_wr consumer eager signaled, SCIF read completion */
+ M_SEND_MP_SIG = 1 << 3, /* m_wr mpxyd signaled, segmentation, manage proxy buf/wr resources */
+};
+
+typedef struct mcm_wr {
+ struct ibv_send_wr wr;
+ struct ibv_sge sg[DAT_MIX_SGE_MAX];
+ uint64_t org_id;
+ uint64_t context;
+ uint32_t m_idx;
+ uint32_t w_idx;
+ int flags;
+} mcm_wr_t;
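+
+/*
+ * Illustrative sketch (not part of the patch): proxy WRs live in a flat
+ * ring of wr_end slots spaced wr_sz bytes apart, walked by wr_tl/wr_hd.
+ * The helper names m_wr_at() and m_wr_next_idx() are hypothetical.
+ */
+static inline struct mcm_wr *m_wr_at(struct mcm_qp *m_qp, int idx)
+{
+	/* wr_sz = ALIGN_64(sizeof(struct mcm_wr)), so slots stay 64B aligned */
+	return (struct mcm_wr *)(m_qp->wr_buf + ((uint64_t)m_qp->wr_sz * idx));
+}
+
+static inline int m_wr_next_idx(struct mcm_qp *m_qp, int idx)
+{
+	return (++idx == m_qp->wr_end) ? 0 : idx; /* wrap at end of pool */
+}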
+
/* DAPL MCM QP object, id in entry, buffer pool per QP for now, TODO share a pool at SMD level? */
typedef struct mcm_qp {
LLIST_ENTRY entry;
pthread_mutex_t lock; /* QP lock, pool and cookies */
dat_mix_qp_attr_t qp_t;
dat_mix_qp_attr_t qp_r;
+ struct mcm_cq *m_cq; /* CQ for sQP */
char *m_buf; /* MIC proxy buffer, SCIF and IB */
struct ibv_mr *m_mr; /* IB registration */
off_t m_off; /* SCIF registration */
int wr_tl; /* work request pool tail */
int wr_end; /* work request pool end */
int wr_len; /* work request pool size */
+ int wr_sz; /* work request entry size, 64 byte aligned */
int ref_cnt;
+ int post_cnt; /* segmenting requires completion management to avoid WR depletion */
+ int post_sig_cnt;
+ int comp_cnt;
uint64_t rf_min; /* scif_readfrom profiling */
uint64_t rf_max;
uint64_t rf_avg;
uint32_t cq_len;
uint32_t cq_id; /* MIC client */
uint64_t cq_ctx; /* MIC client */
+ uint64_t prev_id;
int ref_cnt;
} mcm_cq_t;
pthread_mutex_t qplock; /* qp lock */
pthread_mutex_t cqlock; /* cq lock */
pthread_mutex_t mrlock; /* mr lock */
- int ref_cnt; /* references */
+ int destroy; /* destroying device, all resources */
+ int ref_cnt; /* child references */
+ int th_ref_cnt; /* work thread references */
struct mcm_ib_dev *md; /* mcm_ib_dev, parent */
uint16_t cm_id; /* port ID MIC client, md->ports */
uint64_t *ports; /* EP port space MIC client */
"MCM_CQ_REARM",
"MCM_CQ_EVENT",
"MCM_QP_CREATE",
+ "MCM_MX_SEND",
+ "MCM_MX_SEND_INLINE",
+ "MCM_MX_WRITE",
+ "MCM_MX_WRITE_SEG",
+ "MCM_MX_WRITE_INLINE",
"MCM_QP_SEND",
+ "MCM_QP_SEND_INLINE",
"MCM_QP_WRITE",
+ "MCM_QP_WRITE_SEG",
+ "MCM_QP_WRITE_INLINE",
"MCM_QP_READ",
"MCM_QP_FREE",
"MCM_QP_EVENT",
MCM_CQ_POLL,
MCM_CQ_REARM,
MCM_CQ_EVENT,
+ MCM_MX_SEND,
+ MCM_MX_SEND_INLINE,
+ MCM_MX_WRITE,
+ MCM_MX_WRITE_SEG,
+ MCM_MX_WRITE_INLINE,
MCM_QP_CREATE,
MCM_QP_SEND,
+ MCM_QP_SEND_INLINE,
MCM_QP_WRITE,
+ MCM_QP_WRITE_SEG,
+ MCM_QP_WRITE_INLINE,
MCM_QP_READ,
MCM_QP_FREE,
MCM_QP_EVENT,
void mpxy_op_thread(void *mic_client);
void mpxy_tx_thread(void *mic_client);
void mpxy_cm_thread(void *mic_client);
+void mcm_cq_event(struct mcm_cq *m_cq);
+static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data, int *events);
+
+static inline void sleep_usec(int usec)
+{
+ struct timespec sleep, remain;
+
+ sleep.tv_sec = 0;
+ sleep.tv_nsec = usec * 1000;
+ nanosleep(&sleep, &remain);
+}
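+
+/*
+ * Hedged variant (sketch, not in the patch): nanosleep() can return early
+ * with EINTR; this version resumes with the remaining time instead of
+ * silently sleeping short as sleep_usec() above may do.
+ */
+static inline void sleep_usec_intr(int usec)
+{
+	struct timespec ts = { .tv_sec = 0, .tv_nsec = usec * 1000L };
+
+	while (nanosleep(&ts, &ts) == -1 && errno == EINTR)
+		; /* ts now holds the unslept remainder */
+}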
static inline uint32_t mcm_ts_us(void)
{
return ret;
}
+static int mcm_config_fd(int fd)
+{
+ int opts;
+
+ opts = fcntl(fd, F_GETFL);
+ if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+ mlog(1, "fcntl on fd %d ERR %d %s\n", fd, opts, strerror(errno));
+ return errno;
+ }
+ return 0;
+}
+
/* MCM 16-bit port space */
static uint16_t mcm_get_port(uint64_t *p_port, uint16_t port, uint64_t ctx)
{
mix_buffer_sg = atoi(value);
else if (!strcasecmp("buffer_alignment", opt))
mix_align = atoi(value);
+ else if (!strcasecmp("buffer_inline_threshold", opt))
+ mix_inline_threshold = atoi(value);
else if (!strcasecmp("mcm_depth", opt))
mcm_depth = atoi(value);
else if (!strcasecmp("scif_port_id", opt))
mcm_affinity = atoi(value);
else if (!strcasecmp("mcm_affinity_base", opt))
mcm_affinity_base = atoi(value);
+ else if (!strcasecmp("mcm_ib_inline", opt))
+ mcm_ib_inline = atoi(value);
else if (!strcasecmp("mcm_perf_profile", opt))
mcm_profile = atoi(value);
+ else if (!strcasecmp("mcm_eager_completion", opt))
+ mix_eager_completion = atoi(value);
}
fclose(f);
mlog(0, "RDMA buffer pool size %d MB\n", mix_buffer_mb);
mlog(0, "RDMA buffer segment size %d\n", mix_buffer_sg);
mlog(0, "RDMA buffer alignment %d\n", mix_align);
+ mlog(0, "RDMA SCIF inline threshold %d\n", mix_inline_threshold);
+ mlog(0, "RDMA IB inline threshold %d\n", mcm_ib_inline);
+ mlog(0, "RDMA eager completion %d\n", mix_eager_completion);
mlog(0, "Maximum message size %d MB\n", mix_max_msg_mb);
mlog(0, "CM msg queue depth %d\n", mcm_depth);
mlog(0, "CM msg completion signal rate %d\n", mcm_signal);
smd->m_shared_len = mix_buffer_mb * (1024 * 1024);
else
smd->m_shared_len = 0;
- smd->m_len = smd->m_shared_len + 8 * (1024 * 1024);
+ smd->m_len = smd->m_shared_len + (8 * (1024 * 1024));
ret = posix_memalign((void **)&smd->m_buf, 4096, smd->m_len);
if (ret) {
mlog(0, "failed to allocate smd m_buf, m_len=%d, ERR: %s\n", smd->m_len, strerror(errno));
goto err;
}
- mlog(1, " Allocate/Register RDMA Proxy buffer %p, ln=%d\n", smd->m_buf, smd->m_len);
+ mlog(0, " Allocate/Register RDMA Proxy buffer %p-%p, ln=%d\n", smd->m_buf, (char*)smd->m_buf + smd->m_len, smd->m_len);
smd->m_offset = scif_register(tx_ep, smd->m_buf, smd->m_len,
(off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
mlog(0, " scif_register addr=%p,%d failed %s\n", smd->m_buf, smd->m_len, strerror(errno));
goto err;
}
- mlog(1, " SCIF addr=%p, offset=0x%llx, len %d\n", smd->m_buf, smd->m_offset, smd->m_len);
+ mlog(0, " SCIF addr=%p, offset=0x%llx, len %d\n", smd->m_buf, smd->m_offset, smd->m_len);
smd->m_mr = ibv_reg_mr(smd->md->pd, smd->m_buf, smd->m_len,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
mlog(0, " IB addr=%p,%d failed %s\n", smd->m_buf, smd->m_len, strerror(errno));
goto err;
}
- mlog(1, " IB registered addr=%p,%d, mr_addr=%p handle=0x%x, lkey=0x%x rkey=0x%x \n",
+ mlog(0, " IB registered addr=%p,%d, mr_addr=%p handle=0x%x, lkey=0x%x rkey=0x%x \n",
smd->m_buf, smd->m_len, smd->m_mr->addr, smd->m_mr->handle, smd->m_mr->lkey, smd->m_mr->rkey);
/* close MCM device, MIC client, md->slock held */
static void mix_close_device(mcm_ib_dev_t *md, mcm_scif_dev_t *smd)
{
- mlog(1, " md %p smd %p\n", md, smd);
+ mlog(0, " md %p smd %p\n", md, smd);
/* close and remove scif MIX client, leave parent mcm_ib_dev open */
if (smd->scif_op_ep) {
scif_close(smd->scif_tx_ep);
smd->scif_tx_ep = 0;
}
+ if (smd->scif_ev_ep) {
+ scif_close(smd->scif_ev_ep);
+ smd->scif_ev_ep = 0;
+ }
+
+ smd->destroy = 1;
+
+ while (smd->th_ref_cnt) {
+ mlog(0, " waiting for SMD %p ref_cnt (%d) = 0\n", smd, smd->th_ref_cnt);
+ pthread_mutex_unlock(&md->slock);
+ sleep_usec(1000);
+ pthread_mutex_lock(&md->slock);
+ }
mcm_destroy_smd(smd);
mlog(1, " freed smd %p\n", md, smd);
}
/* locate QP object */
-mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid)
+mcm_qp_t *mix_get_qp(mcm_scif_dev_t *smd, uint32_t tid, uint32_t qpn)
{
mcm_qp_t *qp = NULL;
pthread_mutex_lock(&smd->qplock);
qp = get_head_entry(&smd->qplist);
while (qp) {
- if (qp->entry.tid == tid)
+ if ((qp->entry.tid == tid) || (qp->ib_qp->qp_num == qpn))
break;
qp = get_next_entry(&qp->entry, &smd->qplist);
}
goto err;
}
- mlog(1, " RDMA Proxy buffer %p, len %d\n", m_qp->m_buf, m_qp->m_len);
+ mlog(0, " RDMA Proxy buffer %p, len %d\n", m_qp->m_buf, m_qp->m_len);
m_qp->m_off = scif_register(m_qp->smd->scif_tx_ep, m_qp->m_buf, m_qp->m_len,
(off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
mlog(0, " IB addr=%p,%d failed %s\n", m_qp->m_buf, m_qp->m_len, strerror(errno));
goto err;
}
- mlog(1, " IB_mr addr=%p,%d, mr_addr %p handle 0x%x lkey 0x%x rkey 0x%x \n",
+ mlog(0, " IB_mr addr=%p,%d, mr_addr %p handle 0x%x lkey 0x%x rkey 0x%x \n",
m_qp->m_buf, m_qp->m_len, m_qp->m_mr->addr, m_qp->m_mr->handle, m_qp->m_mr->lkey, m_qp->m_mr->rkey);
return 0;
{
/* RDMA proxy pool, register with SCIF and IB, set pool and segm size with parameters */
m_qp->wr_end = entries;
- m_qp->wr_len = DAT_MCM_WR * entries; /* 192 bytes to align signaling, ibv_qp + 7 SGE's */
+ m_qp->wr_sz = ALIGN_64(sizeof(struct mcm_wr));
+ m_qp->wr_len = m_qp->wr_sz * entries; /* 64 byte aligned for signal_fence */
if (posix_memalign((void **)&m_qp->wr_buf, 4096, ALIGN_PAGE(m_qp->wr_len))) {
mlog(0, "failed to allocate wr_buf, m_qp=%p, wr_len=%d, entries=%d\n", m_qp, m_qp->wr_len, entries);
goto err;
}
-
memset(m_qp->wr_buf, 0, ALIGN_PAGE(m_qp->wr_len));
- mlog(1, " WR buf pool %p, LEN req=%d, act=%d\n", m_qp->wr_buf, m_qp->wr_len, ALIGN_PAGE(m_qp->wr_len) );
+ mlog(0, " WR buf pool %p, LEN req=%d, act=%d\n", m_qp->wr_buf, m_qp->wr_len, ALIGN_PAGE(m_qp->wr_len) );
m_qp->wr_off = scif_register(m_qp->smd->scif_tx_ep, m_qp->wr_buf, ALIGN_PAGE(m_qp->wr_len),
(off_t)0, SCIF_PROT_READ | SCIF_PROT_WRITE, 0);
m_qp->wr_buf, ALIGN_PAGE(m_qp->wr_len), strerror(errno));
goto err;
}
- mlog(1, " SCIF_mr for wr_buf addr %p, off 0x%llx, len %d, entries %d\n",
+ mlog(0, " SCIF_mr for wr_buf addr %p, off 0x%llx, len %d, entries %d\n",
m_qp->wr_buf, m_qp->wr_off, ALIGN_PAGE(m_qp->wr_len), entries);
return 0;
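/*
 * The ALIGN_* macros used above are assumed to round to 64-byte cachelines
 * and 4KB pages; a minimal sketch of plausible definitions:
 */
#ifndef ALIGN_64
#define ALIGN_64(x)      (((x) + 63) & ~63)      /* round up to 64B */
#define ALIGN_DOWN_64(x) ((x) & ~63)             /* round down to 64B */
#define ALIGN_PAGE(x)    (((x) + 4095) & ~4095)  /* round up to 4KB page */
#endif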
if (!m_cq->ib_ch)
goto err;
+ if (mcm_config_fd(m_cq->ib_ch->fd))
+ goto err;
+
m_cq->ib_cq = ibv_create_cq(smd->md->ibctx, cq_len, m_cq, m_cq->ib_ch, 0);
if (!m_cq->ib_cq)
goto err;
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
- mlog(1, " MIX_CQ_CREATE: cq_len = %d\n", pmsg->cq_len);
+ mlog(0, " MIX_CQ_CREATE: cq_len = %d\n", pmsg->cq_len);
if (m_cq_create(smd, pmsg->cq_len, &new_mcq))
goto err;
mlog(1, " MIX_QP_DESTROY: QP_t - id 0x%x\n", pmsg->req_id );
/* Find the QP */
- m_qp = mix_get_qp(smd, pmsg->req_id);
+ m_qp = mix_get_qp(smd, pmsg->req_id, 0);
if (!m_qp) {
mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->req_id);
goto err;
static int mix_qp_modify(mcm_scif_dev_t *smd, dat_mix_qp_t *pmsg)
{
- /* TODO */
return 0;
}
mlog(0, " ERR: mix_get_cq, id %d, not found\n", scq_id);
goto err;
}
+ m_qp->m_cq = m_cq;
attr->recv_cq = m_cq->ib_cq;
attr->send_cq = m_cq->ib_cq;
memset((void *)&qp_create, 0, sizeof(qp_create));
qp_create.cap.max_recv_wr = pmsg->qp_t.max_recv_wr;
qp_create.cap.max_recv_sge = pmsg->qp_t.max_recv_sge;
- qp_create.cap.max_send_wr = pmsg->qp_t.max_send_wr * 8; /* max of 8 segments per wr */
+ qp_create.cap.max_send_wr = min(pmsg->qp_t.max_send_wr + mix_buffer_sg_cnt, 16000);
qp_create.cap.max_send_sge = pmsg->qp_t.max_send_sge;
- qp_create.cap.max_inline_data = 0; /* better bandwidth without inline */
+ qp_create.cap.max_inline_data = mcm_ib_inline;
qp_create.qp_type = IBV_QPT_RC;
- mlog(1, " QP_t - max_wr %d adjusted for segmentation, inline == 0\n",
- qp_create.cap.max_send_wr);
+ mlog(0, " QP_t - max_wr %d adjusted up to %d for segmentation \n",
+ pmsg->qp_t.max_send_wr, qp_create.cap.max_send_wr);
pmsg->hdr.status = m_qp_create(smd, &qp_create, pmsg->qp_t.scq_id, &new_mqp);
if (pmsg->hdr.status)
pmsg->m_seg = new_mqp->m_seg;
pmsg->wr_off = new_mqp->wr_off;
pmsg->wr_len = new_mqp->wr_end;
+ pmsg->m_inline = mix_inline_threshold;
- mlog(1, " QP_t - qpn %x, q_id %d, q_ctx %p sq %d,%d rq %d,%d\n",
+ mlog(1, " QP_t - qpn %x, q_id %d, q_ctx %p sq %d,%d rq %d,%d, il %d\n",
pmsg->qp_t.qp_num, pmsg->qp_t.qp_id, new_mqp,
pmsg->qp_t.max_send_wr,
pmsg->qp_t.max_send_sge, pmsg->qp_t.max_recv_wr,
- pmsg->qp_t.max_recv_sge);
+ pmsg->qp_t.max_recv_sge, pmsg->m_inline);
resp:
/* send back response */
static void mix_dto_event(struct mcm_cq *m_cq, struct ibv_wc *wc, int nc)
{
dat_mix_dto_comp_t msg;
- int len, i;
+ int i;
/* send DTO events to MIC client */
msg.hdr.ver = DAT_MIX_VER;
msg.hdr.flags = MIX_OP_REQ;
msg.cq_id = m_cq->cq_id;
msg.cq_ctx = m_cq->cq_ctx;
+ msg.wc_cnt = nc;
for (i=0; i < nc; i++) {
- msg.wc_cnt = 1; /* todo, batch in groups of MIX_WC_MAX */
- memcpy(&msg.wc, &wc[i], sizeof(*wc));
+ memcpy(&msg.wc[i], &wc[i], sizeof(*wc));
- len = sizeof(dat_mix_dto_comp_t);
- if (scif_send_msg(m_cq->smd->scif_ev_ep, (void*)&msg, len))
- return;
- mlog(1, " MIX_DTO_EVENT (ep=%d,sz=%d): cq %p id %d ctx %p stat %d op %d ln %d wr_id %p wc's %d verr 0x%x\n",
- m_cq->smd->scif_op_ep, len, m_cq, msg.cq_id, msg.cq_ctx,
- msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
- msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+ if (msg.wc[i].status != IBV_WC_SUCCESS) {
+ mlog(0, " ERROR (ep=%d): cq %p id %d ctx %p stat %d"
+ " op %d wr_id %p wc's %d verr 0x%x\n",
+ m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx,
+ msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
+ msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+ } else {
+ mlog(1, " SUCCESS (ep=%d): cq %p id %d ctx %p stat %d"
+ " op %d ln %d wr_id %p wc's %d verr 0x%x\n",
+ m_cq->smd->scif_op_ep, m_cq, msg.cq_id, msg.cq_ctx,
+ msg.wc[i].status, msg.wc[i].opcode, msg.wc[i].byte_len,
+ msg.wc[i].wr_id, msg.wc_cnt, msg.wc[i].vendor_err);
+ }
}
+ scif_send_msg(m_cq->smd->scif_ev_ep, (void*)&msg, sizeof(msg));
}
static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event)
pmsg->cm_id, (void*)pmsg->cm_ctx, pmsg->qp_id);
/* Find the QP for linking */
- m_qp = mix_get_qp(smd, pmsg->qp_id);
+ m_qp = mix_get_qp(smd, pmsg->qp_id, 0);
if (!m_qp) {
mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
goto err;
memcpy(&m_cm->msg, &pmsg->msg, sizeof(dat_mcm_msg_t)); /* do I need this copy?? */
/* Attach the QP for this CR */
- m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id);
+ m_cm->m_qp = mix_get_qp(smd, pmsg->qp_id, 0);
if (!m_cm->m_qp) {
mlog(0, " ERR: mix_get_qp, id %d, not found\n", pmsg->qp_id);
return -1;
return 0;
}
-/* Post SEND message request, operation channel */
-static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data, int *events)
{
- int len, ret;
struct mcm_qp *m_qp;
+ struct mcm_wr *m_wr;
struct ibv_send_wr *bad_wr;
- struct ibv_sge sge;
-
- /* hdr already read, get operation data */
- len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
- if (ret != len) {
- mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
- return -1;
- }
- mlog(1, " enter: q_id %d, q_ctx %p, len %d, wr_id %p, sge %d, op %x flgs %x \n",
- pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, pmsg->wr.wr_id,
- pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags);
-
- m_qp = (struct mcm_qp*)pmsg->qp_ctx; /* trust me for now, TODO use id lookup */
-
- mlog(1, " qp_num %x \n", m_qp->ib_qp->qp_num);
-
- /* TODO, mix with rdma_writes and preserve order */
- len = pmsg->len;
- ret = scif_recv(smd->scif_op_ep, smd->m_buf + smd->m_shared_len, len, SCIF_RECV_BLOCK);
- if (ret != len) {
- mlog(0, " ERR: scif_recv inline DATA, ret %d, exp %d\n", ret, len);
- return -1;
- }
- mlog(1, " scif_recv inline DATA, len = %d, exp %d \n",ret,len);
-
- pmsg->wr.sg_list = &sge;
- pmsg->wr.num_sge = 1;
- sge.addr = (uint64_t)smd->m_mr->addr + smd->m_shared_len;
- sge.lkey = smd->m_mr->lkey;
- sge.length = len;
- ret = ibv_post_send(m_qp->ib_qp, &pmsg->wr, &bad_wr);
- if (ret) {
- struct ibv_wc wc;
- mlog(0, " ERR ibv_post_send: %s - wr_id %p\n", strerror(errno), pmsg->wr.wr_id);
-
- wc.wr_id = pmsg->wr.wr_id;
- wc.byte_len = 0;
- wc.status = IBV_WC_GENERAL_ERR;
- wc.opcode = IBV_WC_SEND;
- wc.vendor_err = ret;
- mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
- }
- MCNTR(smd->md, MCM_QP_SEND);
-
- mlog(1, " exit: q_id %d, q_ctx %p, len %d\n",
- pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len);
-
- return 0;
-}
-
-static void m_post_pending_wr(mcm_scif_dev_t *smd, int *data)
-{
- struct mcm_qp *m_qp;
- struct ibv_send_wr *m_wr, *bad_wr;
- int ret,posted, wr_idx, wr_max, poll_cnt;
+ int ret, posted, done, wr_idx, wr_max, poll_cnt, cn_signal;
pthread_mutex_lock(&smd->qplock);
m_qp = get_head_entry(&smd->qplist);
while (m_qp) {
- wr_max = 1; /* mix_write and ib_write, equal time between op and tx threads */
+ done = 0;
+ wr_max = mcm_ib_signal_rate * 2;
wr_idx = m_qp->wr_tl;
- while (wr_idx != m_qp->wr_hd && wr_max--) { /* change to if, one at a time */
- posted = 0; poll_cnt = 100;
- m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * wr_idx));
- while ((m_wr->wr_id != m_wr->wr.atomic.swap) && (--poll_cnt));
- mlog(1, " qp %p wr_id %#016Lx poll_cnt %d\n", m_qp, m_wr->wr.atomic.swap, poll_cnt);
+ /* check for credits on this sQP */
+ if (((m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate) >= (m_qp->wr_end - (mcm_ib_signal_rate*2))) {
+ mlog(0, " wr_len %d - pending %d = low credits %d, rate=%d waiting.. \n",
+ m_qp->wr_end, (m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate,
+ m_qp->wr_end - ((m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate),
+ mcm_ib_signal_rate);
+ *data += (m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end;
+ break;
+ }
- if (m_wr->wr_id == m_wr->wr.atomic.swap) {
- char *sbuf = (char*)m_wr->sg_list->addr;
-#ifdef MCM_PROFILE
- if (mcm_profile) {
- uint32_t diff = ((mcm_ts_us() + 1000000) - m_wr->xrc_remote_srq_num) % 1000000;
- uint32_t ops = (uint32_t)m_wr->wr.atomic.rkey;
-
- if ((diff/ops) > 100000)
- mlog(0, "tx_thread: slow readfrom -> m_wr %p wr_id %#016Lx ln=%d, pending %d, %u usecs (%u-%u), CPU_id=%d\n",
- m_wr, m_wr->wr_id, m_wr->sg_list->length, ops, diff/ops, mcm_ts_us(), m_wr->xrc_remote_srq_num,
- sched_getcpu());
-
- m_wr->xrc_remote_srq_num = 0;
- m_wr->wr.atomic.rkey = 0;
-
- if (!m_qp->rf_min || diff/ops < m_qp->rf_min)
- m_qp->rf_min = diff/ops;
- if (!m_qp->rf_max || diff/ops > m_qp->rf_max)
- m_qp->rf_max = diff/ops;
- if (!m_qp->rf_avg)
- m_qp->rf_avg = diff/ops;
+ while (wr_idx != m_qp->wr_hd && wr_max--) {
+ cn_signal = 0; posted = 0; poll_cnt = 100;
+ m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * wr_idx));
+
+ /* inline, OP thread posted */
+ if (m_wr->flags & M_SEND_POSTED) {
+ mlog(1, " POSTED: [%d] qp %p hd %d tl %d idx %d wr %p wr_id %p, addr %p sz %d sflg 0x%x mflg 0x%x\n",
+ done, m_qp, m_qp->wr_hd, m_qp->wr_tl, wr_idx, m_wr,
+ m_wr->org_id, m_wr->wr.sg_list->addr, m_wr->sg->length,
+ m_wr->wr.send_flags, m_wr->flags);
+ done++;
+ if (++wr_idx == m_qp->wr_end)
+ wr_idx = 0;
+ continue;
+ }
+ while ((m_wr->wr.wr_id != m_wr->org_id) && (--poll_cnt));
+ mlog(1, " qp %p hd %d tl %d idx %d wr %p wr_id %#016Lx poll_cnt %d\n",
+ m_qp, m_qp->wr_hd, m_qp->wr_tl, wr_idx, m_wr, m_wr->org_id, poll_cnt);
+
+ if (m_wr->wr.wr_id == m_wr->org_id) {
+ char *sbuf = (char*)m_wr->wr.sg_list->addr;
+
+ mlog(1, " m_wr %p ready for ibv_post addr=%p ln=%d, lkey=%x\n",
+ m_wr, sbuf, m_wr->sg->length, m_wr->sg->lkey);
+ mlog(1, " wr_id %#016Lx next %p sglist %p sge %d op %d flgs"
+ " %d idata 0x%x raddr %p rkey %x \n",
+ m_wr->wr.wr_id, m_wr->wr.next, m_wr->wr.sg_list, m_wr->wr.num_sge, m_wr->wr.opcode,
+ m_wr->wr.send_flags, m_wr->wr.imm_data, m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey);
+
+ /* signaling and eager completion */
+ if (m_wr->wr.send_flags & IBV_SEND_SIGNALED)
+ cn_signal = 1;
+
+ if (!((m_qp->post_cnt+1) % mcm_ib_signal_rate)) {
+ m_wr->wr.send_flags |= IBV_SEND_SIGNALED;
+ m_wr->flags |= M_SEND_MP_SIG;
+ }
+ else if (mix_eager_completion)
+ m_wr->wr.send_flags &= ~IBV_SEND_SIGNALED;
+
+ if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+ m_qp->post_sig_cnt++;
+ if (cn_signal && !mix_eager_completion)
+ m_wr->flags |= M_SEND_CN_SIG;
+ if (mix_shared_buffer)
+ m_wr->m_idx = (sbuf + (m_wr->wr.sg_list->length - 1)) - smd->m_buf;
else
- m_qp->rf_avg = ((diff/ops + m_qp->rf_avg) / 2);
+ m_wr->m_idx = (sbuf + (m_wr->wr.sg_list->length - 1)) - m_qp->m_buf;
+
+ mlog(1, " signaled, qp %p wr %p wr_id %p flgs 0x%x,"
+ " pcnt %d sg_rate %d sqe_cnt %d, hd %d tl %d sz %d\n",
+ m_qp, m_wr, m_wr->wr.wr_id, m_wr->wr.send_flags,
+ m_qp->post_cnt, mcm_ib_signal_rate,
+ m_qp->wr_end, m_qp->wr_hd, m_qp->wr_tl,
+ m_wr->wr.sg_list->length);
}
-#endif
-
- mlog(1, " tx_thread: m_wr %p wr_id %#016Lx data ready for IB write, 1st byte 0x%x last byte 0x%x, ln=%d\n",
- m_wr, m_wr->wr_id, sbuf[0], sbuf[m_wr->sg_list->length-1], m_wr->sg_list->length);
- mlog(1, " tx_thread: wr_id %#016Lx next %p sglist %p sge %d op %d flgs %d idata 0x%x raddr %p rkey %x \n",
- m_wr->wr_id, m_wr->next, m_wr->sg_list, m_wr->num_sge, m_wr->opcode,
- m_wr->send_flags, m_wr->imm_data, m_wr->wr.rdma.remote_addr, m_wr->wr.rdma.rkey);
-
- ret = ibv_post_send(m_qp->ib_qp, m_wr, &bad_wr);
- if (ret) {
+ m_wr->wr.wr_id = (uint64_t)m_wr;
+ ret = ibv_post_send(m_qp->ib_qp, &m_wr->wr, &bad_wr);
+ if (ret || (cn_signal && mix_eager_completion)) {
struct ibv_wc wc;
- mlog(0, " ERR ibv_post_write: %s - wr_id %p\n",
- strerror(errno), m_wr->wr_id);
-
- wc.wr_id = m_wr->wr_id;
- wc.byte_len = 0;
- wc.status = IBV_WC_GENERAL_ERR;
- wc.opcode = IBV_WC_RDMA_WRITE;
+ mlog(1, " dto_event: sig %d ret %d, %s - m_wr %p, wr_id %p\n",
+ cn_signal, ret, strerror(errno),
+ m_wr->wr.wr_id, m_wr->org_id);
+
+ wc.wr_id = m_wr->org_id;
+ wc.byte_len = m_wr->sg->length;
+ wc.status = ret ? IBV_WC_GENERAL_ERR : IBV_WC_SUCCESS;
+ wc.opcode = m_wr->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
wc.vendor_err = ret;
mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
}
- MCNTR(smd->md, MCM_QP_WRITE);
- mlog(1, " - qp %p wr %p wr_id %#016Lx posted tl=%d hd=%d\n",
- m_qp, m_wr, m_wr->wr_id, m_qp->wr_tl, m_qp->wr_hd);
+ m_qp->post_cnt++;
+ m_wr->flags |= M_SEND_POSTED;
+
+ if (m_wr->wr.opcode == IBV_WR_SEND) {
+ if (m_wr->sg->length <= mix_inline_threshold)
+ MCNTR(smd->md, MCM_QP_SEND_INLINE);
+ else
+ MCNTR(smd->md, MCM_QP_SEND);
+ } else {
+ if (m_wr->sg->length <= mix_inline_threshold)
+ MCNTR(smd->md, MCM_QP_WRITE_INLINE);
+ else
+ MCNTR(smd->md, MCM_QP_WRITE_SEG);
+ }
+ mlog(1, " qp %p wr %p wr_id %#016Lx posted tl=%d"
+ " hd=%d idx=%d pst=%d,%d cmp %d\n",
+ m_qp, m_wr, m_wr->org_id, m_qp->wr_tl,
+ m_qp->wr_hd, wr_idx, m_qp->post_cnt,
+ m_qp->post_sig_cnt, m_qp->comp_cnt);
- m_wr->wr_id = 0;
posted++;
- if (++m_qp->wr_tl == m_qp->wr_end)
- m_qp->wr_tl = 0;
}
if (!posted) {
- mlog(1, " - qp %p wr_id %#016Lx still not ready\n", m_qp, m_wr->wr.atomic.swap);
+ mlog(1, " qp %p wr %p wr_id %#016Lx still not ready\n", m_qp, m_wr, m_wr->org_id);
break;
}
if (++wr_idx == m_qp->wr_end) /* posted WR, move to next */
wr_idx = 0;
+
+ if (smd->destroy) {
+ mlog(0, " SMD destroy - QP %p hd %d tl %d pst %d,%d cmp %d, pending data %d, events %d\n",
+ m_qp, m_qp->wr_hd, m_qp->wr_tl, m_qp->post_cnt,
+ m_qp->post_sig_cnt, m_qp->comp_cnt, *data, *events);
+ mlog(0, " wr %p wr_id %p org_id %p sglist %p sge %d ln %d op %d flgs"
+ " %x idata 0x%x raddr %p rkey %x m_flgs %x\n",
+ m_wr, m_wr->wr.wr_id, m_wr->org_id, m_wr->wr.sg_list,
+ m_wr->wr.num_sge, m_wr->sg->length, m_wr->wr.opcode,
+ m_wr->wr.send_flags, m_wr->wr.imm_data,
+ m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey, m_wr->flags);
+
+ }
+ }
+ *data += ((m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end) - done;
+ *events += m_qp->post_sig_cnt - m_qp->comp_cnt;
+
+ if (smd->destroy) {
+ mlog(0, " SMD destroy - QP %p hd %d tl %d pst %d,%d cmp %d, pending data %d, events %d\n",
+ m_qp, m_qp->wr_hd, m_qp->wr_tl, m_qp->post_cnt,
+ m_qp->post_sig_cnt, m_qp->comp_cnt, *data, *events);
}
- *data += (m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end;
m_qp = get_next_entry(&m_qp->entry, &smd->qplist);
}
- mlog(1, " tx_tread: pending data = %d on device smd %p\n", *data, smd);
pthread_mutex_unlock(&smd->qplock);
}
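/*
 * Sketch of the send-queue credit test used above (hypothetical helper,
 * same arithmetic): each signaled completion retires up to
 * mcm_ib_signal_rate WRs, so posting pauses once outstanding signals could
 * cover the ring. With wr_end = 500 and rate = 20 (example numbers), the
 * threshold is (500 - 40) / 20 = 23 outstanding signaled WRs.
 */
static inline int m_qp_sq_low_credits(struct mcm_qp *m_qp)
{
	int covered = (m_qp->post_sig_cnt - m_qp->comp_cnt) * mcm_ib_signal_rate;

	return covered >= (m_qp->wr_end - (mcm_ib_signal_rate * 2));
}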
-/* Post SEND message request, operation channel */
-static int mix_post_write(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+/* initiate proxy data transfer, operation channel */
+static int m_proxy_data(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg, struct mcm_qp *m_qp)
{
- int len, ret, i;
+ int len, ret, i, retries;
off_t l_off, r_off;
uint64_t total_offset;
int l_start, l_end, l_len, cacheln_off, seg_len;
- struct mcm_qp *m_qp;
- struct ibv_send_wr *m_wr;
+ struct mcm_wr *m_wr;
struct ibv_sge *m_sge;
- /* hdr already read, get mix_sent_t data */
- len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
- if (ret != len) {
- mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
- return -1;
- }
mlog(1, " q_id %d, q_ctx %p, len %d, wr_id %p, sge %d, op %x flgs %x wr_idx %d\n",
pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, pmsg->wr.wr_id,
pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags, pmsg->sge[0].lkey);
- m_qp = (struct mcm_qp*)pmsg->qp_ctx;
total_offset = 0;
+ if (pmsg->wr.opcode == IBV_WR_SEND)
+ MCNTR(smd->md, MCM_MX_SEND);
+ else
+ MCNTR(smd->md, MCM_MX_WRITE);
+
/* lock, tx thread can be posting */
pthread_mutex_lock(&smd->qplock);
- m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * m_qp->wr_hd));
- m_sge = (struct ibv_sge *)((char*)m_wr + 80);
+ m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+ m_sge = m_wr->sg;
mlog(1, " m_wr %p m_sge %p \n", m_wr, m_sge);
- memcpy(&m_wr->num_sge, &pmsg->wr.num_sge, 40);
- m_wr->wr.atomic.swap = pmsg->wr.wr_id;
- m_wr->sg_list = m_sge;
- m_wr->next = 0;
-#ifdef MCM_PROFILE
- m_wr->xrc_remote_srq_num = 0;
-#endif
- m_wr->wr.rdma.remote_addr += total_offset;
- m_wr->num_sge = 0;
+ memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40); /* client WR: num_sge, opcode, flags, imm_data, wr union */
+ m_wr->org_id = pmsg->wr.wr_id;
+ m_wr->wr.sg_list = m_sge;
+ m_wr->wr.next = 0;
+ m_wr->wr.wr.rdma.remote_addr += total_offset;
+ m_wr->wr.num_sge = 0;
+ m_wr->wr.wr_id = 0;
+ m_wr->w_idx = m_qp->wr_hd;
+ m_wr->flags = 0;
+ m_wr->context = (uint64_t)m_qp;
for (i=0;i<pmsg->wr.num_sge;i++) {
r_off = ALIGN_DOWN_64(r_off);
while (l_len) {
- m_wr->num_sge ++;
+ m_wr->wr.num_sge++;
+ retries = 1;
+
+ /* Send or last available WR, send all */
+ if (pmsg->wr.opcode == IBV_WR_SEND)
+ seg_len = l_len;
+ else
+ seg_len = (l_len > m_qp->m_seg)? m_qp->m_seg : l_len;
- seg_len = (l_len > m_qp->m_seg)? m_qp->m_seg : l_len;
-
if (mix_shared_buffer) {
l_start = ALIGN_64(smd->m_hd);
if ((l_start + seg_len) > smd->m_shared_len)
l_end = l_start + seg_len;
if (l_start < smd->m_tl && l_end > smd->m_tl) {
- mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d\n",
- smd->m_hd, smd->m_tl, seg_len);
- return -1; /* todo queue up, don't fail */
+ mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d, retries %d\n",
+ smd->m_hd, smd->m_tl, seg_len, retries);
+ ret = -1;
+ pthread_mutex_unlock(&smd->qplock);
+ goto bail;
}
-
l_off = smd->m_offset + l_start;
}
else {
if (l_start < m_qp->m_tl && l_end > m_qp->m_tl) {
mlog(0, " ERR: mix_post_write stalled, insufficient proxy memory, hd 0x%x, tl 0x%x, len %d\n",
m_qp->m_hd, m_qp->m_tl, seg_len);
- return -1; /* todo queue up, don't fail */
+ ret = -1;
+ pthread_mutex_unlock(&smd->qplock);
+ goto bail;
}
-
l_off = m_qp->m_off + l_start;
}
mlog(1, " SCIF_readfrom[%d] l_off %p, r_off %p, l_start 0x%x l_end 0x%x seg_len %d, len %d cacheln_off %d \n",
i, l_off, r_off, l_start, l_end, seg_len, len, cacheln_off);
-#ifdef MCM_PROFILE
- if (mcm_profile) {
- m_wr->xrc_remote_srq_num = (uint32_t)mcm_ts_us();
- m_wr->wr.atomic.rkey = (uint32_t)((m_qp->wr_hd + m_qp->wr_end - m_qp->wr_tl) % m_qp->wr_end)+1;
- mlog(1, " op_thread: SCIF_readfrom on PERF CPU_id %d, ts=%d us, pending %d\n",
- sched_getcpu(), m_wr->xrc_remote_srq_num, m_wr->wr.atomic.rkey);
- }
-#endif
+
ret = scif_readfrom(smd->scif_tx_ep, l_off, seg_len, r_off, 0);
if (ret) {
mlog(0, " ERR: scif_readfrom, ret %d\n", ret);
- return -1;
+ pthread_mutex_unlock(&smd->qplock);
+ goto bail;
}
MCNTR(smd->md, MCM_SCIF_READ_FROM);
if (mix_shared_buffer) {
smd->m_hd = l_end;
- m_sge->addr = (uint64_t)(smd->m_buf + l_start + cacheln_off);
+ m_sge->addr = (uint64_t)(smd->m_buf + l_start + cacheln_off);
m_sge->lkey = smd->m_mr->lkey;
- }
- else {
+ } else {
m_qp->m_hd = l_end;
- m_sge->addr = (uint64_t)(m_qp->m_buf + l_start + cacheln_off);
+ m_sge->addr = (uint64_t)(m_qp->m_buf + l_start + cacheln_off);
m_sge->lkey = m_qp->m_mr->lkey;
}
m_sge->length = seg_len - cacheln_off;
m_sge++;
/* if enough for this WR, then set up DMA signal, and move to next WR */
- if (seg_len == m_qp->m_seg || i == pmsg->wr.num_sge - 1) {
- l_off = m_qp->wr_off + (m_qp->wr_hd * DAT_MCM_WR);
+ if (seg_len == m_qp->m_seg || i == pmsg->wr.num_sge - 1) {
+ l_off = m_qp->wr_off + (m_qp->wr_hd * m_qp->wr_sz);
mlog(1, " SCIF_fence_signal[%d] l_off %p, wr_id %p, new wr_hd 0x%x wr_len %d\n",
i, l_off, pmsg->wr.wr_id, m_qp->wr_hd, m_qp->wr_len);
SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL);
if (ret) {
mlog(0," ERR: scif_fence_signal, ret %d \n", ret);
- return -1;
+ pthread_mutex_unlock(&smd->qplock);
+ goto bail;
}
MCNTR(smd->md, MCM_SCIF_SIGNAL);
+ MCNTR(smd->md, MCM_MX_WRITE_SEG);
- /* remove special flags unless it's the last segment */
+ /* remove IMM unless it's the last segment */
- /* NON-COMPLIANT: IMM segmented causes receiver RDMA length will be wrong */
+ /* NON-COMPLIANT: if IMM data is segmented, the receiver's reported RDMA length will be wrong */
if (l_len || i != pmsg->wr.num_sge -1) {
- if (m_wr->opcode == IBV_WR_RDMA_WRITE_WITH_IMM)
- m_wr->opcode = IBV_WR_RDMA_WRITE;
- m_wr->send_flags &= IBV_SEND_INLINE;
+ if (m_wr->wr.opcode == IBV_WR_RDMA_WRITE_WITH_IMM)
+ m_wr->wr.opcode = IBV_WR_RDMA_WRITE;
+ m_wr->wr.send_flags = 0;
}
+ if (pmsg->len <= mcm_ib_inline)
+ m_wr->wr.send_flags |= IBV_SEND_INLINE;
+
/* took WR slot, move head */
if (++m_qp->wr_hd == m_qp->wr_end)
m_qp->wr_hd = 0;
- if (m_qp->wr_hd == m_qp->wr_tl)
- mlog(0, "ERR: proxy send queue overflow in m_qp %p, queue size: %d\n", m_qp, m_qp->wr_end);
+ if (m_qp->wr_hd == m_qp->wr_tl) {
+ mlog(0, " ERR: post_write stalled waiting for proxy WR, hd 0x%x, tl 0x%x, len %d, retries %d\n",
+ m_qp->wr_hd, m_qp->wr_tl, seg_len, retries);
+ pthread_mutex_unlock(&smd->qplock);
+ ret = -1;
+ goto bail;
+ }
pthread_mutex_unlock(&smd->qplock);
ret = write(smd->md->mc->tx_pipe[1], "w", sizeof("w")); /* signal tx_thread */
pthread_mutex_lock(&smd->qplock);
/* prepare the next WR */
- m_wr = (struct ibv_send_wr *)(m_qp->wr_buf + (DAT_MCM_WR * m_qp->wr_hd));
- m_sge = (struct ibv_sge *)((char*)m_wr + 80);
-
- mlog(1, " next m_wr %p m_sge %p \n", m_wr, m_sge);
-
- memcpy(&m_wr->num_sge, &pmsg->wr.num_sge, 40);
- m_wr->wr.atomic.swap = pmsg->wr.wr_id;
- m_wr->sg_list = m_sge;
- m_wr->next = 0;
-#ifdef MCM_PROFILE
- m_wr->xrc_remote_srq_num = 0;
-#endif
- m_wr->wr.rdma.remote_addr += total_offset;
- m_wr->num_sge = 0;
+ m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+ m_sge = m_wr->sg;
+
+ mlog(1, " next m_wr %p m_sge %p \n", m_wr, m_wr->sg);
+
+ memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40);
+ m_wr->org_id = pmsg->wr.wr_id;
+ m_wr->wr.sg_list = m_wr->sg;
+ m_wr->wr.next = 0;
+ m_wr->wr.wr.rdma.remote_addr += total_offset;
+ m_wr->wr.num_sge = 0;
+ m_wr->wr.wr_id = 0;
+ m_wr->w_idx = m_qp->wr_hd;
+ m_wr->flags = 0;
+ m_wr->context = (uint64_t)m_qp;
}
}
}
pthread_mutex_unlock(&smd->qplock);
- if (m_wr->num_sge) {
+ if (m_wr->wr.num_sge) {
pthread_mutex_lock(&smd->qplock);
mlog(0, " Warning: corner case, the last sge has length 0\n");
- l_off = m_qp->wr_off + (m_qp->wr_hd * DAT_MCM_WR);
+ l_off = m_qp->wr_off + (m_qp->wr_hd * m_qp->wr_sz);
mlog(1, " SCIF_fence_signal[%d] l_off %p, wr_id %p, new wr_hd 0x%x wr_len %d\n",
i, l_off, pmsg->wr.wr_id, m_qp->wr_hd, m_qp->wr_len);
SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL);
if (ret) {
mlog(0," ERR: scif_fence_signal, ret %d \n", ret);
- return -1;
+ pthread_mutex_unlock(&smd->qplock);
+ goto bail;
}
MCNTR(smd->md, MCM_SCIF_SIGNAL);
+ MCNTR(smd->md, MCM_MX_WRITE_SEG);
/* took WR slot, move head */
if (++m_qp->wr_hd == m_qp->wr_end)
ret = write(smd->md->mc->tx_pipe[1], "w", sizeof("w")); /* signal tx_thread */
pthread_mutex_unlock(&smd->qplock);
}
+ ret = 0;
+bail:
+ if (ret) {
+ struct ibv_wc wc;
+
+ wc.wr_id = pmsg->wr.wr_id;
+ wc.byte_len = 0;
+ wc.status = IBV_WC_GENERAL_ERR;
+ wc.opcode = pmsg->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
+ wc.vendor_err = ret;
+ mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
+ }
mlog(1, " exit: q_id %d, q_ctx %p, len %d, wr_hd = %d\n",
pmsg->qp_id, (void*)pmsg->qp_ctx, pmsg->len, m_qp->wr_hd);
- return 0;
+
+ return ret;
+}
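+
+/*
+ * Sketch of the proxy-buffer carving above (hypothetical helper): a byte
+ * ring with a 64B-aligned head that wraps to 0 when a segment won't fit at
+ * the end, and stalls rather than overwrite bytes the tail still owns.
+ */
+static int m_buf_alloc(char *base, int pool_len, int *hd, int tl,
+		       int seg_len, char **addr)
+{
+	int start = ALIGN_64(*hd);
+
+	if (start + seg_len > pool_len)  /* no room at the end, wrap */
+		start = 0;
+	if (start < tl && start + seg_len > tl)
+		return -1;               /* would overrun un-retired data */
+
+	*addr = base + start;
+	*hd = start + seg_len;           /* advance head past allocation */
+	return 0;
+}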
+
+/* Post SEND message request, IB send or rdma write, operation channel */
+static int mix_post_send(mcm_scif_dev_t *smd, dat_mix_send_t *pmsg)
+{
+ int len, ret, l_start, l_end;
+ struct mcm_qp *m_qp;
+ struct mcm_wr *m_wr, *m_wr_prev;
+
+ /* hdr already read, get operation data */
+ len = sizeof(dat_mix_send_t) - sizeof(dat_mix_hdr_t);
+ ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: scif_recv WR, ret %d, exp %d\n", ret, len);
+ return -1;
+ }
+ m_qp = (struct mcm_qp*)pmsg->qp_ctx;
+
+ mlog(1, " q_id %d, q_num %x ln %d, wr_id %p, sge %d, op %x flgs %x pst %d,%d cmp %d, inl %d, %s\n",
+ pmsg->qp_id, m_qp->ib_qp->qp_num, pmsg->len, pmsg->wr.wr_id,
+ pmsg->wr.num_sge, pmsg->wr.opcode, pmsg->wr.send_flags, m_qp->post_cnt,
+ m_qp->post_sig_cnt, m_qp->comp_cnt, pmsg->hdr.flags & MIX_OP_INLINE ? 1:0,
+ pmsg->wr.opcode == IBV_WR_SEND ? "SND":"WR");
+
+ if (!(pmsg->hdr.flags & MIX_OP_INLINE))
+ return (m_proxy_data(smd, pmsg, m_qp));
+
+ if (pmsg->wr.opcode == IBV_WR_SEND)
+ MCNTR(smd->md, MCM_MX_SEND_INLINE);
+ else
+ MCNTR(smd->md, MCM_MX_WRITE_INLINE);
+
+ pthread_mutex_lock(&smd->qplock);
+ m_wr = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * m_qp->wr_hd));
+ if (m_qp->wr_hd == 0)
+ m_wr_prev = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * (m_qp->wr_end - 1)));
+ else
+ m_wr_prev = (struct mcm_wr *)(m_qp->wr_buf + (m_qp->wr_sz * (m_qp->wr_hd - 1)));
+
+ len = pmsg->len;
+
+ mlog(1, " inline, m_wr %p m_wr_prev %p m_sge %p len %d hd %d tl %d\n",
+ m_wr, m_wr_prev, m_wr->sg, len, m_qp->wr_hd, m_qp->wr_tl);
+
+ /* IB_WR */
+ memcpy(&m_wr->wr.num_sge, &pmsg->wr.num_sge, 40);
+ m_wr->wr.sg_list = m_wr->sg;
+ m_wr->wr.next = 0;
+ m_wr->wr.num_sge = len ? 1:0;
+
+ /* M_WR */
+ m_wr->org_id = pmsg->wr.wr_id;
+ m_wr->w_idx = m_qp->wr_hd;
+ m_wr->flags = 0;
+ m_wr->context = (uint64_t)m_qp;
+
+ if (mix_shared_buffer) {
+ l_start = ALIGN_64(smd->m_hd);
+ if ((l_start + len) > smd->m_shared_len)
+ l_start = 0;
+ l_end = l_start + len;
+
+ if (l_start < smd->m_tl && l_end > smd->m_tl) {
+ mlog(0, " ERR: send inline stalled, no bufs, hd %d tl %d ln %d\n",
+ smd->m_hd, smd->m_tl, len);
+ ret = -1;
+ goto bail; /* todo queue up, don't fail */
+ }
+ m_wr->sg->addr = (uint64_t)(smd->m_buf + l_start);
+ m_wr->sg->lkey = smd->m_mr->lkey;
+ m_wr->sg->length = len;
+ smd->m_hd = m_wr->m_idx = l_end; /* move proxy buffer hd */
+ }
+ else {
+ l_start = ALIGN_64(m_qp->m_hd);
+ if ((l_start + len) > m_qp->m_len)
+ l_start = 0;
+ l_end = l_start + len;
+
+ if (l_start < m_qp->m_tl && l_end > m_qp->m_tl) {
+ mlog(0, " ERR: send inline stalled, no bufs, hd %d tl %d ln %d\n",
+ m_qp->m_hd, m_qp->m_tl, len);
+ ret = -1;
+ goto bail; /* todo queue up, don't fail */
+ }
+ m_wr->sg->addr = (uint64_t)(m_qp->m_buf + l_start);
+ m_wr->sg->lkey = m_qp->m_mr->lkey;
+ m_wr->sg->length = len;
+ m_qp->m_hd = m_wr->m_idx = l_end; /* move proxy buffer hd */
+ }
+
+ if (len) {
+ /* copy data into proxy buffer, signal TX thread via wr_id */
+ ret = scif_recv(smd->scif_op_ep, (void*)m_wr->sg->addr, len, SCIF_RECV_BLOCK);
+ if (ret != len) {
+ mlog(0, " ERR: scif_recv inline DATA, ret %d, exp %d\n", ret, len);
+ goto bail;
+ }
+ }
+
+ if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+ if (mix_eager_completion)
+ m_wr->flags |= M_SEND_CN_EAGER_SIG;
+ else
+ m_wr->flags |= M_SEND_CN_SIG;
+ }
+
+ if (len <= mcm_ib_inline)
+ m_wr->wr.send_flags |= IBV_SEND_INLINE;
+
+ /* order ok, nothing pending, post here */
+ if (m_qp->wr_hd == m_qp->wr_tl ||
+ m_wr_prev->flags & M_SEND_POSTED) {
+ struct ibv_send_wr *bad_wr;
+
+ /* segmentation separate from consumer signaling */
+ if (!((m_qp->post_cnt+1) % mcm_ib_signal_rate)) {
+ m_wr->flags |= M_SEND_MP_SIG;
+ m_wr->wr.send_flags |= IBV_SEND_SIGNALED;
+ }
+ else if (mix_eager_completion)
+ m_wr->wr.send_flags &= ~IBV_SEND_SIGNALED;
+
+ m_wr->flags |= M_SEND_POSTED;
+ m_wr->wr.wr_id = (uint64_t)m_wr;
+
+ ret = ibv_post_send(m_qp->ib_qp, &m_wr->wr, &bad_wr);
+ if (!ret) {
+ m_qp->post_cnt++;
+ if (m_wr->wr.send_flags & IBV_SEND_SIGNALED) {
+ m_qp->post_sig_cnt++;
+ mlog(1, " INLINE signaled, qp %p wr %p wr_id %p cn_flgs %x wr_flgs %x,"
+ " pcnt %d sg_rate %d sqe_cnt %d, hd %d tl %d sz %d\n",
+ m_qp, m_wr, m_wr->wr.wr_id, pmsg->wr.send_flags,
+ m_wr->wr.send_flags,
+ m_qp->post_cnt, mcm_ib_signal_rate,
+ m_qp->wr_end, m_qp->wr_hd, m_qp->wr_tl, len);
+ }
+ if (m_wr->wr.opcode == IBV_WR_SEND)
+ MCNTR(smd->md, MCM_QP_SEND_INLINE);
+ else
+ MCNTR(smd->md, MCM_QP_WRITE_INLINE);
+ }
+ } else
+ ret = 0;
+
+ /* took WR slot, move hd */
+ if (++m_qp->wr_hd == m_qp->wr_end)
+ m_qp->wr_hd = 0;
+
+ /* signal TX, CQ thread, this WR data ready */
+ m_wr->wr.wr_id = pmsg->wr.wr_id;
+ if (!(m_wr->flags & M_SEND_POSTED) ||
+ ((m_wr->flags & M_SEND_POSTED) &&
+ (m_wr->wr.send_flags & IBV_SEND_SIGNALED)))
+ write(smd->md->mc->tx_pipe[1], "w", sizeof("w"));
+
+bail:
+ pthread_mutex_unlock(&smd->qplock);
+
+ if (ret ||
+ ((m_wr->flags & M_SEND_POSTED) &&
+ (m_wr->flags & M_SEND_CN_EAGER_SIG))) {
+ struct ibv_wc wc;
+
+ wc.wr_id = pmsg->wr.wr_id;
+ wc.byte_len = len;
+ wc.status = ret ? IBV_WC_GENERAL_ERR:IBV_WC_SUCCESS;
+ wc.opcode = pmsg->wr.opcode == IBV_WR_SEND ? IBV_WC_SEND:IBV_WC_RDMA_WRITE;
+ wc.vendor_err = ret;
+ mix_dto_event(m_qp->ib_qp->send_cq->cq_context, &wc, 1);
+ }
+
+ return ret;
}
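/*
 * The OP/TX handshake used above, restated as a sketch (hypothetical
 * helper): the OP thread queues a WR with wr.wr_id = 0 and org_id set to
 * the client's wr_id, then arms scif_fence_signal() to write org_id into
 * wr.wr_id when the SCIF DMA lands. The TX thread treats equality as
 * "payload is in the proxy buffer, safe to ibv_post_send()".
 */
static inline int m_wr_data_ready(struct mcm_wr *m_wr)
{
	return m_wr->wr.wr_id == m_wr->org_id; /* written by the fence signal */
}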
/* receive data on connected SCIF endpoints, operation and unsolicited channels, */
case MIX_SEND:
ret = mix_post_send(smd, (dat_mix_send_t *)phdr);
break;
- case MIX_WRITE:
- ret = mix_post_write(smd, (dat_mix_send_t *)phdr);
- break;
case MIX_LISTEN:
ret = mix_listen(smd, (dat_mix_listen_t *)phdr);
break;
int finalize = 1;
pthread_mutex_lock(&cm->lock);
- mlog(1," enter: state = %s \n", mcm_state_str(cm->state));
+ mlog(0," enter: state = %s \n", mcm_state_str(cm->state));
switch (cm->state) {
case MCM_CONNECTED:
/* CONSUMER: move to err state to flush */
cm->msg.op = htons(MCM_DREQ);
cm->retries = 0;
finalize = 0; /* wait for DREP */
- mlog(1, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n",
+ mlog(0, " DREQ_out (%d): cm_id %d %x %x %x -> %x %x %x\n",
cm->retries+1, cm->entry.tid,
htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn),
htons(cm->msg.sport),htons(cm->msg.daddr.lid),
/* DREQ received, send DREP and schedule event, finalize */
cm->msg.op = htons(MCM_DREP);
cm->state = MCM_DISCONNECTED;
- mlog(1, " DREQ_in: cm_id %d send DREP %x %x %x -> %x %x %x\n",
+ mlog(0, " DREQ_in: cm_id %d send DREP %x %x %x -> %x %x %x\n",
cm->entry.tid, htons(cm->msg.saddr.lid),htonl(cm->msg.saddr.qpn),
htons(cm->msg.sport),htons(cm->msg.daddr.lid),
htonl(cm->msg.dqpn), htons(cm->msg.dport));
void mcm_cq_event(struct mcm_cq *m_cq)
{
struct ibv_cq *ib_cq;
+ struct mcm_qp *m_qp;
+ struct mcm_wr *m_wr;
void *cq_ctx;
- int ret, notify = 0;
- struct ibv_wc wc[10];
+ int i, ret, num, notify = 0;
+ struct ibv_wc wc[DAT_MIX_WC_MAX];
+ struct ibv_wc wc_ev[DAT_MIX_WC_MAX];
- mlog(1," m_cq(%p) \n", m_cq);
+ mlog(2," m_cq(%p) \n", m_cq);
ret = ibv_get_cq_event(m_cq->ib_ch, &ib_cq, (void *)&cq_ctx);
- mlog(1," m_cq %p == cq_ctx %p \n", m_cq, cq_ctx);
- if (ret == 0) {
+ if (ret == 0)
ibv_ack_cq_events(m_cq->ib_cq, 1);
- }
+
retry:
- ret = ibv_poll_cq(m_cq->ib_cq, 10, wc);
- mlog(1," completions = %d \n", ret);
+ ret = ibv_poll_cq(m_cq->ib_cq, DAT_MIX_WC_MAX, wc);
if (ret <= 0) {
if (!ret && !notify) {
ibv_req_notify_cq(m_cq->ib_cq, 0);
notify = 1;
goto retry;
}
+ mlog(2," m_cq(%p) empty, armed... \n", m_cq);
return;
} else
notify = 0;
- /* NOTE: if WR was segmented update length, no context to handle so let mcm client handle */
- mix_dto_event(m_cq, wc, ret);
+ num = 0;
+ for (i=0; i<ret; i++) {
+ m_wr = (struct mcm_wr *)wc[i].wr_id;
+ m_qp = (struct mcm_qp *)m_wr->context;
+
+ mlog(1," wr_id[%d of %d] m_wr %p m_qp %p\n", i, ret, m_wr, m_qp);
+
+ m_qp->comp_cnt++;
+
+ if (wc[i].status == IBV_WC_SUCCESS) {
+ if (m_wr->flags & M_SEND_CN_SIG) {
+ wc_ev[num].wr_id = m_wr->org_id;
+ wc_ev[num].status = IBV_WC_SUCCESS;
+ wc_ev[num].byte_len = wc[i].byte_len;
+ wc_ev[num].opcode = wc[i].opcode;
+ wc_ev[num].vendor_err = 0;
+ num++;
+ }
+ }
+ else {
+ /* segmentation, only report first error */
+ if (m_cq->prev_id != m_wr->org_id) {
+ char *sbuf = (char*)m_wr->sg->addr;
+
+ mlog(0," DTO ERR: st %d, vn %x pst %d cmp %d qstate 0x%x\n",
+ wc[i].status, wc[i].vendor_err, m_qp->post_cnt,
+ m_qp->comp_cnt, m_qp->ib_qp->state);
+ mlog(0, " DTO ERR: m_wr %p laddr %p=0x%x - %p=0x%x, len=%d, lkey=%x\n",
+ m_wr, sbuf, sbuf[0], &sbuf[m_wr->sg->length],
+ sbuf[m_wr->sg->length], m_wr->sg->length, m_wr->sg->lkey);
+ mlog(0, " DTO ERR: wr_id %#016Lx next %p sglist %p sge %d op %d flgs"
+ " %d idata 0x%x raddr %p rkey %x \n",
+ m_wr->org_id, m_wr->wr.next, m_wr->sg, m_wr->wr.num_sge,
+ m_wr->wr.opcode, m_wr->wr.send_flags, m_wr->wr.imm_data,
+ m_wr->wr.wr.rdma.remote_addr, m_wr->wr.wr.rdma.rkey);
+
+ m_cq->prev_id = m_wr->org_id;
+ wc_ev[num].wr_id = m_wr->org_id;
+ wc_ev[num].status = wc[i].status;
+ wc_ev[num].byte_len = 0;
+ wc_ev[num].opcode = wc[i].opcode;
+ wc_ev[num].vendor_err = wc[i].vendor_err;
+ num++;
+ }
+ }
+ if (mix_shared_buffer) {
+ mlog(1," smd->mbuf_tl %p to %p, wr_tl %d to %d, pst %d,%d cmp %d\n",
+ m_qp->smd->m_tl, m_wr->m_idx, m_qp->wr_tl,
+ m_wr->w_idx, m_qp->post_cnt, m_qp->post_sig_cnt,
+ m_qp->comp_cnt);
+ pthread_mutex_lock(&m_qp->smd->qplock);
+ m_qp->smd->m_tl = m_wr->m_idx;
+ m_qp->wr_tl = m_wr->w_idx;
+ pthread_mutex_unlock(&m_qp->smd->qplock);
+ }
+ else {
+ mlog(1," qp->mbuf_tl %p to %p, wr_tl %p to %d\n",
+ m_qp->m_tl, m_wr->m_idx);
+ pthread_mutex_lock(&m_qp->smd->qplock);
+ m_qp->m_tl = m_wr->m_idx;
+ m_qp->wr_tl = m_wr->w_idx;
+ pthread_mutex_unlock(&m_qp->smd->qplock);
+ }
+ }
+ if (num)
+ mix_dto_event(m_cq, wc_ev, num);
+
goto retry;
}
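/*
 * Reclaim sketch (hypothetical, shared-buffer mode; caller holds
 * smd->qplock as in mcm_cq_event() above): one signaled completion retires
 * every WR and proxy byte posted since the previous signal, so the tails
 * jump in signal-rate strides rather than per WR. Under eager completion
 * the client already saw its DTO event at SCIF-read time and this path
 * only recycles proxy resources.
 */
static inline void m_qp_reclaim(struct mcm_qp *m_qp, struct mcm_wr *m_wr)
{
	m_qp->smd->m_tl = m_wr->m_idx; /* retire proxy buffer bytes */
	m_qp->wr_tl = m_wr->w_idx;     /* retire WR ring up to this WR */
}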
mcm_client_t *mc = (mcm_client_t*)mic_client;
struct mcm_ib_dev *md;
struct mcm_scif_dev *smd;
+ struct mcm_cq *m_cq;
struct pollfd set;
- int i, data =0;
+ int i, data, events;
char rbuf[2];
if (mcm_affinity) {
while (!finished) {
pthread_mutex_lock(&mc->txlock);
- data = 0;
+ data = 0, events = 0;
for (i=0;i<MCM_IB_MAX;i++) {
md = &mc->mdev[i];
if (md->ibctx == NULL)
/* all active DAPL MCM clients on this IB device */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
- while (smd) {
- smd->ref_cnt++;
+ while (smd && !smd->destroy) {
+ smd->th_ref_cnt++;
pthread_mutex_unlock(&md->slock);
- m_post_pending_wr(smd, &data); /* chk pending WR's */
+
+ pthread_mutex_lock(&smd->cqlock);
+ m_cq = get_head_entry(&smd->cqlist);
+ while (m_cq) {
+ mcm_cq_event(m_cq); /* chk DTO completion */
+ m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
+ }
+ pthread_mutex_unlock(&smd->cqlock);
+
+ m_post_pending_wr(smd, &data, &events); /* chk pending WR's */
+
pthread_mutex_lock(&md->slock);
- smd->ref_cnt--;
+ smd->th_ref_cnt--;
smd = get_next_entry(&smd->entry, &md->smd_list);
}
pthread_mutex_unlock(&md->slock);
}
pthread_mutex_unlock(&mc->txlock);
- if (!data) { /* don't sleep if we have pending writes, on any device */
+ if (!data && !events) { /* don't sleep if we have pending writes or completions */
set.fd = mc->tx_pipe[0];
set.events = POLLIN;
set.revents = 0;
- mlog(1, "TX thread waiting, no data\n");
+ mlog(1, " waiting, no data\n");
poll(&set, 1, -1); /* no more pending, next scif_readfrom will signal */
- mlog(1, "TX thread signaled\n");
+ mlog(1, " signaled\n");
- if (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
+ while (mcm_poll(mc->tx_pipe[0], POLLIN) == POLLIN)
read(mc->tx_pipe[0], rbuf, 2);
}
}
struct mcm_fd_set *set;
struct mcm_ib_dev *md;
struct mcm_scif_dev *smd, *next;
- struct mcm_cq *m_cq;
char rbuf[2];
int i, ret, time_ms;
mcm_fd_set(md->ibctx->async_fd, set, POLLIN);
mcm_fd_set(md->rch->fd, set, POLLIN);
- /* all active SCIF MIC clients */
+ /* all active SCIF MIC clients, OP channels */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd) {
- pthread_mutex_lock(&smd->cqlock);
- m_cq = get_head_entry(&smd->cqlist);
- while (m_cq) {
- mcm_fd_set(m_cq->ib_ch->fd, set, POLLIN);
- m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
- }
- pthread_mutex_unlock(&smd->cqlock);
mcm_fd_set(smd->scif_op_ep, set, POLLIN);
mcm_check_timers(smd, &time_ms);
- smd = get_next_entry(&smd->entry, &md->smd_list);;
+ smd = get_next_entry(&smd->entry, &md->smd_list);
}
pthread_mutex_unlock(&md->slock);
}
/* SCIF MIC client ops and conn messages */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
- while (smd) {
- smd->ref_cnt++;
+ while (smd && !smd->destroy) {
+ smd->th_ref_cnt++;
next = get_next_entry(&smd->entry, &md->smd_list);
pthread_mutex_unlock(&md->slock);
- pthread_mutex_lock(&smd->cqlock);
- m_cq = get_head_entry(&smd->cqlist);
- while (m_cq) {
- ret = mcm_poll(m_cq->ib_ch->fd, POLLIN);
- if (ret == POLLIN)
- mcm_cq_event(m_cq); /* completions */
- m_cq = get_next_entry(&m_cq->entry, &smd->cqlist);
- }
- pthread_mutex_unlock(&smd->cqlock);
-
ret = mcm_poll(smd->scif_op_ep, POLLIN); /* operations */
if (ret == POLLIN)
ret = mix_scif_recv(smd, smd->scif_op_ep);
pthread_mutex_lock(&md->slock);
- smd->ref_cnt--;
+ smd->th_ref_cnt--;
next = get_next_entry(&smd->entry, &md->smd_list);
if (ret == POLLERR)
}
pthread_mutex_unlock(&mc->oplock);
}
+ free(set);
mlog(0, "OP,CM,Event thread exiting\n");
}
void mpxy_cm_thread(void *mic_client)
md = &mc->mdev[i];
if (md->ibctx == NULL)
continue;
+
set[fds].fd = mc->cm_pipe[0];
set[fds].events = POLLIN;
set[fds].revents = 0;