int m_len; /* buffer size */
int m_seg; /* segment size */
int m_shared_len; /* shared buffer size */
+ char *cmd_buf; /* operation command buffer */
} mcm_scif_dev_t;
mix_buffer_sg = atoi(value);
else if (!strcasecmp("buffer_alignment", opt))
mix_align = atoi(value);
- else if (!strcasecmp("buffer_inline_threshold", opt))
+ else if (!strcasecmp("buffer_inline_threshold", opt)) {
mix_inline_threshold = atoi(value);
+ if (mix_inline_threshold > DAT_MIX_INLINE_MAX)
+ mix_inline_threshold = DAT_MIX_INLINE_MAX;
+ }
else if (!strcasecmp("mcm_depth", opt))
mcm_depth = atoi(value);
else if (!strcasecmp("scif_port_id", opt))
ibv_dereg_mr(smd->m_mr);
free (smd->m_buf);
- mlog(1, " m_buf unregistered and freed \n");
+ free (smd->cmd_buf);
+ mlog(1, " m_buf, cmd_buf unregistered and freed\n");
if (smd->ref_cnt) {
mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt);
memset(smd, 0, sizeof(*smd));
smd->md = md;
- /* small shared SEND message pool, TODO protect this across QP's */
+ ret = posix_memalign((void **)&smd->cmd_buf, 64, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX));
+ if (ret) {
+ mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n",
+ ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret);
+ goto err;
+ }
+ mlog(1, "Allocated smd cmd_buf = %p len %d\n",
+ smd->cmd_buf, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX));
+
+ /* small shared SEND message pool */
smd->m_seg = mix_buffer_sg;
if (mix_shared_buffer)
smd->m_shared_len = mix_buffer_mb * (1024 * 1024);
return (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len));
}
-/* Active: new connection request operation, create CM object */
-static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+/* Active: new connection request operation, consumer context, create CM object */
+static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
{
int len, ret;
struct mcm_qp *m_qp = NULL;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
if (ret != len) {
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
/* disconnect request out */
-static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
{
int len, ret;
struct mcm_cm *m_cm;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
if (ret != len) {
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
/* Active, reply received, send RTU, unsolicited channel */
-static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
{
int len, ret;
struct mcm_cm *m_cm;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
if (ret != len) {
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
/* PASSIVE, accept connect request from client, cr_reply */
-static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
{
int len, ret;
struct mcm_cm *m_cm;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
if (ret != len) {
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
}
/* PASSIVE, user reject from MIX client */
-static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
{
int len, ret;
struct mcm_cm *m_cm;
/* hdr already read, get operation data */
len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
- ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+ ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
if (ret != len) {
mlog(0, " ERR: ret %d, exp %d\n", ret, len);
return ret;
return ret;
}
+static int last_op;
+
/* receive data on connected SCIF endpoints, operation and unsolicited channels, */
static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep)
{
- char cmd[DAT_MIX_MSG_MAX];
- dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd;
+ dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)smd->cmd_buf;
int ret, len;
len = sizeof(*phdr);
mlog(1, " -> scif_ep %d len %d ...\n", scif_ep, len);
ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK);
if ((ret != len) || (phdr->ver != DAT_MIX_VER)) {
- mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n",
- scif_ep, ret, len, phdr->ver);
- return -1;
+ mlog(0, " ERR: smd %p ep %d ret %d exp %d ver %d op %d flgs %d last_op %s\n",
+ smd, scif_ep, ret, len, phdr->ver, phdr->op,
+ phdr->flags, mix_op_str(last_op));
+ return POLLERR; /* fatal, close device */
}
+ last_op = phdr->op;
+
mlog(1, " <- %d bytes: ver %d, op %d, flags %d scif_ep %d\n",
len, phdr->ver, phdr->op, phdr->flags, scif_ep);
ret = mix_listen_free(smd, phdr);
break;
case MIX_CM_REQ:
- ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr);
+ ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
break;
case MIX_CM_REP:
case MIX_CM_ACCEPT:
- ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr);
+ ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
break;
case MIX_CM_REJECT:
- ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr);
+ ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
break;
case MIX_CM_RTU:
case MIX_CM_EST:
- ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr);
+ ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
break;
case MIX_CM_DISC:
- ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr);
+ ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
break;
case MIX_CM_DREP:
default:
- mlog(0, " ERROR!!! unknown MIX operation: %d\n", phdr->op);
- return -1;
+ mlog(0, " ERR: SMD %p unknown msg->op: %d, close device\n", smd, phdr->op);
+ return POLLERR; /* fatal, close device */
}
MCNTR(smd->md, MCM_SCIF_RECV);
return ret;
}
if (cm->retries == cm->md->retries) {
- mlog(0, " CM_REPLY: RETRIES EXHAUSTED (lid port qpn)"
+ mlog(0, " CM_REPLY: RETRIES (%d) EXHAUSTED (lid port qpn)"
" %x %x %x -> %x %x %x\n",
- htons(cm->msg.saddr.lid),
+ cm->retries, htons(cm->msg.saddr.lid),
htons(cm->msg.sport),
htonl(cm->msg.saddr.qpn),
htons(cm->msg.daddr.lid),
}
}
+
+/*
+ * Debug aid: log every CM object queued on one SCIF device.
+ *
+ * Walks the listen list (smd->llist, holding smd->llock) and then the
+ * connection list (smd->clist, holding smd->clock). For each entry one
+ * level-0 log line is written with the CM state and the source and
+ * destination lid, port, cqp, iqp and pid fields of cm->msg, converted
+ * from network byte order. The lists are only read, never modified.
+ */
+static void mcm_dump_cm_lists(mcm_scif_dev_t *smd)
+{
+	mcm_cm_t *cm;
+	LLIST_ENTRY *list;
+	pthread_mutex_t *lock;
+
+	/* identify the device; every %p conversion needs a matching argument */
+	mlog(0, " SMD %p : \n", smd);
+
+	/* listen list */
+	list = &smd->llist;
+	lock = &smd->llock;
+	pthread_mutex_lock(lock);
+	for (cm = get_head_entry(list); cm; cm = get_next_entry(&cm->entry, list)) {
+		mlog(0, " CM_LIST %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n",
+		     mcm_state_str(cm->state),
+		     ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+		     ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id),
+		     ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+		     ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id));
+	}
+	pthread_mutex_unlock(lock);
+
+	/* conn list */
+	list = &smd->clist;
+	lock = &smd->clock;
+	pthread_mutex_lock(lock);
+	for (cm = get_head_entry(list); cm; cm = get_next_entry(&cm->entry, list)) {
+		mlog(0, " CM_CONN %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n",
+		     mcm_state_str(cm->state),
+		     ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+		     ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id),
+		     ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+		     ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id));
+	}
+	pthread_mutex_unlock(lock);
+}
+
/* Find matching CM object for this receive message, return CM reference, timer */
-mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg)
+mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg, int *dup)
{
mcm_cm_t *cm = NULL, *next, *found = NULL;
LLIST_ENTRY *list;
/* conn list first, duplicate requests for MCM_REQ */
list = &smd->clist;
lock = &smd->clock;
+ *dup = 0;
retry_listenq:
pthread_mutex_lock(lock);
ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn));
+ *dup = 1;
MCNTR(cm->md, MCM_CM_ERR_REQ_DUP);
return NULL;
}
goto retry_listenq;
}
-/* don't send reject becasue it may match next smd */
-#if 0
- /* not match on listenq for valid request, send reject */
- if (ntohs(msg->op) == MCM_REQ && !found) {
- mlog(1, " mcm_recv: NO LISTENER for %s %x %x i%x c%x"
- " < %x %x %x, sending reject\n",
- mcm_op_str(ntohs(msg->op)),
- ntohs(msg->daddr.lid), ntohs(msg->dport),
- ntohl(msg->daddr.qpn), ntohl(msg->dqpn),
- ntohs(msg->saddr.lid), ntohs(msg->sport),
- ntohl(msg->saddr.qpn));
-
- mcm_cm_rej_out(smd->md, msg, MCM_REJ_CM);
- }
-#endif
-
- if (!found) {
- mlog(1, " mcm_recv: WARNING no match - op %s [lid, port, cqp, iqp, pid]:"
- " %x %x %x %x %x <- %x %x %x %x l_pid %x r_pid %x\n",
- mcm_op_str(ntohs(msg->op)),
- ntohs(msg->daddr.lid), ntohs(msg->dport),
- ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
- ntohl(msg->d_id), ntohs(msg->saddr.lid),
- ntohs(msg->sport), ntohl(msg->sqpn),
- ntohl(msg->saddr.qpn), ntohl(msg->s_id),
- ntohl(msg->d_id));
-
- if (ntohs(msg->op) == MCM_DREP) {
- MCNTR(cm->md, MCM_CM_ERR_DREP_DUP);
- }
- }
-
return found;
}
{
mcm_cm_t *cm = NULL;
mcm_scif_dev_t *smd;
+	int dup = 0, dup_seen = 0;
/* Walk scif device client list */
pthread_mutex_lock(&md->slock);
smd = get_head_entry(&md->smd_list);
while (smd) {
- cm = mcm_get_smd_cm(smd, msg);
+		cm = mcm_get_smd_cm(smd, msg, &dup);
+		/* dup is reset by every call, so latch a duplicate reported by any device */
+		dup_seen |= dup;
if (cm)
break;
smd = get_next_entry(&smd->entry, &md->smd_list);
}
pthread_mutex_unlock(&md->slock);
+
+	/*
+	 * No device on md->smd_list matched this message. On this path cm is
+	 * NULL and the walk above has advanced smd to NULL, so neither may be
+	 * dereferenced below; md is used for the reject and the counter.
+	 */
+	if (!cm) {
+		mlog(0, " no match - op %s [lid, port, cqp, iqp, pid]:"
+			" %x %x %x %x %x <- %x %x %x %x lpid %x rpid %x\n",
+			mcm_op_str(ntohs(msg->op)),
+			ntohs(msg->daddr.lid), ntohs(msg->dport),
+			ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
+			ntohl(msg->d_id), ntohs(msg->saddr.lid),
+			ntohs(msg->sport), ntohl(msg->sqpn),
+			ntohl(msg->saddr.qpn), ntohl(msg->s_id),
+			ntohl(msg->d_id));
+
+		/* reject only a request that no device recognised, not a duplicate one */
+		if (!dup_seen && (ntohs(msg->op) == MCM_REQ)) {
+			mlog(0, " mcm_recv: NO LISTENER for %s %x %x i%x c%x"
+				" < %x %x %x, sending reject\n",
+				mcm_op_str(ntohs(msg->op)),
+				ntohs(msg->daddr.lid), ntohs(msg->dport),
+				ntohl(msg->daddr.qpn), ntohl(msg->dqpn),
+				ntohs(msg->saddr.lid), ntohs(msg->sport),
+				ntohl(msg->saddr.qpn));
+			mcm_cm_rej_out(md, msg, MCM_REJ_CM);
+		}
+
+		if (ntohs(msg->op) == MCM_DREP) {
+			MCNTR(md, MCM_CM_ERR_DREP_DUP);
+		}
+
+		/* debug: dump the CM lists of every device to help diagnose the miss */
+		pthread_mutex_lock(&md->slock);
+		smd = get_head_entry(&md->smd_list);
+		while (smd) {
+			mcm_dump_cm_lists(smd);
+			smd = get_next_entry(&smd->entry, &md->smd_list);
+		}
+		pthread_mutex_unlock(&md->slock);
+	}
return cm;
}
ret = mcm_poll(smd->scif_op_ep, POLLIN); /* operations */
if (ret == POLLIN)
ret = mix_scif_recv(smd, smd->scif_op_ep);
-
+ if (ret != POLLERR) {
+ ret = mcm_poll(smd->scif_ev_ep, POLLIN); /* CM, events */
+ if (ret == POLLIN)
+ ret = mix_scif_recv(smd, smd->scif_ev_ep);
+ }
pthread_mutex_lock(&md->slock);
smd->th_ref_cnt--;
next = get_next_entry(&smd->entry, &md->smd_list);