]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
add extension.h update mpxyd.c
authorArlin Davis <arlin.r.davis@intel.com>
Wed, 6 Jun 2012 19:19:19 +0000 (12:19 -0700)
committerArlin Davis <arlin.r.davis@intel.com>
Wed, 6 Jun 2012 19:19:19 +0000 (12:19 -0700)
dapl/svc/mpxyd.c
daplmpxy.init.in [deleted file]
dat/include/dat2/dat_mic_extensions.h [new file with mode: 0755]

index 45bc7f2ea1c3dd900c07998d65435898fe95753e..b8f5646a47b1ef67ea3d0321a329cd23de3ffae4 100644 (file)
 #  include <config.h>
 #endif /* HAVE_CONFIG_H */
 
-#include <stdio.h>
-#include <stdarg.h>
+#include <stdlib.h>
 #include <string.h>
-#include <arpa/inet.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <byteswap.h>
+#include <pthread.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/time.h>
+#include <malloc.h>
+#include <stdarg.h>
 #include <getopt.h>
 #include <fcntl.h>
 #include <scif.h>
 #include "dat2/udat.h"
+#include "dat2/dat_mic_extensions.h"
 
 /*
- * Service options - set through mpxy_opts file.
+ * Service options - set through mpxyd.conf file.
  */
 static char *opts_file = MPXYD_CONF;
-static char log_file[128] = "/var/log/mpxyd.log";
+//static char log_file[128] = "/var/log/mpxyd.log";
+static char log_file[128] = "stdout";
 static int log_level = 0;
 static char lock_file[128] = "/var/run/mpxyd.pid";
-static short service_id = SCIF_OFED_PORT_7;
-static int buffer_pool_mb = 64;
-static int tx_depth = 100;
-static int tx_signal = 10;
-
-static FILE *lfile;
-static lock_t llock;
-
-#define mpxy_log(level, format, ...) \
+static char scif_dev[32] = "scif";
+static short scif_sport = SCIF_OFED_PORT_7;
+static scif_epd_t scif_ep;
+static struct scif_portID scif_id;
+
+/* scif-rdma cmd and data channel parameters */
+static int mix_buffer_mb = 64;
+static int mix_buffer_sg = 128 * 1024;
+static int mix_cmd_depth = 50;
+static int mix_cmd_size = 256;
+
+/* cm parameters */
+static int mcm_depth = 500;
+static int mcm_size = 256;
+static int mcm_signal = 100;
+static int mcm_retry_cnt = 10;
+static int mcm_rep_ms = 800;
+static int mcm_rtu_ms = 400;
+
+static FILE *logfile;
+static pthread_t mpxy_thread;
+static pthread_mutex_t flock;
+
+/* lists, fds, etc  */
+static struct llist_entry
+{
+    struct llist_entry *next;
+    struct llist_entry *prev;
+    void               *data;
+};
+
+#define MCM_FD_SETSIZE 1024
+struct mcm_fd_set {
+       int index;
+       struct pollfd set[MCM_FD_SETSIZE];
+};
+
+/* IB verbs device lists */
+static struct ibv_device **iblist;
+static struct llist_entry mcm_llist;
+static pthread_mutex_t mcm_llock;
+
+/* Support for IB devices - One service per device: UD QP for fabric CM services */
+static struct mcm_ib_dev {
+       DLIST_ENTRY             entry;
+       DLIST_ENTRY             mix_list; /* MIC client open instances */
+       pthread_mutex_t         mix_lock;
+       /* MCM - IB Device Resources */
+       ibv_context             *ib_dev;
+       uint16_t                port;
+       struct ibv_pd           *pd;
+       struct ibv_cq           *scq;
+       struct ibv_cq           *rcq;
+       struct ibv_qp           *qp;
+       struct ibv_mr           *mr_rbuf;
+       struct ibv_mr           *mr_sbuf;
+       ib_cm_msg_t             *sbuf;
+       ib_cm_msg_t             *rbuf;
+       struct ibv_comp_channel *rch;
+       struct ibv_ah           **ah;
+       union dat_mcm_addr      addr;
+       uint16_t                lid;
+       uint8_t                 sl;
+       uint16_t                pkey;
+       int                     pkey_idx;
+};
+
+/* per MIC MCM client open, SCIF device: TODO share message resources across clients? */
+static struct mcm_scif_dev {
+       struct list_entry       entry;
+       pthread_mutex_t         lock;
+       struct mcm_ib_dev       *mcm_dev;
+       scif_epd_t              ep;
+       struct scif_portID      peer;
+       off_t                   r_address;
+       off_t                   r_offset;
+       int                     r_len;
+       dat_mix_msg_t           *sbuf;
+       dat_mix_msg_t           *rbuf;
+};
+
+#define mlog(level, format, ...) \
        mpxy_write(level, "%s: "format, __func__, ## __VA_ARGS__)
 
 static void mpxy_write(int level, const char *format, ...)
@@ -71,25 +151,126 @@ static void mpxy_write(int level, const char *format, ...)
 
        gettimeofday(&tv, NULL);
        va_start(args, format);
-       lock_acquire(&llock);
-       fprintf(flog, "%u.%03u: ", (unsigned) tv.tv_sec, (unsigned) (tv.tv_usec / 1000));
-       vfprintf(flog, format, args);
-       fflush(flog);
-       lock_release(&llock);
+       pthread_mutex_lock(&flock);
+       fprintf(logfile, "%u.%03u: ", (unsigned) tv.tv_sec, (unsigned) (tv.tv_usec / 1000));
+       vfprintf(logfile, format, args);
+       fflush(logfile);
+       pthread_mutex_unlock(&flock);
        va_end(args);
 }
+/* link list  helper resources */
+static void init_list(struct llist_entry *head)
+{
+        head->next = head;
+        head->prev = head;
+        head->data = NULL;
+}
+
+static int list_empty(struct llist_entry *head)
+{
+        return head->next == head;
+}
+
+static void *get_head_entry(struct llist_entry *head)
+{
+       if (list_empty(head))
+               return NULL;
+       else
+               return head->data;
+}
+
+static void *get_next_entry(struct llist_entry *entry, struct lllist_entry *head)
+{
+       if (entry->next == head)
+               return NULL;
+       else
+               return entry->data;
+}
+
+static void insert_head(struct llist_entry *entry, struct llist_entry *head, void *data)
+{
+       entry->next = head->next;
+       entry->prev = head;
+       entry->data = data;
+        head->next->Prev = entry;
+        head->next = entry;
+}
+
+static void insert_tail(struct llist_entry *entry, struct llist_entry *head, void *data)
+{
+        insert_head(entry, head->prev, data);
+}
+
+static void remove_entry(struct llist_entry *entry)
+{
+        entry->prev->next = entry->Next;
+        entry->next->prev = entry->Prev;
+        entry->data = NULL;
+}
+
+/* FD helper resources */
+static struct mcm_fd_set *mcm_alloc_fd_set(void)
+{
+       return malloc(sizeof(struct mcm_fd_set));
+}
+
+static void mcm_fd_zero(struct mcm_fd_set *set)
+{
+       set->index = 0;
+}
+
+static int mcm_fd_set(int fd, struct dapl_fd_set *set,
+                      enum DAPL_FD_EVENTS event)
+{
+       if (set->index == MCM_FD_SETSIZE - 1) {
+               mlog(0," mcm exceeded FD_SETSIZE %d\n", set->index + 1);
+               return -1;
+       }
+       set->set[set->index].fd = fd;
+       set->set[set->index].revents = 0;
+       set->set[set->index++].events = event;
+       return 0;
+}
+
+static int mcm_poll(int fd, int event)
+{
+       struct pollfd fds;
+       int ret;
+
+       fds.fd = s;
+       fds.events = event;
+       fds.revents = 0;
+       ret = poll(&fds, 1, 0);
+       mlog(0, " poll: fd=%d ret=%d, event=0x%x\n", s, ret, fds.revents);
+       if (ret == 0)
+               return 0;
+       else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
+               return POLLERR;
+       else
+               return fds.revents;
+}
+
+static int mcm_select(struct mcm_fd_set *set, int time_ms)
+{
+       int ret;
+
+       mlog(1, " select: sleep, fds=%d\n", set->index);
+       ret = poll(set->set, set->index, time_ms);
+       mlog(1, " select: wakeup, ret=0x%x\n", ret);
+       return ret;
+}
 
 static FILE *mpxy_open_log(void)
 {
        FILE *f;
 
-       if (!stricmp(lfile, "stdout"))
+       if (!strcasecmp(log_file, "stdout"))
                return stdout;
 
-       if (!stricmp(lfile, "stderr"))
+       if (!strcasecmp(log_file, "stderr"))
                return stderr;
 
-       if (!(f = fopen(lfile, "w")))
+       if (!(f = fopen(log_file, "w")))
                f = stdout;
 
        return f;
@@ -111,19 +292,21 @@ static void mpxy_set_options(void)
                if (sscanf(s, "%32s%32s", opt, value) != 2)
                        continue;
 
-               if (!stricmp("log_file", opt))
+               if (!strcasecmp("log_file", opt))
                        strcpy(log_file, value);
-               else if (!stricmp("log_level", opt))
+               else if (!strcasecmp("log_level", opt))
                        log_level = atoi(value);
-               else if (!stricmp("lock_file", opt))
+               else if (!strcasecmp("lock_file", opt))
                        strcpy(lock_file, value);
-               else if (!stricmp("buffer_pool_mb", opt))
-                       buffer_pool_mb = atoi(value);
-               else if (!stricmp("service_id", opt))
-                       service_id = (short) atoi(value);
-               else if (!stricmp("tx_depth", opt))
+               else if (!strcasecmp("rdma_buffer_kb", opt))
+                       rdma_buffer_size = atoi(value);
+               else if (!strcasecmp("cm_msg_depth", opt))
+                       cm_msg_depth = atoi(value);
+               else if (!strcasecmp("scif_port_id", opt))
+                       scif_sport = (short) atoi(value);
+               else if (!strcasecmp("tx_depth", opt))
                        tx_depth = atoi(value);
-               else if (!stricmp("tx_signal", opt))
+               else if (!strcasecmp("tx_signal_rate", opt))
                        tx_signal = atoi(value);
        }
 
@@ -132,15 +315,16 @@ static void mpxy_set_options(void)
 
 static void mpxy_log_options(void)
 {
-       mpxy_log(0, "log level %d\n", log_level);
-       mpxy_log(0, "lock file %s\n", lock_file);
-       mpxy_log(0, "server_port %d\n", service_id);
-       mpxy_log(0, "rdma buffer pool size %d\n", buffer_size);
-       mpxy_log(0, "transmit queue depth %d\n", tx_depth);
-       mpxy_log(0, "transmit completion rate %d\n", tx_signal);
+       mlog(0, "log level %d\n", log_level);
+       mlog(0, "lock file %s\n", lock_file);
+       mlog(0, "SCIF server_port %d\n", scif_sport);
+       mlog(0, "rdma buffer pool size %d\n", buffer_pool_mb);
+       mlog(0, "transmit queue depth %d\n", tx_depth);
+       mlog(0, "transmit completion signal rate %d\n", tx_signal);
+       mlog(0, "uDAPL provider/device - %s\n", dapl_dev);
 }
 
-static int mpxy_open_lock(void)
+static int mpxy_open_lock_file(void)
 {
        int lock_fd;
        char pid[16];
@@ -181,17 +365,423 @@ static void daemonize(void)
        freopen("/dev/null", "w", stderr);
 }
 
+static int init_scif()
+{
+       int ret;
+
+       ret = scif_get_nodeIDs(NULL, 0, &scif_id.node);
+       if (ret < 0) {
+               mlog(1, "scif_get_nodeIDs() failed with error %d\n", errno);
+               return -1;
+       }
+       mlog(1," Got SCIF node_id: %d\n", (uint16_t)scif_id.node);
+       if (scif_id.node != 0) {
+               mlog(1,"ERROR scif node_id must be 0, get_nodeID = %d\n", (uint16_t)scif_id.node);
+               return -1;
+       }
+
+       scif_ep = scif_open();
+       if (scif_ep < 0) {
+               mlog(0, "scif_open() failed with error %d\n", errno);
+               return -1;
+       }
+       mlog(1,"Opened SCIF endpoint for listening\n");
+
+       ret = scif_bind(scif_ep, scif_sport);
+       if (ret < 0) {
+               fprintf(stderr, "scif_bind() failed with error %d\n", errno);
+               scif_close(scif_ep);
+               return -1;
+       }
+       scif_id.port = ret;
+       mlog(1,"Bind to reserved SCIF OFED port %d\n", (uint16_t)scif_id.port);
+
+       ret = scif_listen(scif_ep, 5);
+       if (ret < 0) {
+               mlog(0, "scif_listen() failed with error %d\n", errno);
+               scif_close(scif_ep);
+               return -1;
+       }
+
+       return 0;
+}
+
+static void close_scif()
+{
+       scif_close(scif_ep);
+}
+
+static int config_fd(int fd)
+{
+       int opts;
+
+       opts = fcntl(fd, F_GETFL);
+       if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+               mlog(0, " config_fd: fcntl on fd %d ERR %d %s\n", fd, opts, strerror(errno));
+               return errno;
+       }
+
+       return 0;
+}
+
+/* Modify UD-QP from init, rtr, rts, info network order */
+static int modify_ud_qp(struct mcm_dev md, struct ibv_qp qp)
+{
+       struct ibv_qp_attr qp_attr;
+
+       /* modify QP, setup and prepost buffers */
+       dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+       qp_attr.qp_state = IBV_QPS_INIT;
+        qp_attr.pkey_index = md->pkey_idx;
+        qp_attr.port_num = md->port;
+        qp_attr.qkey = DAT_UD_QKEY;
+       if (ibv_modify_qp(qp, &qp_attr,
+                         IBV_QP_STATE          |
+                         IBV_QP_PKEY_INDEX     |
+                          IBV_QP_PORT          |
+                          IBV_QP_QKEY)) {
+               mlog(0, " modify_ud_qp INIT: ERR %s\n", strerror(errno));
+               return 1;
+       }
+       dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+       qp_attr.qp_state = IBV_QPS_RTR;
+       if (ibv_modify_qp(qp, &qp_attr,IBV_QP_STATE)) {
+               mlog(0, " modify_ud_qp RTR: ERR %s\n", strerror(errno));
+               return 1;
+       }
+       dapl_os_memzero((void *)&qp_attr, sizeof(qp_attr));
+       qp_attr.qp_state = IBV_QPS_RTS;
+       qp_attr.sq_psn = 1;
+       if (ibv_modify_qp(qp, &qp_attr,
+                         IBV_QP_STATE | IBV_QP_SQ_PSN)) {
+               mlog(0, " modify_ud_qp RTS: ERR %s\n", strerror(errno));
+               return 1;
+       }
+       return 0;
+}
+
+static int init_ib()
+{
+       struct ibv_port_attr port_attr;
+       int i, num_devices;
+
+       /* get list of all IB devices, open 1st IB type by default */
+       iblist = ibv_get_device_list(&num_devices);
+       if (!iblist) {
+               mlog(0, " ibv_get_dev_list() failed - %d\n", errno);
+               return 1;
+       }
+
+       for (i=0; i < num_devices; ++i) {
+               if (iblist[i].transport_type != IBV_TRANSPORT_IB)
+                       continue;
+               else {
+                       mlog(1, " opening 1st IB device found - %s\n",
+                            ibv_get_device_name(iblist[i]));
+                       break;
+               }
+       }
+       if (i == num_devices) {
+               mlog(1, " no IB devices found, exit\n");
+               ibv_free_device_list(iblist);
+               return 1;
+       }
+
+       return 0;
+}
+
+static void close_ib()
+{
+       ibv_free_device_list(iblist);
+       return;
+}
+
+static int init_mcm_service(struct mcm_ib_dev *md)
+{
+        struct ibv_qp_init_attr qp_create;
+       struct ibv_recv_wr recv_wr, *recv_err;
+        struct ibv_sge sge;
+       int i, mlen = 256; /* overhead for mcm_msg & ibv_grh */
+
+       mlog(1, " create MCM services.. \n");
+
+       /* setup CM timers and queue sizes */
+       md->pd = ibv_alloc_pd(md->ibdev);
+        if (!md->pd)
+                goto bail;
+
+        mlog(1, " allocated PD\n");
+
+       md->rch = ibv_create_comp_channel(md->ibdev);
+       if (!md->rch)
+               goto bail;
+       config_fd(md->rch->fd);
+
+       mlog(1, " allocated rx completion channel\n");
+
+       md->scq = ibv_create_cq(md->ibdev, md->cqe, md, NULL, 0);
+       if (!md->scq)
+               goto bail;
+
+       md->rcq = ibv_create_cq(md->ibdev, md->cqe, md, md->rch, 0);
+       if (!md->rcq)
+               goto bail;
+
+       mlog(1, " created CQ\n");
+
+       if(ibv_req_notify_cq(md->rcq, 0))
+               goto bail;
+
+       memset((void *)&qp_create, 0, sizeof(qp_create));
+       qp_create.qp_type = IBV_QPT_UD;
+       qp_create.send_cq = md->scq;
+       qp_create.recv_cq = md->rcq;
+       qp_create.cap.max_send_wr = qp_create.cap.max_recv_wr = md->qpe;
+       qp_create.cap.max_send_sge = qp_create.cap.max_recv_sge = 1;
+       qp_create.cap.max_inline_data = md->max_inline_send;
+       qp_create.qp_context = (void *)md;
+
+       md->qp = ibv_create_qp(md->pd, &qp_create);
+       if (!md->qp)
+                goto bail;
+
+       mlog(1, " created QP\n");
+
+       md->ah = (ib_ah_handle_t*) malloc(sizeof(ib_ah_handle_t) * 0xffff);
+       md->sid = (uint8_t*) malloc(sizeof(uint8_t) * 0xffff);
+       md->rbuf = (void*) malloc(mlen * md->qpe);
+       md->sbuf = (void*) malloc(mlen * md->qpe);
+       md->s_hd = md->s_tl = 0;
+
+       if (!md->ah || !md->rbuf || !md->sbuf || !md->sid)
+               goto bail;
+
+       (void)memset(md->ah, 0, (sizeof(ib_ah_handle_t) * 0xffff));
+       (void)memset(md->sid, 0, (sizeof(uint8_t) * 0xffff));
+       md->sid[0] = 1; /* resv slot 0, 0 == no ports available */
+       (void)memset(md->rbuf, 0, (mlen * md->qpe));
+       (void)memset(md->sbuf, 0, (mlen * md->qpe));
+
+       md->mr_sbuf = ibv_reg_mr(md->pd, md->sbuf,
+                                (mlen * md->qpe),
+                                IBV_ACCESS_LOCAL_WRITE);
+       if (!md->mr_sbuf)
+               goto bail;
+
+       md->mr_rbuf = ibv_reg_mr(md->pd, md->rbuf,
+                                ((mlen + hlen) * md->qpe),
+                                IBV_ACCESS_LOCAL_WRITE);
+       if (!md->mr_rbuf)
+               goto bail;
+
+       mlog(1, " registered TX and RX CM message pools \n");
+
+       /* modify UD QP: init, rtr, rts */
+       if ((modify_ud_qp(md, md->qp)) != DAT_SUCCESS)
+               goto bail;
+
+       /* post receive buffers, setup head, tail pointers */
+       recv_wr.next = NULL;
+       recv_wr.sg_list = &sge;
+       recv_wr.num_sge = 1;
+       sge.length = mlen + hlen;
+       sge.lkey = md->mr_rbuf->lkey;
+
+       for (i = 0; i < md->qpe; i++) {
+               recv_wr.wr_id =
+                       (uintptr_t)((char *)&md->rbuf[i] +
+                                   sizeof(struct ibv_grh));
+               sge.addr = (uintptr_t) &md->rbuf[i];
+               if (ibv_post_recv(md->qp, &recv_wr, &recv_err))
+                       goto bail;
+       }
+
+       /* save qp_num as part of ia_address, network order */
+       md->addr.ib.qpn = htonl(md->qp->qp_num);
+        return 0;
+
+bail:
+       mlog(1, " ERR: MCM UD-CM services: %s\n",strerror(errno));
+       mcm_destroy(md);
+       return -1;
+}
+
+/****************  MIX operations ********************************/
+
+/* open MCM device, MIC clients via SCIF well known port - SCIF_OFED_PORT_7 */
+static struct scif_mic_dev *open_mcm_device(char *name, int port, scif_epd_t listen_ep)
+{
+       int i;
+       struct mcm_ib_dev *md;
+       struct mcm_scif_dev *smd = NULL;
+
+       pthread_mutex_lock(&mcm_llock);
+       md = get_list_head(&mcm_llist);
+
+       while (md) {
+               if ((!strcmp(ibv_get_device_name(md->ib_dev, name))
+                   && md->port == port))
+                       goto found;
+               else
+                       md = get_next_entry(&md->entry, &mcm_list);
+       }
+
+       /* no IB device object, allocate and init, one per IB device */
+       md = malloc(sizeof(*md));
+       if (md == NULL)
+               goto done;
+       memset(md, 0, sizeof(*md));
+       init_list(&md->list);
+
+       if (init_mcm_service(md)) {
+               free(md);
+               md = NULL;
+               goto done;
+       }
+       /* queue on active device list */
+       insert_tail(&md->list, &mcm_llist, md);
+
+found:
+       /* SCIF MIX device object, allocate and init, one per MIC client */
+       smd = malloc(sizeof(*smd));
+       if (!smd)
+               goto done;
+       memset(smd, 0, sizeof(*smd));
+
+       /* Accept new MIX message connection */
+       scif_accept(listen_ep, &smd->peer, &smd->ep, SCIF_ACCEPT_SYNC);
+
+
+       smd->mcm_dev = md;
+       pthread_mutex_init(&smd->lock, NULL);
+       init_list(&smd->entry);
+
+       /* insert on active MIX device list */
+       pthread_mutex_lock(&md->mix_lock);
+       insert_tail(&smd->entry, &md->mix_list, (void *)smd);
+       pthread_mutex_unlock(&md->mix_lock);
+
+done:
+       pthread_mutex_unlock(&mcm_llock);
+       return smd;
+}
+
+/* close MCM device, MIC clients via MIX */
+static void close_mcm_device(struct mcm_dev *mdev)
+{
+       return;
+}
+
+/*  DAPL MCM message */
+static void mcm_rcv_evd(struct mcm_ib_dev *md)
+{
+       return;
+}
+/*  IB async device event */
+static void mcm_async_evd(struct mcm_ib_dev *md)
+{
+       return;
+}
+/* SCIF MIX message */
+static void mix_rcv_evd(struct mcm_scif_dev *md)
+{
+       return;
+}
+
+/*
+ * MPXY server will listen on both a IB UD QP for fabric CM messages
+ * and a SCIF port for inter-bus MCM operation messages to/from MIC MCM clients.
+ *
+ * MPXY message protocol is very similar to existing DCM but not compatible
+ * therefore we are setting version number to 1 for MCM protocol so it will
+ * not connect incompatible DAPL endpoints by mistake.
+ *
+ * 1st draft - one IB UD QP per device, multiplex multiple MIC opens to same device
+ *             one thread for both SCIF and IB traffic, try FD select for now
+ *             and move to polling memory if we can't get pipelining at wire speeds.
+ *
+ *             TBD: need another thread dedicated as a data mover
+ *
+ */
+static void mpxy_server(void)
+{
+       struct mcm_fd_set *set;
+       struct mcm_ib_dev *md;
+       struct mcm_scif_dev *smd;
+       int time_ms;
+       int i, n, ret;
+
+       /* FD array */
+       set = mcm_alloc_fd_set();
+       if (!set)
+               goto out;
+
+       mlog(0, "server started\n");
+
+       while (1) {
+               time_ms = -1; /* blocking */
+               mcm_fd_zero(set);
+
+               /* trigger on all active IB devices */
+               pthread_mutex_lock(&mcm_llock);
+               md = get_list_head(&mcm_llist);
+               while (md) {
+                       mcm_fd_set(md->ib_dev->async_fd, set, POLLIN);
+                       mcm_fd_set(md->rch->fd, set, POLLIN);
+
+                       /* trigger on all active SCIF ep's */
+                       pthread_mutex_lock(&md->mix_lock);
+                       smd = get_list_head(&md->mix_list);
+                       while (smd) {
+                               mcm_fd_set(smd->ep, set, POLLIN);
+                               smd = get_next_entry(&smd->entry, &md->mix_list);
+                       }
+                       pthread_mutex_unlock(&md->mix_lock);
+                       md = get_next_entry(&md->entry, &mcm_list);
+               }
+               pthread_mutex_unlock(&mcm_llock);
+
+               mcm_select(set, time_ms); /* wait, DAPL MCM or SCIF MIX msgs */
+
+               pthread_mutex_lock(&mcm_llock);
+               md = get_list_head(&mcm_llist);
+               while (md) {
+                       /* process MCM events: async device and CM msgs */
+                       if (mcm_poll(md->rch->fd, POLLIN) == POLLIN)
+                               mcm_rcv_evd(md);
+
+                       if (mcm_poll(md->ib_dev->async_fd, POLLIN) == POLLIN)
+                               mcm_async_evd(md);
+
+                       /* process SCIF cmd channels */
+                       pthread_mutex_lock(&md->mix_lock);
+                       smd = get_list_head(&md->mix_list);
+                       while (smd) {
+                               if (mcm_poll(smd->ep, POLLIN) == POLLIN)
+                                       mix_rcv_evd(smd);
+
+                               smd = get_next_entry(&smd->entry, &md->mix_list);
+                       }
+                       pthread_mutex_unlock(&md->mix_lock);
+                       md = get_next_entry(&md->entry, &mcm_list);
+               }
+               pthread_mutex_unlock(&mcm_llock);
+       }
+
+
+}
+
 static void show_usage(char *program)
 {
         printf("usage: %s\n", program);
         printf("   [-P]             - run as a standard process, (default daemon)\n");
         printf("   [-O option_file] - option configuration file\n");
-        printf("                      (default %s/%s\n", MPXYD_CONF_DIR, MPXYD_OPTS_FILE);
+        printf("                      (default %s\n", MPXYD_CONF);
 }
 
 int main(int argc, char **argv)
 {
-       int i, op, daemon = 1;
+       int op, daemon = 1;
 
        while ((op = getopt(argc, argv, "DPO:")) != -1) {
                switch (op) {
@@ -212,25 +802,40 @@ int main(int argc, char **argv)
                daemonize();
 
        mpxy_set_options();
-       if (pxd_open_lock_file())
+
+       if (mpxy_open_lock_file())
                return -1;
 
-       lock_init(&log_lock);
-       logfile = pxd_open_log();
+       /* init locks */
+       pthread_mutex_init(&flock, NULL);
+       pthread_mutex_init(&mcm_llock, NULL);
+
+       /* init MCM device list */
+       init_list(&mcm_llist);
 
-       mpxy_log(0, "MIC Indirect - SCIF to DAPL RDMA Proxy Service\n");
+       logfile = mpxy_open_log();
+
+       mlog(0, "MIC Indirect - SCIF/IB DAPL RDMA Proxy Service\n");
        mpxy_log_options();
 
-       if (mpxy_open_devices()) {
-               acm_log(0, "ERROR - unable to open any devices\n");
+       if (init_scif()) {
+               mlog(0, "ERROR - unable to open/init SCIF device\n");
+               return -1;
+       }
+
+       if (init_ib()) {
+               mlog(0, "ERROR - unable to open/init IB device\n");
                return -1;
        }
 
-       mpxy_log(1, "starting server\n");
+       mlog(1, "starting server\n");
        mpxy_server();
+       mlog(0, "shutting down\n");
 
+       close_scif();
+       close_ib();
 
-       mpxy_log(0, "shutting down\n");
+       mlog(0, "shutdown complete\n");
        fclose(logfile);
        return 0;
 }
diff --git a/daplmpxy.init.in b/daplmpxy.init.in
deleted file mode 100644 (file)
index 3c68f1e..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/bin/bash
-#
-# Bring up/down the mpxyd daemon
-#
-# chkconfig: 2345 25 75
-# description: Starts/Stops MIC SCIF/DAPL RDMA proxy server
-#
-### BEGIN INIT INFO
-# Provides:       mpxyd
-# Default-Start: 2 3 4 5
-# Default-Stop: 0 1 6
-# Required-Start: 
-# Required-Stop: 
-# Should-Start:
-# Should-Stop:
-# Short-Description: Starts and stops the MIC SCIF/DAPL RDMA proxy server
-# Description: The DAPL proxy RDMA server provides a user space implementation
-#      that enables MIC clients to proxy InfiniBand RDMA over to large core resources
-### END INIT INFO
-
-pidfile=/var/run/mpxyd.pid
-subsys=/var/lock/subsys/mpxyd
-
-. /etc/rc.d/init.d/functions
-
-start()
-{
-    echo -n "Starting mpxyd daemon:"
-
-    daemon @prefix@/sbin/mpxyd
-    RC=$?
-    [ $RC -eq 0 ] && touch $subsys
-    echo
-    return $RC
-}
-
-stop()
-{
-    echo -n "Stopping mpxyd daemon:"
-
-    killproc -p $pidfile mpxyd
-    RC=$?
-    rm -f $subsys
-    echo
-    return $RC
-}
-
-status()
-{
-    if [ ! -f $subsys -a ! -f $pidfile ]; then
-       return 3
-    fi
-    if [ -f $pidfile ]; then
-       checkpid `cat $pidfile`
-       return $?
-    fi
-    if [ -f $subsys ]; then
-       return 2
-    fi
-}
-
-restart ()
-{
-    stop
-    start
-}
-
-condrestart ()
-{
-    [ -e $subsys ] && restart || return 0
-}
-
-usage ()
-{
-    echo
-    echo "Usage: `basename $0` {start|stop|restart|condrestart|try-restart|force-reload|status}"
-    echo
-    return 2
-}
-
-case $1 in
-    start|stop|restart|condrestart|try-restart|force-reload)
-       [ `id -u` != "0" ] && exit 4 ;;
-esac
-
-case $1 in
-    start) start; RC=$? ;;
-    stop) stop; RC=$? ;;
-    restart) restart; RC=$? ;;
-    reload) RC=3 ;;
-    condrestart) condrestart; RC=$? ;;
-    try-restart) condrestart; RC=$? ;;
-    force-reload) condrestart; RC=$? ;;
-    status) status; RC=$? ;;
-    *) usage; RC=$? ;;
-esac
-
-exit $RC
diff --git a/dat/include/dat2/dat_mic_extensions.h b/dat/include/dat2/dat_mic_extensions.h
new file mode 100755 (executable)
index 0000000..ad8ffec
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2012 Intel Corporation.  All rights reserved.
+ * 
+ * This Software is licensed under one of the following licenses:
+ * 
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is
+ *    in the file LICENSE.txt in the root directory. The license is also
+ *    available from the Open Source Initiative, see 
+ *    http://www.opensource.org/licenses/cpl.php.
+ * 
+ * 2) under the terms of the "The BSD License" a copy of which is in the file
+ *    LICENSE2.txt in the root directory. The license is also available from
+ *    the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/bsd-license.php.
+ * 
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
+ *    copy of which is in the file LICENSE3.txt in the root directory. The
+ *    license is also available from the Open Source Initiative, see
+ *    http://www.opensource.org/licenses/gpl-license.php.
+ * 
+ * Licensee has the right to choose one of the above licenses.
+ * 
+ * Redistributions of source code must retain the above copyright
+ * notice and one of the license notices.
+ * 
+ * Redistributions in binary form must reproduce both the above copyright
+ * notice, one of the license notices in the documentation
+ * and/or other materials provided with the distribution.
+ */
+
+/**********************************************************************
+ *
+ * HEADER: dat_mic_extensions.h
+ *
+ * PURPOSE: extensions to the DAT API for MIC Proxy RDMA services
+ *
+ *
+ **********************************************************************/
+#ifndef _DAT_MIC_EXTENSIONS_H_
+#define _DAT_MIC_EXTENSIONS_H_
+
+#define DAT_MIC_EXTENSION_VERSION      1
+#define DAT_MIC_ATTR_MIC               "DAT_MIC_SUPPORT"
+
+/* Wire protocol version for MIC Indirect Exchange (MIX) protocol over SCIF */
+#define DAT_MIX_VER 1
+
+typedef enum _dat_mix_ops
+{
+       DAT_MIX_IA_OPEN = 1,
+       DAT_MIX_IA_CLOSE,
+       DAT_MIX_IA_QUERY,
+       DAT_MIX_IA_MR,
+       DAT_MIX_EP_CREATE,
+       DAT_MIX_EP_QUERY,
+       DAT_MIX_WRITE,
+       DAT_MIX_SEND,
+       DAT_MIX_READ,
+       DAT_MIX_LISTEN,
+       DAT_MIX_CM_REQ,
+       DAT_MIX_CM_REP,
+       DAT_MIX_CM_ACCEPT,
+       DAT_MIX_CM_REJECT,
+       DAT_MIX_CM_RTU,
+       DAT_MIX_CM_EST,
+       DAT_MIX_CM_DISC,
+       DAT_MIX_CM_REPLY,
+} dat_mix_ops_t;
+
+/* MIC Indirect CM (MCM) protocol over IB fabric */
+#define DAT_MCM_PDATA_SIZE      64
+union dat_mcm_addr {
+       DAT_SOCK_ADDR6          so;
+       struct {
+               uint16_t        family;  /* sin6_family */
+               uint16_t        lid;     /* sin6_port */
+               uint32_t        qpn;     /* sin6_flowinfo */
+               uint8_t         gid[16]; /* sin6_addr */
+               uint16_t        port;    /* sin6_scope_id */
+               uint8_t         sl;
+               uint8_t         qp_type;
+       } ib;
+};
+
+/* MCM message, 208 bytes */
+typedef struct _dat_mcm_msg
+{
+       uint16_t                ver;
+       uint16_t                op;
+       uint16_t                sport; /* src cm port */
+       uint16_t                dport; /* dst cm port */
+       uint32_t                sqpn;  /* src cm qpn */
+       uint32_t                dqpn;  /* dst cm qpn */
+       uint16_t                p_size;
+       uint32_t                s_id;  /* src pid */
+       uint32_t                d_id;  /* dst pid */
+       uint8_t                 rd_in; /* atomic_rd_in */
+       uint8_t                 resv[5];
+       union dat_mcm_addr      saddr;
+       union dat_mcm_addr      daddr;
+       union dat_mcm_addr      saddr_alt;
+       union dat_mcm_addr      daddr_alt;
+       uint8_t                 p_data[DAT_MCM_PDATA_SIZE];
+
+} dat_mcm_msg_t;
+
+typedef struct _dat_mix_open_op {
+
+};
+
+/* MIX message, 256 bytes */
+typedef struct _dat_mix_msg
+{
+       uint16_t                ver;            /* version */
+       uint16_t                op;             /* operation type */
+       uint32_t                len;            /* operation data length */
+       uint64_t                hdl;            /* handle */
+       uint64_t                ctx;            /* context */
+       uint8_t                 data[232];      /* operation data */
+       
+} dat_mix_msg_t;
+
+#endif         /* _DAT_MIC_EXTENSIONS_H_ */