git.openfabrics.org - ~ardavis/dapl.git/commitdiff
mpxyd: CM optimizations for MIC clients, improved checking on inbound CM messages
author: Arlin Davis <arlin.r.davis@intel.com>
Wed, 29 May 2013 23:00:32 +0000 (16:00 -0700)
committer: Arlin Davis <arlin.r.davis@intel.com>
Wed, 29 May 2013 23:00:32 +0000 (16:00 -0700)
Allow CM operations from MIC clients to be received on either the OP
or the EV channel, and give each SMD an aligned message buffer
for scif_recv processing.

Add a check for NO match at the MD level after all SMD children have been
checked for an inbound CM message match, and add mcm_dump_cm_lists for debugging.

Add a check that caps the inline message threshold at DAT_MIX_INLINE_MAX.

Signed-off-by: Arlin Davis <arlin.r.davis@intel.com>
dapl/svc/mpxyd.c

index 754b4b7d5f2187c4a1233403c4dbd0c5986e860b..643d9ee7081198ec466af5cc8f8fc4b661e268db 100644 (file)
@@ -307,6 +307,7 @@ typedef struct mcm_scif_dev {
        int                     m_len;          /* buffer size */
        int                     m_seg;          /* segment size */
        int                     m_shared_len;   /* shared buffer size */
+       char                    *cmd_buf;       /* operation command buffer  */
 
 } mcm_scif_dev_t;
 
@@ -822,8 +823,11 @@ static void mpxy_set_options( int debug_mode )
                        mix_buffer_sg = atoi(value);
                else if (!strcasecmp("buffer_alignment", opt))
                        mix_align = atoi(value);
-               else if (!strcasecmp("buffer_inline_threshold", opt))
+               else if (!strcasecmp("buffer_inline_threshold", opt)) {
                        mix_inline_threshold = atoi(value);
+                       if (mix_inline_threshold > DAT_MIX_INLINE_MAX)
+                               mix_inline_threshold = DAT_MIX_INLINE_MAX;
+               }
                else if (!strcasecmp("mcm_depth", opt))
                        mcm_depth = atoi(value);
                else if (!strcasecmp("scif_port_id", opt))
@@ -1651,7 +1655,8 @@ static void mcm_destroy_smd(mcm_scif_dev_t *smd)
                ibv_dereg_mr(smd->m_mr);
 
        free (smd->m_buf);
-       mlog(1, " m_buf unregistered and freed \n");
+       free (smd->cmd_buf);
+       mlog(1, " m_buf, cmd_buf unregistered and freed\n");
 
        if (smd->ref_cnt) {
                mlog(0, " WARNING: ref_cnt not 0, = %d \n", smd->ref_cnt);
@@ -1688,7 +1693,16 @@ static mcm_scif_dev_t *mcm_create_smd(mcm_ib_dev_t *md, scif_epd_t op_ep, scif_e
        memset(smd, 0, sizeof(*smd));
        smd->md = md;
 
-       /* small shared SEND message pool, TODO protect this across QP's */
+       ret = posix_memalign((void **)&smd->cmd_buf, 64, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX));
+       if (ret) {
+               mlog(0, "failed to allocate smd cmd_buf, m_len=%d, ERR: %d\n",
+                       ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX), ret);
+               goto err;
+       }
+       mlog(1, "Allocated smd cmd_buf = %p len %d\n",
+               smd->cmd_buf, ALIGN_64(DAT_MIX_MSG_MAX + DAT_MIX_INLINE_MAX));
+
+       /* small shared SEND message pool */
        smd->m_seg = mix_buffer_sg;
        if (mix_shared_buffer)
                smd->m_shared_len = mix_buffer_mb * (1024 * 1024);
@@ -2664,8 +2678,8 @@ static int mix_cm_event(mcm_cm_t *m_cm, uint32_t event)
        return (scif_send_msg(m_cm->smd->scif_ev_ep, (void*)&msg, len));
 }
 
-/* Active: new connection request operation, create CM object */
-static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+/* Active: new connection request operation, consumer context, create CM object */
+static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
 {
        int len, ret;
        struct mcm_qp *m_qp = NULL;
@@ -2673,7 +2687,7 @@ static int mix_cm_req_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
 
        /* hdr already read, get operation data */
        len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
-       ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+       ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
        if (ret != len) {
                mlog(0, " ERR: ret %d, exp %d\n", ret, len);
                return ret;
@@ -2739,14 +2753,14 @@ resp:
 }
 
 /* disconnect request out */
-static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
 {
        int len, ret;
        struct mcm_cm *m_cm;
 
        /* hdr already read, get operation data */
        len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
-       ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+       ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
        if (ret != len) {
                mlog(0, " ERR: ret %d, exp %d\n", ret, len);
                return ret;
@@ -2767,14 +2781,14 @@ static int mix_cm_disc_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
 }
 
 /* Active, reply received, send RTU, unsolicited channel */
-static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rtu_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
 {
        int len, ret;
        struct mcm_cm *m_cm;
 
        /* hdr already read, get operation data */
        len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
-       ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+       ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
        if (ret != len) {
                mlog(0, " ERR: ret %d, exp %d\n", ret, len);
                return ret;
@@ -2935,14 +2949,14 @@ static int mix_cm_rtu_in(mcm_cm_t *m_cm, dat_mcm_msg_t *pkt, int pkt_len)
 }
 
 /* PASSIVE, accept connect request from client, cr_reply */
-static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
 {
        int len, ret;
        struct mcm_cm *m_cm;
 
        /* hdr already read, get operation data */
        len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
-       ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+       ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
        if (ret != len) {
                mlog(0, " ERR: ret %d, exp %d\n", ret, len);
                return ret;
@@ -3024,14 +3038,14 @@ static int mix_cm_rep_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
 }
 
 /* PASSIVE, user reject from MIX client */
-static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg)
+static int mix_cm_rej_out(mcm_scif_dev_t *smd, dat_mix_cm_t *pmsg, scif_epd_t scif_ep)
 {
        int len, ret;
        struct mcm_cm *m_cm;
 
        /* hdr already read, get operation data */
        len = sizeof(dat_mix_cm_t) - sizeof(dat_mix_hdr_t);
-       ret = scif_recv(smd->scif_op_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
+       ret = scif_recv(scif_ep, ((char*)pmsg + sizeof(dat_mix_hdr_t)), len, SCIF_RECV_BLOCK);
        if (ret != len) {
                mlog(0, " ERR: ret %d, exp %d\n", ret, len);
                return ret;
@@ -3651,21 +3665,25 @@ bail:
        return ret;
 }
 
+static int last_op;
+
 /* receive data on connected SCIF endpoints, operation and unsolicited channels,  */
 static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep)
 {
-       char cmd[DAT_MIX_MSG_MAX];
-       dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)cmd;
+       dat_mix_hdr_t *phdr = (dat_mix_hdr_t *)smd->cmd_buf;
        int ret, len;
 
        len = sizeof(*phdr);
        mlog(1, " -> scif_ep %d len %d ...\n", scif_ep, len);
        ret = scif_recv(scif_ep, phdr, len, SCIF_RECV_BLOCK);
        if ((ret != len) || (phdr->ver != DAT_MIX_VER)) {
-               mlog(0, " ERR: rcv on scif_ep %d, ret %d, exp %d, VER=%d\n",
-                    scif_ep, ret, len, phdr->ver);
-               return -1;
+               mlog(0, " ERR: smd %p ep %d ret %d exp %d ver %d op %d flgs %d last_op %s\n",
+                    smd, scif_ep, ret, len, phdr->ver, phdr->op,
+                    phdr->flags, mix_op_str(last_op));
+               return POLLERR; /* fatal, close device */
        }
+       last_op = phdr->op;
+
        mlog(1, " <- %d bytes: ver %d, op %d, flags %d scif_ep %d\n",
             len, phdr->ver, phdr->op, phdr->flags, scif_ep);
 
@@ -3703,26 +3721,26 @@ static int mix_scif_recv(mcm_scif_dev_t *smd, scif_epd_t scif_ep)
                ret = mix_listen_free(smd, phdr);
                break;
        case MIX_CM_REQ:
-               ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr);
+               ret = mix_cm_req_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
                break;
        case MIX_CM_REP:
        case MIX_CM_ACCEPT:
-               ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr);
+               ret = mix_cm_rep_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
                break;
        case MIX_CM_REJECT:
-               ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr);
+               ret = mix_cm_rej_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
                break;
        case MIX_CM_RTU:
        case MIX_CM_EST:
-               ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr);
+               ret = mix_cm_rtu_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
                break;
        case MIX_CM_DISC:
-               ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr);
+               ret = mix_cm_disc_out(smd, (dat_mix_cm_t *)phdr, scif_ep);
                break;
        case MIX_CM_DREP:
        default:
-               mlog(0, " ERROR!!! unknown MIX operation: %d\n", phdr->op);
-               return -1;
+               mlog(0, " ERR: SMD %p unknown msg->op: %d, close device\n", smd, phdr->op);
+               return POLLERR; /* fatal, close device */
        }
        MCNTR(smd->md, MCM_SCIF_RECV);
        return ret;
@@ -4039,9 +4057,9 @@ static int mcm_cm_rep_out(mcm_cm_t *cm)
        }
 
        if (cm->retries == cm->md->retries) {
-               mlog(0, " CM_REPLY: RETRIES EXHAUSTED (lid port qpn)"
+               mlog(0, " CM_REPLY: RETRIES (%d) EXHAUSTED (lid port qpn)"
                         " %x %x %x -> %x %x %x\n",
-                        htons(cm->msg.saddr.lid),
+                        cm->retries, htons(cm->msg.saddr.lid),
                         htons(cm->msg.sport),
                         htonl(cm->msg.saddr.qpn),
                         htons(cm->msg.daddr.lid),
@@ -4176,8 +4194,55 @@ static void mcm_process_recv(mcm_ib_dev_t *md, dat_mcm_msg_t *msg, mcm_cm_t *cm,
        }
 }
 
+
+/* Debug aid: log every CM object on this SMD's listen and connection lists */
+static void mcm_dump_cm_lists(mcm_scif_dev_t *smd)
+{
+       mcm_cm_t *cm = NULL, *next;
+       LLIST_ENTRY *list;
+       pthread_mutex_t *lock;
+
+       mlog(0, " SMD %p : \n", smd);
+
+       /* listen list, walked with llock held */
+       list = &smd->llist;
+       lock = &smd->llock;
+       pthread_mutex_lock(lock);
+       next = get_head_entry(list);
+       while (next) {
+               cm = next;
+               next = get_next_entry(&cm->entry, list);
+
+               mlog(0,  " CM_LIST %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n",
+                       mcm_state_str(cm->state),
+                       ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+                       ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id),
+                       ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+                       ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id));
+       }
+       pthread_mutex_unlock(lock);
+
+       /* conn list, walked with clock held */
+       list = &smd->clist;
+       lock = &smd->clock;
+       pthread_mutex_lock(lock);
+       next = get_head_entry(list);
+       while (next) {
+               cm = next;
+               next = get_next_entry(&cm->entry, list);
+
+               mlog(0,  " CM_CONN %s [lid, port, cqp, iqp, pid]: SRC %x %x %x %x %x DST %x %x %x %x %x\n",
+                       mcm_state_str(cm->state),
+                       ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
+                       ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn), ntohl(cm->msg.s_id),
+                       ntohs(cm->msg.daddr.lid), ntohs(cm->msg.dport),
+                       ntohl(cm->msg.dqpn), ntohl(cm->msg.daddr.qpn), ntohl(cm->msg.d_id));
+       }
+       pthread_mutex_unlock(lock);
+}
+
 /* Find matching CM object for this receive message, return CM reference, timer */
-mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg)
+mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg, int *dup)
 {
        mcm_cm_t *cm = NULL, *next, *found = NULL;
        LLIST_ENTRY *list;
@@ -4195,6 +4260,7 @@ mcm_cm_t *mcm_get_smd_cm(mcm_scif_dev_t *smd, dat_mcm_msg_t *msg)
        /* conn list first, duplicate requests for MCM_REQ */
        list = &smd->clist;
        lock = &smd->clock;
+       *dup = 0;
 
 retry_listenq:
        pthread_mutex_lock(lock);
@@ -4246,6 +4312,7 @@ retry_listenq:
                                         ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
                                         ntohs(cm->msg.saddr.lid), ntohs(cm->msg.sport),
                                         ntohl(cm->msg.sqpn), ntohl(cm->msg.saddr.qpn));
+                               *dup = 1;
                                MCNTR(cm->md, MCM_CM_ERR_REQ_DUP);
                                return NULL;
                        }
@@ -4261,38 +4328,6 @@ retry_listenq:
                goto retry_listenq;
        }
 
-/* don't send reject becasue it may match next smd */
-#if 0
-       /* not match on listenq for valid request, send reject */
-       if (ntohs(msg->op) == MCM_REQ && !found) {
-               mlog(1, " mcm_recv: NO LISTENER for %s %x %x i%x c%x"
-                       " < %x %x %x, sending reject\n",
-                       mcm_op_str(ntohs(msg->op)),
-                       ntohs(msg->daddr.lid), ntohs(msg->dport),
-                       ntohl(msg->daddr.qpn), ntohl(msg->dqpn),
-                       ntohs(msg->saddr.lid), ntohs(msg->sport),
-                       ntohl(msg->saddr.qpn));
-
-               mcm_cm_rej_out(smd->md, msg, MCM_REJ_CM);
-       }
-#endif
-
-       if (!found) {
-               mlog(1,  " mcm_recv: WARNING no match - op %s [lid, port, cqp, iqp, pid]:"
-                        " %x %x %x %x %x <- %x %x %x %x l_pid %x r_pid %x\n",
-                        mcm_op_str(ntohs(msg->op)),
-                        ntohs(msg->daddr.lid), ntohs(msg->dport),
-                        ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
-                        ntohl(msg->d_id), ntohs(msg->saddr.lid),
-                        ntohs(msg->sport), ntohl(msg->sqpn),
-                        ntohl(msg->saddr.qpn), ntohl(msg->s_id),
-                        ntohl(msg->d_id));
-
-               if (ntohs(msg->op) == MCM_DREP) {
-                       MCNTR(cm->md, MCM_CM_ERR_DREP_DUP);
-               }
-       }
-
        return found;
 }
 
@@ -4301,17 +4336,54 @@ mcm_cm_t *mcm_get_cm(mcm_ib_dev_t *md, dat_mcm_msg_t *msg)
 {
        mcm_cm_t *cm = NULL;
        mcm_scif_dev_t *smd;
+       int dup = 0;    /* stays 0 if smd_list is empty; set to 1 by mcm_get_smd_cm on a duplicate request */
 
        /* Walk scif device client list */
        pthread_mutex_lock(&md->slock);
        smd = get_head_entry(&md->smd_list);
        while (smd) {
-               cm = mcm_get_smd_cm(smd, msg);
-               if (cm)
+               cm = mcm_get_smd_cm(smd, msg, &dup);
+               if (cm || dup)  /* stop on dup too, the next call would reset the flag */
                        break;
                smd = get_next_entry(&smd->entry, &md->smd_list);
        }
        pthread_mutex_unlock(&md->slock);
+       /* no CM returned: log the miss, reject a non-duplicate REQ, dump all CM lists */
+       if (!cm) {
+               mlog(0,  " no match - op %s [lid, port, cqp, iqp, pid]:"
+                        " %x %x %x %x %x <- %x %x %x %x lpid %x rpid %x\n",
+                        mcm_op_str(ntohs(msg->op)),
+                        ntohs(msg->daddr.lid), ntohs(msg->dport),
+                        ntohl(msg->dqpn), ntohl(msg->daddr.qpn),
+                        ntohl(msg->d_id), ntohs(msg->saddr.lid),
+                        ntohs(msg->sport), ntohl(msg->sqpn),
+                        ntohl(msg->saddr.qpn), ntohl(msg->s_id),
+                        ntohl(msg->d_id));
+
+               if (!dup && (ntohs(msg->op) == MCM_REQ)) {
+                       mlog(0, " mcm_recv: NO LISTENER for %s %x %x i%x c%x"
+                               " < %x %x %x, sending reject\n",
+                               mcm_op_str(ntohs(msg->op)),
+                               ntohs(msg->daddr.lid), ntohs(msg->dport),
+                               ntohl(msg->daddr.qpn), ntohl(msg->dqpn),
+                               ntohs(msg->saddr.lid), ntohs(msg->sport),
+                               ntohl(msg->saddr.qpn));
+                       mcm_cm_rej_out(md, msg, MCM_REJ_CM);
+               }
+               /* cm is NULL on this path, so count against md directly */
+               if (ntohs(msg->op) == MCM_DREP) {
+                       MCNTR(md, MCM_CM_ERR_DREP_DUP);
+               }
+
+               /* debug: dump the CM lists of every SMD on this device */
+               pthread_mutex_lock(&md->slock);
+               smd = get_head_entry(&md->smd_list);
+               while (smd) {
+                       mcm_dump_cm_lists(smd);
+                       smd = get_next_entry(&smd->entry, &md->smd_list);
+               }
+               pthread_mutex_unlock(&md->slock);
+       }
        return cm;
 }
 
@@ -4770,7 +4842,11 @@ void mpxy_op_thread(void *mic_client)
                                ret = mcm_poll(smd->scif_op_ep, POLLIN); /* operations */
                                if (ret == POLLIN)
                                        ret = mix_scif_recv(smd, smd->scif_op_ep);
-
+                               if (ret != POLLERR) {
+                                       ret = mcm_poll(smd->scif_ev_ep, POLLIN); /* CM, events */
+                                       if (ret == POLLIN)
+                                               ret = mix_scif_recv(smd, smd->scif_ev_ep);
+                               }
                                pthread_mutex_lock(&md->slock);
                                smd->th_ref_cnt--;
                                next = get_next_entry(&smd->entry, &md->smd_list);