]> git.openfabrics.org - ~ardavis/dapl.git/commitdiff
Modify the openib_scm provider to support both OFED and WinOF releases.
authorSean Hefty <sean.hefty@intel.com>
Tue, 17 Feb 2009 15:24:27 +0000 (07:24 -0800)
committerArlin Davis <arlin.r.davis@intel.com>
Tue, 17 Feb 2009 15:24:27 +0000 (07:24 -0800)
This takes advantage of having a libibverbs compatibility library.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
Makefile.am
dapl/openib_scm/README [deleted file]
dapl/openib_scm/dapl_ib_cm.c
dapl/openib_scm/dapl_ib_cq.c
dapl/openib_scm/dapl_ib_dto.h
dapl/openib_scm/dapl_ib_mem.c
dapl/openib_scm/dapl_ib_util.c
dapl/openib_scm/dapl_ib_util.h
dapl/openib_scm/linux/openib_osd.h [new file with mode: 0644]
dapl/openib_scm/windows/openib_osd.h [new file with mode: 0644]
dapl/udapl/linux/dapl_osd.h

index bfc93f72562cecdcb0bcf25f3bb7dcb3c3f3d5b8..5044e36cbb94874d5fab7a6bc110c016798bafe6 100755 (executable)
@@ -49,7 +49,8 @@ dapl_udapl_libdaploscm_la_CFLAGS = $(AM_CFLAGS) -D_GNU_SOURCE $(OSFLAGS) $(XFLAG
                                 -DOPENIB -DCQ_WAIT_OBJECT \
                                 -I$(srcdir)/dat/include/ -I$(srcdir)/dapl/include/ \
                                 -I$(srcdir)/dapl/common -I$(srcdir)/dapl/udapl/linux \
-                                -I$(srcdir)/dapl/openib_scm
+                                -I$(srcdir)/dapl/openib_scm \
+                               -I$(srcdir)/dapl/openib_scm/linux
 
 if HAVE_LD_VERSION_SCRIPT
     dat_version_script = -Wl,--version-script=$(srcdir)/dat/udat/libdat2.map
diff --git a/dapl/openib_scm/README b/dapl/openib_scm/README
deleted file mode 100644 (file)
index 239dfe6..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-
-OpenIB uDAPL provider using socket-based CM, in leiu of uCM/uAT, to setup QP/channels.
-
-to build:
-
-cd dapl/udapl
-make VERBS=openib_scm clean
-make VERBS=openib_scm
-
-
-Modifications to common code:
-
-- added dapl/openib_scm directory 
-
-       dapl/udapl/Makefile
-
-New files for openib_scm provider
-
-       dapl/openib/dapl_ib_cq.c
-       dapl/openib/dapl_ib_dto.h
-       dapl/openib/dapl_ib_mem.c
-       dapl/openib/dapl_ib_qp.c
-       dapl/openib/dapl_ib_util.c
-       dapl/openib/dapl_ib_util.h
-       dapl/openib/dapl_ib_cm.c
-
-A simple dapl test just for openib_scm testing...
-
-       test/dtest/dtest.c
-       test/dtest/makefile
-
-       server: dtest -s 
-       client: dtest -h hostname
-
-known issues:
-
-       no memory windows support in ibverbs, dat_create_rmr fails.
-       
-
-
index 80a7d5eaecc9afed4418d54e3f4b9ee933033838..9a15e42282d82a100200ae416e5083205843cdb3 100644 (file)
 #include "dapl_cr_util.h"
 #include "dapl_name_service.h"
 #include "dapl_ib_util.h"
-
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
-#include <byteswap.h>
-#include <poll.h>
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-static inline uint64_t cpu_to_be64(uint64_t x) {return bswap_64(x);}
-#elif __BYTE_ORDER == __BIG_ENDIAN
-static inline uint64_t cpu_to_be64(uint64_t x) {return x;}
-#endif
+#include "dapl_osd.h"
 
 extern int g_scm_pipe[2];
 
+#if defined(_WIN32) || defined(_WIN64)
+/* Socket conditions used by the select()-based helpers; one bit per condition. */
+enum DAPL_FD_EVENTS {
+       DAPL_FD_READ    = 1 << 0,
+       DAPL_FD_WRITE   = 1 << 1,
+       DAPL_FD_ERROR   = 1 << 2
+};
+
+/*
+ * Put socket s into non-blocking mode via FIONBIO.
+ * Returns the ioctlsocket() result unchanged.
+ */
+static int dapl_config_socket(DAPL_SOCKET s)
+{
+       unsigned long enable = 1;
+       int rc;
+
+       rc = ioctlsocket(s, FIONBIO, &enable);
+       return rc;
+}
+
+/*
+ * Start a connect on socket s.
+ *
+ * Returns 0 when connect() succeeds immediately, EAGAIN when the
+ * attempt is still in progress (WSAEWOULDBLOCK), otherwise the
+ * Winsock error code of the failed connect().
+ */
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
+                              int addrlen)
+{
+       int err;
+
+       /* WSAGetLastError() is only meaningful after a failed call */
+       if (connect(s, addr, addrlen) == 0)
+               return 0;
+
+       err = WSAGetLastError();
+       return (err == WSAEWOULDBLOCK) ? EAGAIN : err;
+}
+
+/* Descriptor sets handed to select(): [0] read, [1] write, [2] exception. */
+struct dapl_fd_set {
+       struct fd_set set[3];
+};
+
+/* Allocate one dapl_fd_set with dapl_os_alloc(); the result is returned as-is. */
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+       struct dapl_fd_set *fds = dapl_os_alloc(sizeof(*fds));
+
+       return fds;
+}
+
+/* Clear all three descriptor sets (read, write, exception). */
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+       int i;
+
+       for (i = 0; i < 3; i++)
+               FD_ZERO(&set->set[i]);
+}
+
+/*
+ * Add socket s to the read set when event is DAPL_FD_READ, to the write
+ * set for any other event, and always to the exception set.
+ * Always returns 0.
+ */
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+                       enum DAPL_FD_EVENTS event)
+{
+       struct fd_set *rw_set;
+
+       if (event == DAPL_FD_READ)
+               rw_set = &set->set[0];
+       else
+               rw_set = &set->set[1];
+
+       FD_SET(s, rw_set);
+       FD_SET(s, &set->set[2]);
+       return 0;
+}
+
+/*
+ * Zero-timeout check of a single socket for `event` (read or write).
+ *
+ * Returns 0 when nothing is pending, `event` when the socket is ready,
+ * DAPL_FD_ERROR when it is flagged in the exception set, and the
+ * WSAGetLastError() code when select() itself fails.
+ */
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+       struct fd_set rw_fds;
+       struct fd_set err_fds;
+       struct timeval tv;
+       int ret;
+
+       FD_ZERO(&rw_fds);
+       FD_ZERO(&err_fds);
+       FD_SET(s, &rw_fds);
+       FD_SET(s, &err_fds);
+
+       /* zero timeout: report current state without blocking */
+       tv.tv_sec = 0;
+       tv.tv_usec = 0;
+
+       if (event == DAPL_FD_READ)
+               ret = select(1, &rw_fds, NULL, &err_fds, &tv);
+       else
+               ret = select(1, NULL, &rw_fds, &err_fds, &tv);
+
+       if (ret == 0)
+               return 0;
+
+       /*
+        * On failure the sets cannot be trusted (they may still hold s
+        * from the FD_SET calls above), so report the error before
+        * looking at them.
+        */
+       if (ret == SOCKET_ERROR)
+               return WSAGetLastError();
+
+       if (FD_ISSET(s, &rw_fds))
+               return event;
+       else if (FD_ISSET(s, &err_fds))
+               return DAPL_FD_ERROR;
+       else
+               return WSAGetLastError();
+}
+
+/*
+ * Block in select() (NULL timeout) on the read, write and exception sets.
+ * Returns the select() result unchanged.
+ */
+static int dapl_select(struct dapl_fd_set *set)
+{
+       struct fd_set *rd = &set->set[0];
+       struct fd_set *wr = &set->set[1];
+       struct fd_set *ex = &set->set[2];
+
+       return select(0, rd, wr, ex, NULL);
+}
+#else // _WIN32 || _WIN64
+/* Socket conditions for the poll()-based helpers, mapped onto poll event bits. */
+enum DAPL_FD_EVENTS {
+       DAPL_FD_READ    = POLLIN,
+       DAPL_FD_WRITE   = POLLOUT,
+       DAPL_FD_ERROR   = POLLERR
+};
+
+/*
+ * Put socket s into non-blocking mode.
+ * Returns the failing F_GETFL result (< 0) if the flags cannot be read,
+ * otherwise the result of the F_SETFL call.
+ */
+static int dapl_config_socket(DAPL_SOCKET s)
+{
+       int flags = fcntl(s, F_GETFL);
+
+       if (flags < 0)
+               return flags;
+
+       return fcntl(s, F_SETFL, flags | O_NONBLOCK);
+}
+
+/*
+ * Start a connect on socket s.
+ *
+ * Returns 0 when connect() succeeds immediately, EAGAIN when the
+ * attempt is still in progress (EINPROGRESS), otherwise the connect()
+ * return value with errno left set by connect().
+ */
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr, int addrlen)
+{
+       int ret;
+
+       ret = connect(s, addr, addrlen);
+
+       /* errno is only valid after a failure; do not read a stale value */
+       if (ret == 0)
+               return 0;
+
+       return (errno == EINPROGRESS) ? EAGAIN : ret;
+}
+
+/* pollfd array handed to poll(); index counts the entries filled in so far. */
+struct dapl_fd_set {
+       int index;
+       struct pollfd set[DAPL_FD_SETSIZE];
+};
+
+/* Allocate one dapl_fd_set with dapl_os_alloc(); the result is returned as-is. */
+static struct dapl_fd_set *dapl_alloc_fd_set(void)
+{
+       struct dapl_fd_set *fds = dapl_os_alloc(sizeof(*fds));
+
+       return fds;
+}
+
+/* Empty the set: the next dapl_fd_set() call fills slot 0 again. */
+static void dapl_fd_zero(struct dapl_fd_set *set)
+{
+       set->index = 0;
+}
+
+/*
+ * Append socket s to the poll set, watching for `event`.
+ *
+ * Returns 0 on success, or -1 (after logging) when all
+ * DAPL_FD_SETSIZE slots are already in use.
+ */
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
+                       enum DAPL_FD_EVENTS event)
+{
+       struct pollfd *pfd;
+
+       /* slots 0 .. DAPL_FD_SETSIZE-1 are valid; reject only when full */
+       if (set->index >= DAPL_FD_SETSIZE) {
+               dapl_log(DAPL_DBG_TYPE_ERR,
+                        "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",
+                        set->index);
+               return -1;
+       }
+
+       pfd = &set->set[set->index++];
+       pfd->fd = s;
+       pfd->revents = 0;
+       pfd->events = event;
+       return 0;
+}
+
+/*
+ * Zero-timeout poll() of a single socket for `event`.
+ * Returns the poll() result when it is <= 0 (nothing pending or error),
+ * otherwise the revents bits reported for the socket.
+ */
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
+{
+       struct pollfd pfd;
+       int rc;
+
+       pfd.fd = s;
+       pfd.events = event;
+       pfd.revents = 0;
+
+       rc = poll(&pfd, 1, 0);
+       if (rc > 0)
+               return pfd.revents;
+
+       return rc;
+}
+
+/*
+ * Block in poll() (timeout -1) on every entry added so far.
+ * Returns the poll() result unchanged.
+ */
+static int dapl_select(struct dapl_fd_set *set)
+{
+       int nfds = set->index;
+
+       return poll(set->set, nfds, -1);
+}
+#endif
+
 static struct ib_cm_handle *dapli_cm_create(void)
 { 
        struct ib_cm_handle *cm_ptr;
@@ -85,7 +228,7 @@ static struct ib_cm_handle *dapli_cm_create(void)
 
        (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
        cm_ptr->dst.ver = htons(DSCM_VER);
-       cm_ptr->socket = -1;
+       cm_ptr->socket = DAPL_INVALID_SOCKET;
        return cm_ptr;
 bail:
        dapl_os_free(cm_ptr, sizeof(*cm_ptr));
@@ -100,8 +243,8 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
        
        /* cleanup, never made it to work queue */
        if (cm_ptr->state == SCM_INIT) {
-               if (cm_ptr->socket >= 0)  
-                       close(cm_ptr->socket);
+               if (cm_ptr->socket != DAPL_INVALID_SOCKET)  
+                       closesocket(cm_ptr->socket);
                dapl_os_free(cm_ptr, sizeof(*cm_ptr));
                return;
        }
@@ -112,9 +255,9 @@ static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr)
                cm_ptr->ep->cm_handle = IB_INVALID_HANDLE;
 
        /* close socket if still active */
-       if (cm_ptr->socket >= 0) {
-               close(cm_ptr->socket);
-               cm_ptr->socket = -1;
+       if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
+               closesocket(cm_ptr->socket);
+               cm_ptr->socket = DAPL_INVALID_SOCKET;
        }
        dapl_os_unlock(&cm_ptr->lock);
 
@@ -172,14 +315,14 @@ dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
                return DAT_SUCCESS;
        } else {
                /* send disc date, close socket, schedule destroy */
-               if (cm_ptr->socket >= 0) { 
-                       if (write(cm_ptr->socket, 
-                                 &disc_data, sizeof(disc_data)) == -1)
+               if (cm_ptr->socket != DAPL_INVALID_SOCKET) { 
+                       if (send(cm_ptr->socket, (char *) &disc_data,
+                                       sizeof(disc_data), 0) == -1)
                                dapl_log(DAPL_DBG_TYPE_WARN, 
                                         " cm_disc: write error = %s\n", 
                                         strerror(errno));
-                       close(cm_ptr->socket);
-                       cm_ptr->socket = -1;
+                       closesocket(cm_ptr->socket);
+                       cm_ptr->socket = DAPL_INVALID_SOCKET;
                }
                cm_ptr->state = SCM_DISCONNECTED;
        }
@@ -211,7 +354,7 @@ void
 dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
 {
        int             len, opt = 1;
-       struct iovec    iovec[2];
+       struct iovec iov[2];
        struct dapl_ep  *ep_ptr = cm_ptr->ep;
 
        if (err) {
@@ -226,18 +369,21 @@ dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
                     " socket connected, write QP and private data\n"); 
 
        /* no delay for small packets */
-       setsockopt(cm_ptr->socket,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
+       setsockopt(cm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
+               (char *) &opt, sizeof(opt));
 
        /* send qp info and pdata to remote peer */
-       iovec[0].iov_base = &cm_ptr->dst;
-       iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+       iov[0].iov_base = (void *) &cm_ptr->dst;
+       iov[0].iov_len = sizeof(ib_qp_cm_t);
        if (cm_ptr->dst.p_size) {
-               iovec[1].iov_base = cm_ptr->p_data;
-               iovec[1].iov_len  = ntohl(cm_ptr->dst.p_size);
+               iov[1].iov_base = cm_ptr->p_data;
+               iov[1].iov_len = ntohl(cm_ptr->dst.p_size);
+               len = writev(cm_ptr->socket, iov, 2);
+       } else {
+               len = writev(cm_ptr->socket, iov, 1);
        }
 
-       len = writev(cm_ptr->socket, iovec, (cm_ptr->dst.p_size ? 2:1));
-       if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
+       if (len != (ntohl(cm_ptr->dst.p_size) + sizeof(ib_qp_cm_t))) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                         " CONN_PENDING write: ERR %s, wcnt=%d -> %s\n",
                         strerror(errno), len,
@@ -253,9 +399,9 @@ dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
         dapl_dbg_log(DAPL_DBG_TYPE_CM,
                      " connected: sending SRC GID subnet %016llx id %016llx\n",
                      (unsigned long long) 
-                       cpu_to_be64(cm_ptr->dst.gid.global.subnet_prefix),
+                       htonll(cm_ptr->dst.gid.global.subnet_prefix),
                      (unsigned long long) 
-                       cpu_to_be64(cm_ptr->dst.gid.global.interface_id));
+                       htonll(cm_ptr->dst.gid.global.interface_id));
 
        /* queue up to work thread to avoid blocking consumer */
        cm_ptr->state = SCM_RTU_PENDING;
@@ -290,25 +436,23 @@ dapli_socket_connect(DAPL_EP              *ep_ptr,
                return DAT_INSUFFICIENT_RESOURCES;
 
        /* create, connect, sockopt, and exchange QP information */
-       if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) {
+       if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) == DAPL_INVALID_SOCKET) {
                dapl_os_free( cm_ptr, sizeof( *cm_ptr ) );
                return DAT_INSUFFICIENT_RESOURCES;
        }
 
-       /* non-blocking */
-       ret = fcntl(cm_ptr->socket, F_GETFL); 
-        if (ret < 0 || fcntl(cm_ptr->socket,
-                              F_SETFL, ret | O_NONBLOCK) < 0) {
-                dapl_log(DAPL_DBG_TYPE_ERR,
-                         " socket connect: fcntl on socket %d ERR %d %s\n",
-                         cm_ptr->socket, ret,
-                         strerror(errno));
-                goto bail;
-        }
+       ret = dapl_config_socket(cm_ptr->socket); 
+       if (ret < 0) {
+               dapl_log(DAPL_DBG_TYPE_ERR,
+                       " socket connect: config socket %d ERR %d %s\n",
+                       cm_ptr->socket, ret, strerror(errno));
+               goto bail;
+       }
 
        ((struct sockaddr_in*)r_addr)->sin_port = htons(r_qual);
-       ret = connect(cm_ptr->socket, r_addr, sizeof(*r_addr));
-       if (ret && errno != EINPROGRESS) {
+       ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *) r_addr,
+                               sizeof(*r_addr));
+       if (ret && ret != EAGAIN) {
                dapl_log(DAPL_DBG_TYPE_ERR,
                         " socket connect ERROR: %s -> %s r_qual %d\n",
                         strerror(errno), 
@@ -391,16 +535,13 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t        cm_ptr)
 {
        DAPL_EP         *ep_ptr = cm_ptr->ep;
        int             len;
-       struct iovec    iovec[2];
        short           rtu_data = htons(0x0E0F);
        ib_cm_events_t  event = IB_CME_DESTINATION_REJECT;
 
        /* read DST information into cm_ptr, overwrite SRC info */
        dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer QP data\n"); 
 
-       iovec[0].iov_base = &cm_ptr->dst;
-       iovec[0].iov_len  = sizeof(ib_qp_cm_t);
-       len = readv(cm_ptr->socket, iovec, 1);
+       len = recv(cm_ptr->socket, (char *) &cm_ptr->dst, sizeof(ib_qp_cm_t), 0);
        if (len != sizeof(ib_qp_cm_t) || ntohs(cm_ptr->dst.ver) != DSCM_VER) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                     " CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
@@ -456,9 +597,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t  cm_ptr)
        /* read private data into cm_handle if any present */
        dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read private data\n"); 
        if (cm_ptr->dst.p_size) {
-               iovec[0].iov_base = cm_ptr->p_data;
-               iovec[0].iov_len  = cm_ptr->dst.p_size;
-               len = readv(cm_ptr->socket, iovec, 1);
+               len = recv(cm_ptr->socket, cm_ptr->p_data, cm_ptr->dst.p_size, 0);
                if (len != cm_ptr->dst.p_size) {
                        dapl_log(DAPL_DBG_TYPE_ERR, 
                            " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
@@ -495,7 +634,7 @@ dapli_socket_connect_rtu(dp_ib_cm_handle_t  cm_ptr)
        dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n"); 
 
        /* complete handshake after final QP state change */
-       if (write(cm_ptr->socket, &rtu_data, sizeof(rtu_data)) == -1) {
+       if (send(cm_ptr->socket, (char *) &rtu_data, sizeof(rtu_data), 0) == -1) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                         " CONN_RTU: write error = %s\n", strerror(errno));
                goto bail;
@@ -564,7 +703,7 @@ dapli_socket_listen(DAPL_IA         *ia_ptr,
        cm_ptr->hca = ia_ptr->hca_ptr;
        
        /* bind, listen, set sockopt, accept, exchange data */
-       if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+       if ((cm_ptr->socket = socket(AF_INET, SOCK_STREAM, 0)) == DAPL_INVALID_SOCKET) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                         " ERR: listen socket create: %s\n", 
                         strerror(errno));
@@ -572,7 +711,8 @@ dapli_socket_listen(DAPL_IA         *ia_ptr,
                goto bail;
        }
 
-       setsockopt(cm_ptr->socket,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
+       setsockopt(cm_ptr->socket, SOL_SOCKET, SO_REUSEADDR,
+               (char *) &opt, sizeof(opt));
        addr.sin_port        = htons(serviceID);
        addr.sin_family      = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
@@ -625,7 +765,7 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
 
        (void) dapl_os_memzero(acm_ptr, sizeof(*acm_ptr));
        
-       acm_ptr->socket = -1;
+       acm_ptr->socket = DAPL_INVALID_SOCKET;
        acm_ptr->sp = cm_ptr->sp;
        acm_ptr->hca = cm_ptr->hca;
 
@@ -633,7 +773,7 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
        acm_ptr->socket = accept(cm_ptr->socket, 
                                (struct sockaddr*)&acm_ptr->dst.ia_address, 
                                (socklen_t*)&len);
-       if (acm_ptr->socket < 0) {
+       if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                        " accept: ERR %s on FD %d l_cr %p\n",
                        strerror(errno),cm_ptr->socket,cm_ptr); 
@@ -664,7 +804,7 @@ dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
        dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read QP data\n"); 
 
        /* read in DST QP info, IA address. check for private data */
-       len = read(acm_ptr->socket, &acm_ptr->dst, sizeof(ib_qp_cm_t));
+       len = recv(acm_ptr->socket, (char *) &acm_ptr->dst, sizeof(ib_qp_cm_t), 0);
        if (len != sizeof(ib_qp_cm_t) || 
            ntohs(acm_ptr->dst.ver) != DSCM_VER) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
@@ -700,8 +840,7 @@ dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
 
        /* read private data into cm_handle if any present */
        if (acm_ptr->dst.p_size) {
-               len = read( acm_ptr->socket, 
-                           acm_ptr->p_data, acm_ptr->dst.p_size);
+               len = recv(acm_ptr->socket, acm_ptr->p_data, acm_ptr->dst.p_size, 0);
                if (len != acm_ptr->dst.p_size) {
                        dapl_log(DAPL_DBG_TYPE_ERR, 
                                     " accept read pdata: ERR %s, rcnt=%d\n",
@@ -757,14 +896,14 @@ dapli_socket_accept_usr(DAPL_EP           *ep_ptr,
        DAPL_IA         *ia_ptr = ep_ptr->header.owner_ia;
        dp_ib_cm_handle_t  cm_ptr = cr_ptr->ib_cm_handle;
        ib_qp_cm_t      local;
-       struct iovec    iovec[2];
+       struct iovec    iov[2];
        int             len;
 
        if (p_size > IB_MAX_REP_PDATA_SIZE) 
                return DAT_LENGTH_ERROR;
 
        /* must have a accepted socket */
-       if (cm_ptr->socket < 0)
+       if (cm_ptr->socket == DAPL_INVALID_SOCKET)
                return DAT_INTERNAL_ERROR;
        
        dapl_dbg_log(DAPL_DBG_TYPE_EP, 
@@ -844,14 +983,17 @@ dapli_socket_accept_usr(DAPL_EP           *ep_ptr,
 
        local.ia_address = ia_ptr->hca_ptr->hca_address;
        local.p_size = htonl(p_size);
-       iovec[0].iov_base = &local;
-       iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+       iov[0].iov_base = (void *) &local;
+       iov[0].iov_len = sizeof(ib_qp_cm_t);
        if (p_size) {
-               iovec[1].iov_base = p_data;
-               iovec[1].iov_len  = p_size;
+               iov[1].iov_base = p_data;
+               iov[1].iov_len = p_size;
+               len = writev(cm_ptr->socket, iov, 2);
+       } else {
+               len = writev(cm_ptr->socket, iov, 1);
        }
-       len = writev(cm_ptr->socket, iovec, (p_size ? 2:1));
-       if (len != (p_size + sizeof(ib_qp_cm_t))) {
+
+       if (len != (p_size + sizeof(ib_qp_cm_t))) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                         " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
                         strerror(errno), len,
@@ -859,6 +1001,7 @@ dapli_socket_accept_usr(DAPL_EP            *ep_ptr,
                             &cm_ptr->dst.ia_address)->sin_addr)); 
                goto bail;
        }
+
        dapl_dbg_log(DAPL_DBG_TYPE_CM, 
                     " ACCEPT_USR: local port=0x%x lid=0x%x"
                     " qpn=0x%x psize=%d\n",
@@ -867,9 +1010,9 @@ dapli_socket_accept_usr(DAPL_EP            *ep_ptr,
         dapl_dbg_log(DAPL_DBG_TYPE_CM,
                      " ACCEPT_USR SRC GID subnet %016llx id %016llx\n",
                      (unsigned long long) 
-                       cpu_to_be64(local.gid.global.subnet_prefix),
+                       htonll(local.gid.global.subnet_prefix),
                      (unsigned long long) 
-                       cpu_to_be64(local.gid.global.interface_id));
+                       htonll(local.gid.global.interface_id));
 
        /* save state and reference to EP, queue for RTU data */
        cm_ptr->ep = ep_ptr;
@@ -894,7 +1037,7 @@ dapli_socket_accept_rtu(dp_ib_cm_handle_t  cm_ptr)
        short           rtu_data = 0;
 
        /* complete handshake after final QP state change */
-       len = read(cm_ptr->socket, &rtu_data, sizeof(rtu_data));
+       len = recv(cm_ptr->socket, (char *) &rtu_data, sizeof(rtu_data), 0);
        if (len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f) {
                dapl_log(DAPL_DBG_TYPE_ERR, 
                         " ACCEPT_RTU: ERR %s, rcnt=%d rdata=%x\n",
@@ -1108,9 +1251,9 @@ dapls_ib_remove_conn_listener (
 
        /* close accepted socket, free cm_srvc_handle and return */
        if (cm_ptr != NULL) {
-               if (cm_ptr->socket >= 0) {
-                       close(cm_ptr->socket );
-                       cm_ptr->socket = -1;
+               if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
+                       closesocket(cm_ptr->socket);
+                       cm_ptr->socket = DAPL_INVALID_SOCKET;
                }
                /* cr_thread will free */
                cm_ptr->state = SCM_DESTROY;
@@ -1195,27 +1338,29 @@ dapls_ib_reject_connection(
        IN DAT_COUNT psize,
        IN const DAT_PVOID pdata)
 {
-       struct iovec iovec[2];
+       struct iovec iov[2];
 
        dapl_dbg_log (DAPL_DBG_TYPE_EP,
                      " reject(cm %p reason %x, pdata %p, psize %d)\n",
                      cm_ptr, reason, pdata, psize);
 
        /* write reject data to indicate reject */
-       if (cm_ptr->socket >= 0) {
+       if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
                cm_ptr->dst.rej = (uint16_t)reason;
                cm_ptr->dst.rej = htons(cm_ptr->dst.rej);
-               iovec[0].iov_base = &cm_ptr->dst;
-               iovec[0].iov_len  = sizeof(ib_qp_cm_t);
+
+               iov[0].iov_base = (void *) &cm_ptr->dst;
+               iov[0].iov_len = sizeof(ib_qp_cm_t);
                if (psize) {
-                       iovec[1].iov_base = pdata;
-                       iovec[2].iov_len = psize;
-                       writev(cm_ptr->socket, &iovec[0], 2);
-               } else
-                       writev(cm_ptr->socket, &iovec[0], 1);
-
-               close(cm_ptr->socket);
-               cm_ptr->socket = -1;
+                       iov[1].iov_base = pdata;
+                       iov[1].iov_len = psize;
+                       writev(cm_ptr->socket, iov, 2);
+               } else {
+                       writev(cm_ptr->socket, iov, 1);
+               }
+
+               closesocket(cm_ptr->socket);
+               cm_ptr->socket = DAPL_INVALID_SOCKET;
        }
 
        /* cr_thread will destroy CR */
@@ -1444,138 +1589,141 @@ dapls_ib_get_cm_event (
 }
 
 /* outbound/inbound CR processing thread to avoid blocking applications */
-#define SCM_MAX_CONN 8192
 void cr_thread(void *arg) 
 {
-    struct dapl_hca    *hca_ptr = arg;
-    dp_ib_cm_handle_t  cr, next_cr;
-    int                opt,ret,idx;
-    socklen_t          opt_len;
-    char               rbuf[2];
-    struct pollfd      ufds[SCM_MAX_CONN];
-     
-    dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n",hca_ptr);
-
-    dapl_os_lock( &hca_ptr->ib_trans.lock );
-    hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
-    while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
-       idx=0;
-       ufds[idx].fd = g_scm_pipe[0]; /* wakeup and process work */
-        ufds[idx].events = POLLIN;
-       ufds[idx].revents = 0;
-
-       if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
-            next_cr = dapl_llist_peek_head (&hca_ptr->ib_trans.list);
-       else
-           next_cr = NULL;
-
-       while (next_cr) {
-           cr = next_cr;
-           if ((cr->socket == -1 && cr->state == SCM_DESTROY) ||
-               hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-
-               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: Free %p\n", cr);
-               next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
-                                               (DAPL_LLIST_ENTRY*)&cr->entry );
-               dapl_llist_remove_entry(&hca_ptr->ib_trans.list, 
-                                       (DAPL_LLIST_ENTRY*)&cr->entry);
-               dapl_os_free(cr, sizeof(*cr));
-               continue;
-           }
-
-           if (idx==SCM_MAX_CONN-1) {
-               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
-                            "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",idx+1);
-               continue;
-           }
-               
-           /* Add to ufds for poll, check for immediate work */
-           ufds[++idx].fd = cr->socket; /* add listen or cr */
-           ufds[idx].revents = 0;
-           if (cr->state == SCM_CONN_PENDING)
-               ufds[idx].events = POLLOUT;
-           else
-               ufds[idx].events = POLLIN;
-
-           /* check socket for event, accept in or connect out */
-           dapl_dbg_log(DAPL_DBG_TYPE_CM," poll cr=%p, fd=%d,%d\n", 
-                               cr, cr->socket, ufds[idx].fd);
-           dapl_os_unlock(&hca_ptr->ib_trans.lock);
-           ret = poll(&ufds[idx],1,0);
-           dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                        " poll wakeup ret=%d cr->st=%d"
-                        " ev=0x%x fd=%d\n",
-                        ret,cr->state,ufds[idx].revents,ufds[idx].fd);
-
-           /* data on listen, qp exchange, and on disconnect request */
-           if ((ret == 1) && ufds[idx].revents == POLLIN) {
-               if (cr->socket > 0) {
-                       if (cr->state == SCM_LISTEN)
-                               dapli_socket_accept(cr);
-                       else if (cr->state == SCM_ACCEPTING)
-                               dapli_socket_accept_data(cr);
-                       else if (cr->state == SCM_ACCEPTED)
-                               dapli_socket_accept_rtu(cr);
-                       else if (cr->state == SCM_RTU_PENDING)
-                               dapli_socket_connect_rtu(cr);
-                       else if (cr->state == SCM_CONNECTED)
-                               dapli_socket_disconnect(cr);
+       struct dapl_hca *hca_ptr = arg;
+       dp_ib_cm_handle_t cr, next_cr;
+       int opt, ret;
+       socklen_t opt_len;
+       char rbuf[2];
+       struct dapl_fd_set *set;
+       enum DAPL_FD_EVENTS event;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread: ENTER hca %p\n", hca_ptr);
+       set = dapl_alloc_fd_set();
+       if (!set)
+               goto out;
+
+       dapl_os_lock(&hca_ptr->ib_trans.lock);
+       hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
+       while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) {
+               dapl_fd_zero(set);
+               dapl_fd_set(g_scm_pipe[0], set, DAPL_FD_READ);
+
+               if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
+                       next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
+               else
+                       next_cr = NULL;
+
+               while (next_cr) {
+                       cr = next_cr;
+                       if ((cr->socket == DAPL_INVALID_SOCKET && cr->state == SCM_DESTROY) ||
+                               hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
+                               next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+                                               (DAPL_LLIST_ENTRY*)&cr->entry);
+                               dapl_llist_remove_entry(&hca_ptr->ib_trans.list, 
+                                               (DAPL_LLIST_ENTRY*)&cr->entry);
+                               dapl_os_free(cr, sizeof(*cr));
+                               continue;
+                       }
+
+                       event = (cr->state == SCM_CONN_PENDING) ?
+                               DAPL_FD_WRITE : DAPL_FD_READ;
+                       if (dapl_fd_set(cr->socket, set, event)) {
+                               dapl_log(DAPL_DBG_TYPE_ERR,
+                                        " cr_thread: DESTROY CR st=%d fd %d"
+                                        " -> %s\n", cr->state, cr->socket,
+                                        inet_ntoa(((struct sockaddr_in*)
+                                            &cr->dst.ia_address)->sin_addr));
+                               dapli_cm_destroy(cr);
+                               continue;
+                       }
+
+                       dapl_dbg_log(DAPL_DBG_TYPE_CM, " poll cr=%p, fd=%d\n",
+                               cr, cr->socket);
+                       dapl_os_unlock(&hca_ptr->ib_trans.lock);
+
+                       ret = dapl_poll(cr->socket, event);
+
+                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                               " poll wakeup ret=%d cr->st=%d fd=%d\n",
+                               ret, cr->state, cr->socket);
+
+                       /* data on listen, qp exchange, and on disconnect request */
+                       if (ret == DAPL_FD_READ) {
+                               if (cr->socket != DAPL_INVALID_SOCKET) {
+                                       switch (cr->state) {
+                                       case SCM_LISTEN:
+                                               dapli_socket_accept(cr);
+                                               break;
+                                       case SCM_ACCEPTING:
+                                               dapli_socket_accept_data(cr);
+                                               break;
+                                       case SCM_ACCEPTED:
+                                               dapli_socket_accept_rtu(cr);
+                                               break;
+                                       case SCM_RTU_PENDING:
+                                               dapli_socket_connect_rtu(cr);
+                                               break;
+                                       case SCM_CONNECTED:
+                                               dapli_socket_disconnect(cr);
+                                               break;
+                                       default:
+                                               break;
+                                       }
+                               }
+                       /* connect socket is writable, check status */
+                       } else if (ret == DAPL_FD_WRITE || ret == DAPL_FD_ERROR) {
+                               if (cr->state == SCM_CONN_PENDING) {
+                                       opt = 0;
+                                       ret = getsockopt(cr->socket, SOL_SOCKET,
+                                               SO_ERROR, (char *) &opt, &opt_len);
+                                       if (!ret)
+                                               dapli_socket_connected(cr, opt);
+                                       else
+                                               dapli_socket_connected(cr, errno);
+                               } else {
+                                       dapl_log(DAPL_DBG_TYPE_CM,
+                                               " CM poll ERR, wrong state(%d) -> %s SKIP\n", cr->state,
+                                               inet_ntoa(((struct sockaddr_in*)&cr->dst.ia_address)->sin_addr));
+                               }
+                       } else if (ret != 0) {
+                               dapl_log(DAPL_DBG_TYPE_CM,
+                                       " CM poll warning %s, ret=%d st=%d -> %s\n",
+                                       strerror(errno), ret, cr->state,
+                                       inet_ntoa(((struct sockaddr_in*)
+                                               &cr->dst.ia_address)->sin_addr));
+
+                               /* POLLUP, NVAL, or poll error, issue event if connected */
+                               if (cr->state == SCM_CONNECTED)
+                                       dapli_socket_disconnect(cr);
+                       } 
+
+                       dapl_os_lock(&hca_ptr->ib_trans.lock);
+                       next_cr =  dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+                               (DAPL_LLIST_ENTRY*)&cr->entry);
                }
-           /* connect socket is writable, check status */
-           } else if ((ret == 1) && 
-                       (ufds[idx].revents & POLLOUT ||
-                        ufds[idx].revents & POLLERR)) {
-               if (cr->state == SCM_CONN_PENDING) {
-                       opt = 0;
-                       ret = getsockopt(cr->socket, SOL_SOCKET, 
-                                        SO_ERROR, &opt, &opt_len);
-                       if (!ret)
-                               dapli_socket_connected(cr,opt);
-                       else
-                               dapli_socket_connected(cr,errno);
-               } else {
-                       dapl_log(DAPL_DBG_TYPE_CM,
-                                " CM poll ERR, wrong state(%d) -> %s SKIP\n",
-                                cr->state,
-                                inet_ntoa(((struct sockaddr_in*)
-                                       &cr->dst.ia_address)->sin_addr));
+
+               dapl_os_unlock(&hca_ptr->ib_trans.lock);
+               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep, fds=%d\n",
+                            set->index+1);
+               dapl_select(set);
+               dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
+
+               /* if pipe used to wakeup, consume */
+               if (dapl_poll(g_scm_pipe[0], DAPL_FD_READ) == DAPL_FD_READ) {
+                       if (read(g_scm_pipe[0], rbuf, 2) == -1)
+                               dapl_log(DAPL_DBG_TYPE_CM,
+                                        " cr_thread: read pipe error = %s\n", 
+                                        strerror(errno));
                }
-           } else if (ret != 0) {
-               dapl_log(DAPL_DBG_TYPE_CM,
-                        " CM poll warning %s, ret=%d revnt=%x st=%d -> %s\n",
-                        strerror(errno), ret, ufds[idx].revents, cr->state,
-                        inet_ntoa(((struct sockaddr_in*)
-                               &cr->dst.ia_address)->sin_addr));
-
-               /* POLLUP, NVAL, or poll error, issue event if connected */
-               if (cr->state == SCM_CONNECTED)
-                       dapli_socket_disconnect(cr);
-           } 
-           dapl_os_lock(&hca_ptr->ib_trans.lock);
-           next_cr =  dapl_llist_next_entry(&hca_ptr->ib_trans.list,
-                                            (DAPL_LLIST_ENTRY*)&cr->entry);
+               dapl_os_lock(&hca_ptr->ib_trans.lock);
        } 
+
        dapl_os_unlock(&hca_ptr->ib_trans.lock);
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: sleep, %d\n", idx+1);
-       poll(ufds,idx+1,-1); /* infinite, all sockets and pipe */
-       /* if pipe used to wakeup, consume */
-       if (ufds[0].revents == POLLIN)
-               if (read(g_scm_pipe[0], rbuf, 2) == -1)
-                       dapl_log(DAPL_DBG_TYPE_CM,
-                                " cr_thread: read pipe error = %s\n",
-                                strerror(errno));
-       dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread: wakeup\n");
-       dapl_os_lock(&hca_ptr->ib_trans.lock);
-    } 
-    dapl_os_unlock(&hca_ptr->ib_trans.lock);   
-    hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
-    dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
+       free(set);
+out:
+       hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cr_thread(hca %p) exit\n",hca_ptr);
 }
-
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- *  tab-width: 8
- * End:
- */
index 7d6bd4ff9e7b9bc5e9e089f3c472fad3a84a5bf4..59fff111d5dd74d4770d4fb5686bc7fd4cd117c1 100644 (file)
  *
  **************************************************************************/
 
+#include "openib_osd.h"
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_lmr_util.h"
 #include "dapl_evd_util.h"
 #include "dapl_ring_buffer_util.h"
-#include <sys/poll.h>
-#include <signal.h>
 
-int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+#if defined(_WIN64) || defined(_WIN32)
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
 {
-        DAT_RETURN dat_status;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
 
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+       if (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN)
+               return;
 
-        /* create thread to process inbound connect request */
-       hca_ptr->ib_trans.cq_state = IB_THREAD_INIT;
-        dat_status = dapl_os_thread_create(cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
-        if (dat_status != DAT_SUCCESS)
-        {
-                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
-                             " cq_thread_init: failed to create thread\n");
-                return 1;
-        }
+       /* destroy cr_thread and lock */
+       hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
+       SetEvent(hca_ptr->ib_trans.ib_cq->event);
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
+       while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
+               dapl_os_sleep_usec(20000);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",dapl_os_getpid());
+}
+
+static void cq_thread(void *arg)
+{
+       struct dapl_hca *hca_ptr = arg;
+       struct dapl_evd *evd_ptr;
+       struct ibv_cq   *ibv_cq = NULL;
+
+       hca_ptr->ib_trans.cq_state = IB_THREAD_RUN;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
        
-       /* wait for thread to start */
-       while (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN) {
-                struct timespec sleep, remain;
-                sleep.tv_sec = 0;
-                sleep.tv_nsec = 20000000; /* 20 ms */
-                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                             " cq_thread_init: waiting for cq_thread\n");
-                nanosleep (&sleep, &remain);
-        }
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%d) exit\n",getpid());
-        return 0;
+       /* wait on DTO event, or signal to abort */
+       while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
+               if (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq, &ibv_cq, (void*)&evd_ptr)) {
+
+                       if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
+                               ibv_ack_cq_events(ibv_cq, 1);
+                               /* stop the thread, but leave through the
+                                * IB_THREAD_EXIT update below: a bare return
+                                * left cq_state at IB_THREAD_RUN, so
+                                * dapli_cq_thread_destroy() would wait for
+                                * IB_THREAD_EXIT forever */
+                               break;
+                       }
+
+                       /* process DTO event via callback */
+                       dapl_evd_dto_callback(hca_ptr->ib_hca_handle, evd_ptr->ib_cq_handle,
+                               (void*)evd_ptr );
+
+                       /* every event returned by ibv_get_cq_event is acked */
+                       ibv_ack_cq_events(ibv_cq, 1);
+               }
+       }
+       /* tell dapli_cq_thread_destroy() that this thread is done */
+       hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
 }
 
+#else // _WIN32 || _WIN64
+
 void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
 {
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
 
        if (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN)
                return;
 
-        /* destroy cr_thread and lock */
-        hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
-        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
-        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
-        while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
-                struct timespec sleep, remain;
-                sleep.tv_sec = 0;
-                sleep.tv_nsec = 2000000; /* 2 ms */
-                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
-                             " cq_thread_destroy: waiting for cq_thread\n");
-                nanosleep (&sleep, &remain);
-        }
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
+       /* destroy cr_thread and lock */
+       hca_ptr->ib_trans.cq_state = IB_THREAD_CANCEL;
+       pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) cancel\n",hca_ptr);
+       while (hca_ptr->ib_trans.cq_state != IB_THREAD_EXIT) {
+               dapl_os_sleep_usec(20000);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",dapl_os_getpid());
 }
 
 /* catch the signal */
 static void ib_cq_handler(int signum)
 {
-        return;
+       return;
 }
 
-void cq_thread( void *arg )
+static void cq_thread(void *arg)
 {
-        struct dapl_hca *hca_ptr = arg;
-        struct dapl_evd *evd_ptr;
-        struct ibv_cq   *ibv_cq = NULL;
+       struct dapl_hca *hca_ptr = arg;
+       struct dapl_evd *evd_ptr;
+       struct ibv_cq   *ibv_cq = NULL;
        sigset_t        sigset;
 
        sigemptyset(&sigset);
-        sigaddset(&sigset,SIGUSR1);
-        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-        signal(SIGUSR1, ib_cq_handler);
+       sigaddset(&sigset,SIGUSR1);
+       pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+       signal(SIGUSR1, ib_cq_handler);
 
        hca_ptr->ib_trans.cq_state = IB_THREAD_RUN;
-       
+
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
        
-        /* wait on DTO event, or signal to abort */
-        while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
-                struct pollfd cq_fd = {
-                        .fd      = hca_ptr->ib_trans.ib_cq->fd,
-                        .events  = POLLIN,
-                        .revents = 0
-                };
+       /* wait on DTO event, or signal to abort */
+       while (hca_ptr->ib_trans.cq_state == IB_THREAD_RUN) {
+               struct pollfd cq_fd = {
+                       .fd      = hca_ptr->ib_trans.ib_cq->fd,
+                       .events  = POLLIN,
+                       .revents = 0
+               };
                if ((poll(&cq_fd, 1, -1) == 1) &&
-                       (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq,  
-                                  &ibv_cq, (void*)&evd_ptr))) {
+                       (!ibv_get_cq_event(hca_ptr->ib_trans.ib_cq, &ibv_cq, (void*)&evd_ptr))) {
 
                        if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) {
                                ibv_ack_cq_events(ibv_cq, 1);
@@ -144,15 +158,40 @@ void cq_thread( void *arg )
                        }
 
                        /* process DTO event via callback */
-                       dapl_evd_dto_callback ( hca_ptr->ib_hca_handle,
-                                               evd_ptr->ib_cq_handle,
-                                               (void*)evd_ptr );
+                       dapl_evd_dto_callback(hca_ptr->ib_hca_handle,
+                               evd_ptr->ib_cq_handle, (void*)evd_ptr );
 
                        ibv_ack_cq_events(ibv_cq, 1);
                } 
-        }
-        hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
-        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+       }
+       hca_ptr->ib_trans.cq_state = IB_THREAD_EXIT;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+}
+
+#endif // _WIN32 || _WIN64
+
+
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+{
+       DAT_RETURN dat_status;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+
+       /* start cq_thread (waits on CQ/DTO events); returns 1 if creation fails, 0 once it runs */
+       hca_ptr->ib_trans.cq_state = IB_THREAD_INIT;
+       dat_status = dapl_os_thread_create(cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
+       if (dat_status != DAT_SUCCESS) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                       " cq_thread_init: failed to create thread\n");
+               return 1;
+       }
+
+       /* poll every 20 ms until cq_thread marks itself IB_THREAD_RUN */
+       while (hca_ptr->ib_trans.cq_state != IB_THREAD_RUN) {
+               dapl_os_sleep_usec(20000);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%d) exit\n",dapl_os_getpid());
+       return 0;
 }
 
 
@@ -308,11 +347,11 @@ dapls_ib_cq_alloc (
        IN  DAPL_EVD            *evd_ptr,
        IN  DAT_COUNT           *cqlen )
 {
+       struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
+
        dapl_dbg_log ( DAPL_DBG_TYPE_UTIL, 
                "dapls_ib_cq_alloc: evd %p cqlen=%d \n", evd_ptr, *cqlen );
 
-       struct ibv_comp_channel *channel = ia_ptr->hca_ptr->ib_trans.ib_cq;
-
 #ifdef CQ_WAIT_OBJECT
        if (evd_ptr->cq_wait_obj_handle)
                channel = evd_ptr->cq_wait_obj_handle;
index 45000b9071b198ebccd0b8498b314b2de1afac84..fa19d01ddefd8b13d4d48addc67079dabb4c51df 100644 (file)
@@ -147,12 +147,6 @@ dapls_ib_post_send (
        IN  const DAT_RMR_TRIPLET       *remote_iov,
        IN  DAT_COMPLETION_FLAGS        completion_flags)
 {
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " post_snd: ep %p op %d ck %p sgs",
-                    "%d l_iov %p r_iov %p f %d\n",
-                    ep_ptr, op_type, cookie, segments, local_iov, 
-                    remote_iov, completion_flags);
-
        ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
        ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
        struct ibv_send_wr wr;
@@ -162,6 +156,12 @@ dapls_ib_post_send (
        DAT_COUNT i, total_len;
        int ret;
        
+       /* the two literals concatenate into ONE format string; a comma
+        * between them passed the second literal as the first vararg and
+        * shifted every following argument by one */
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " post_snd: ep %p op %d ck %p sgs"
+                    "%d l_iov %p r_iov %p f %d\n",
+                    ep_ptr, op_type, cookie, segments, local_iov,
+                    remote_iov, completion_flags);
+
        dapl_dbg_log(DAPL_DBG_TYPE_EP,
                     " post_snd: ep %p cookie %p segs %d l_iov %p\n",
                     ep_ptr, cookie, segments, local_iov);
@@ -317,12 +317,6 @@ dapls_ib_post_ext_send (
        IN  DAT_COMPLETION_FLAGS        completion_flags,
        IN  DAT_IB_ADDR_HANDLE          *remote_ah)
 {
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " post_ext_snd: ep %p op %d ck %p sgs",
-                    "%d l_iov %p r_iov %p f %d\n",
-                    ep_ptr, op_type, cookie, segments, local_iov, 
-                    remote_iov, completion_flags, remote_ah);
-
        ib_data_segment_t ds_array[DEFAULT_DS_ENTRIES];
        ib_data_segment_t *ds_array_p, *ds_array_start_p = NULL;
        struct ibv_send_wr wr;
@@ -330,6 +324,12 @@ dapls_ib_post_ext_send (
        DAT_COUNT i, total_len;
        int ret;
        
+       /* the two literals concatenate into ONE format string; a comma
+        * between them passed the second literal as the first vararg and
+        * shifted every following argument by one.  remote_ah also had no
+        * conversion specifier, so "ah %p" is added for it. */
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " post_ext_snd: ep %p op %d ck %p sgs"
+                    "%d l_iov %p r_iov %p f %d ah %p\n",
+                    ep_ptr, op_type, cookie, segments, local_iov,
+                    remote_iov, completion_flags, remote_ah);
+
        dapl_dbg_log(DAPL_DBG_TYPE_EP,
                     " post_snd: ep %p cookie %p segs %d l_iov %p\n",
                     ep_ptr, cookie, segments, local_iov);
index 54340ed0231b0725ebaac8f44a86c2541a98c275..9a97e5e467c368b540c47fab9255acc66f817155 100644 (file)
@@ -1,4 +1,4 @@
-/*
+       /*
  * Copyright (c) 2005-2007 Intel Corporation.  All rights reserved.
  *
  * This Software is licensed under one of the following licenses:
  *
  **********************************************************************/
 
-#include <sys/ioctl.h>  /* for IOCTL's */
-#include <sys/types.h>  /* for socket(2) and related bits and pieces */
-#include <sys/socket.h> /* for socket(2) */
-#include <net/if.h>     /* for struct ifreq */
-#include <net/if_arp.h> /* for ARPHRD_ETHER */
-#include <unistd.h>            /* for _SC_CLK_TCK */
-
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_lmr_util.h"
@@ -215,10 +208,9 @@ dapls_ib_mr_register(IN  DAPL_IA *ia_ptr,
        lmr->param.registered_address = (DAT_VADDR)(uintptr_t)virt_addr;
 
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
-                    " mr_register: mr=%p addr=%p h %x pd %p ctx %p "
+                    " mr_register: mr=%p addr=%p pd %p ctx %p "
                     "lkey=0x%x rkey=0x%x priv=%x\n", 
                     lmr->mr_handle, lmr->mr_handle->addr, 
-                    lmr->mr_handle->handle,    
                     lmr->mr_handle->pd, lmr->mr_handle->context,
                     lmr->mr_handle->lkey, lmr->mr_handle->rkey, 
                     length, dapls_convert_privileges(privileges));
index 92b45d5c794585c3ae7c7e22d1e01f85ac0fc129..d82d3f5060f13e7668bda26ac277e6e2db80c8f0 100644 (file)
 static const char rcsid[] = "$Id:  $";
 #endif
 
+#include "openib_osd.h"
 #include "dapl.h"
 #include "dapl_adapter_util.h"
 #include "dapl_ib_util.h"
+#include "dapl_osd.h"
 
 #include <stdlib.h>
-#include <netinet/tcp.h>
-#include <sys/utsname.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <unistd.h>    
-#include <fcntl.h>
 
 int g_dapl_loopback_connection = 0;
 int g_scm_pipe[2];
@@ -88,52 +84,43 @@ char *dapl_ib_mtu_str(enum ibv_mtu mtu)
        }
 }
 
-/* just get IP address for hostname */
-DAT_RETURN getipaddr( char *addr, int addr_len)
+static DAT_RETURN getlocalipaddr(DAT_SOCK_ADDR *addr, int addr_len)
 {
-       struct sockaddr_in      *ipv4_addr = (struct sockaddr_in*)addr;
-       struct hostent          *h_ptr;
-       struct utsname          ourname;
+       struct sockaddr_in *sin;
+       struct addrinfo *res, hint, *ai;
+       int ret;
+       char hostname[256];
 
-       if (uname(&ourname) < 0)  {
-                dapl_log(DAPL_DBG_TYPE_ERR, 
-                         " open_hca: uname err=%s\n", strerror(errno));
+       if (addr_len < sizeof(*sin)) {
                return DAT_INTERNAL_ERROR;
        }
 
-       h_ptr = gethostbyname(ourname.nodename);
-       if (h_ptr == NULL) {
-                dapl_log(DAPL_DBG_TYPE_ERR, 
-                         " open_hca: gethostbyname err=%s\n", 
-                         strerror(errno));
-               return DAT_INTERNAL_ERROR;
+       ret = gethostname(hostname,256);
+       if (ret) 
+               return ret;
+
+       memset(&hint, 0, sizeof hint);
+       hint.ai_flags = AI_PASSIVE; 
+       hint.ai_family = AF_INET;
+       hint.ai_socktype = SOCK_STREAM;
+       hint.ai_protocol = IPPROTO_TCP;
+
+       ret = getaddrinfo(hostname, NULL, &hint, &res);
+       if (ret) 
+               return ret;
+
+       ret = DAT_INVALID_ADDRESS;
+       for (ai = res; ai; ai = ai->ai_next) {
+               sin = (struct sockaddr_in *) ai->ai_addr;
+               if (*((uint32_t *) &sin->sin_addr) != htonl(0x7f000001)) {
+                       *((struct sockaddr_in *) addr) = *sin;
+                       ret = DAT_SUCCESS;
+                       break;
+               }
        }
 
-       if (h_ptr->h_addrtype == AF_INET) {
-               int i;
-               struct in_addr  **alist =
-                       (struct in_addr **)h_ptr->h_addr_list;
-
-               *(uint32_t*)&ipv4_addr->sin_addr = 0;
-               ipv4_addr->sin_family = AF_INET;
-               
-               /* Walk the list of addresses for host */
-               for (i=0; alist[i] != NULL; i++) {
-                      /* first non-loopback address */                 
-                      if (*(uint32_t*)alist[i] != htonl(0x7f000001)) {
-                               dapl_os_memcpy(&ipv4_addr->sin_addr,
-                                              h_ptr->h_addr_list[i],
-                                              4);
-                               break;
-                       }
-               }
-               /* if no acceptable address found */
-               if (*(uint32_t*)&ipv4_addr->sin_addr == 0)
-                       return DAT_INVALID_ADDRESS;
-       } else 
-               return DAT_INVALID_ADDRESS;
-
-       return DAT_SUCCESS;
+       freeaddrinfo(res);
+       return ret;
 }
 
 /*
@@ -165,6 +152,28 @@ int32_t dapls_ib_release (void)
        return 0;
 }
 
+/*
+ * dapls_config_comp_channel
+ *
+ * Prepare a completion channel for use by the provider.
+ * Returns 0 on success, non-zero (an errno value) on failure;
+ * dapls_ib_open_hca() bails out on any non-zero return.
+ */
+#if defined(_WIN64) || defined(_WIN32)
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       /* nothing is configured in the Windows build */
+       return 0;
+}
+#else // _WIN64 || _WIN32
+int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+       int opts;
+       int err;
+
+       /* uCQ: add O_NONBLOCK to the channel fd's file status flags */
+       opts = fcntl(channel->fd, F_GETFL);
+       if (opts < 0 || fcntl(channel->fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+               /* save errno first: the logging call may overwrite it, and
+                * a clobbered value of 0 would be reported as success */
+               err = errno;
+               dapl_log(DAPL_DBG_TYPE_ERR,
+                        " dapls_config_comp_channel: fcntl on ib_cq->fd %d ERR %d %s\n",
+                        channel->fd, opts, strerror(err));
+               return err;
+       }
+
+       return 0;
+}
+#endif
+
 /*
  * dapls_ib_open_hca
  *
@@ -187,7 +196,6 @@ DAT_RETURN dapls_ib_open_hca (
         IN   DAPL_HCA          *hca_ptr)
 {
        struct ibv_device **dev_list;
-       int             opts;
        int             i;
        DAT_RETURN      dat_status = DAT_SUCCESS;
 
@@ -219,7 +227,7 @@ found:
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," open_hca: Found dev %s %016llx\n", 
                     ibv_get_device_name(hca_ptr->ib_trans.ib_dev),
                     (unsigned long long)
-                    bswap_64(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
+                    ntohll(ibv_get_device_guid(hca_ptr->ib_trans.ib_dev)));
 
        hca_ptr->ib_hca_handle = ibv_open_device(hca_ptr->ib_trans.ib_dev);
        if (!hca_ptr->ib_hca_handle) {
@@ -268,13 +276,7 @@ found:
                goto bail;
        }
 
-       opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
-       if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd, 
-                             F_SETFL, opts | O_NONBLOCK) < 0) {
-               dapl_log(DAPL_DBG_TYPE_ERR, 
-                        " open_hca: fcntl on ib_cq->fd %d ERR %d %s\n", 
-                        hca_ptr->ib_trans.ib_cq->fd, opts,
-                        strerror(errno));
+       if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
                goto bail;
        }
 
@@ -309,16 +311,11 @@ found:
        
        /* wait for thread */
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-               struct timespec sleep, remain;
-               sleep.tv_sec = 0;
-               sleep.tv_nsec = 2000000; /* 2 ms */
-               dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
-                            " open_hca: waiting for cr_thread\n");
-               nanosleep (&sleep, &remain);
+               dapl_os_sleep_usec(20000);
        }
 
        /* get the IP address of the device */
-       dat_status = getipaddr((char*)&hca_ptr->hca_address, 
+       dat_status = getlocalipaddr((DAT_SOCK_ADDR*) &hca_ptr->hca_address,
                                sizeof(DAT_SOCK_ADDR6));
        
        dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
@@ -376,16 +373,13 @@ DAT_RETURN dapls_ib_close_hca (   IN   DAPL_HCA   *hca_ptr )
                         " thread_destroy: thread wakeup err = %s\n", 
                         strerror(errno));
        while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) {
-               struct timespec sleep, remain;
-               sleep.tv_sec = 0;
-               sleep.tv_nsec = 2000000; /* 2 ms */
                dapl_dbg_log(DAPL_DBG_TYPE_UTIL, 
                             " close_hca: waiting for cr_thread\n");
                if (write(g_scm_pipe[1], "w", sizeof "w") == -1)
                        dapl_log(DAPL_DBG_TYPE_UTIL, 
                                 " thread_destroy: thread wakeup err = %s\n", 
                                 strerror(errno));
-               nanosleep (&sleep, &remain);
+               dapl_os_sleep_usec(20000);
        }
        dapl_os_lock_destroy(&hca_ptr->ib_trans.lock);
 
index 863da2bc44f1a67e16e761dc3a6542bae6d4e187..fd1c24e971b5ec97dbc4e10103ebecb5d7874c1d 100644 (file)
@@ -49,8 +49,8 @@
 #ifndef _DAPL_IB_UTIL_H_
 #define _DAPL_IB_UTIL_H_
 
+#include "openib_osd.h"
 #include <infiniband/verbs.h>
-#include <byteswap.h>
 
 #ifdef DAT_EXTENSIONS
 #include <dat2/dat_ib_extensions.h>
@@ -73,8 +73,6 @@ typedef       struct ibv_wc           ib_work_completion_t;
 typedef        struct ibv_context      *ib_hca_handle_t;
 typedef ib_hca_handle_t                dapl_ibal_ca_t;
 
-/* CM mappings, user CM not complete use SOCKETS */
-
 /* destination info to exchange, define wire protocol version */
 #define DSCM_VER 3
 typedef struct _ib_qp_cm
@@ -86,7 +84,7 @@ typedef struct _ib_qp_cm
        uint32_t                qpn;
        uint32_t                p_size;
        DAT_SOCK_ADDR6          ia_address;
-        union ibv_gid          gid;
+       union ibv_gid           gid;
        uint16_t                qp_type; 
 } ib_qp_cm_t;
 
@@ -110,20 +108,18 @@ struct ib_cm_handle
        struct dapl_llist_entry entry;
        DAPL_OS_LOCK            lock;
        SCM_STATE               state;
-       int                     socket;
+       DAPL_SOCKET             socket;
        struct dapl_hca         *hca;
        struct dapl_sp          *sp;    
-       struct dapl_ep          *ep;    
+       struct dapl_ep          *ep;
        ib_qp_cm_t              dst;
-       unsigned char           p_data[256];
+       unsigned char           p_data[256];    /* must follow ib_qp_cm_t */
        struct ibv_ah           *ah;
 };
 
 typedef struct ib_cm_handle    *dp_ib_cm_handle_t;
 typedef dp_ib_cm_handle_t      ib_cm_srvc_handle_t;
 
-DAT_RETURN getipaddr(char *addr, int addr_len);
-
 /* CM events */
 typedef enum 
 {
@@ -141,9 +137,6 @@ typedef enum
 
 } ib_cm_events_t;
 
-/* prototype for cm thread */
-void cr_thread (void *arg);
-
 /* Operation and state mappings */
 typedef enum   ibv_send_flags  ib_send_op_type_t;
 typedef        struct  ibv_sge         ib_data_segment_t;
@@ -289,7 +282,7 @@ typedef struct _ib_hca_transport
        DAPL_OS_LOCK            cq_lock;        
        int                     max_inline_send;
        ib_thread_state_t       cq_state;
-       DAPL_OS_THREAD          cq_thread;
+       DAPL_OS_THREAD                  cq_thread;
        struct ibv_comp_channel *ib_cq;
        int                     cr_state;
        DAPL_OS_THREAD          thread;
@@ -317,7 +310,6 @@ typedef uint32_t ib_shm_transport_t;
 /* prototypes */
 int32_t        dapls_ib_init (void);
 int32_t        dapls_ib_release (void);
-void cq_thread (void *arg);
 void cr_thread(void *arg);
 int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
 void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
@@ -349,7 +341,7 @@ dapl_convert_errno( IN int err, IN const char *str )
     if (!err)  return DAT_SUCCESS;
        
 #if DAPL_DBG
-    if ((err != EAGAIN) && (err != ETIME) && (err != ETIMEDOUT))
+    if ((err != EAGAIN) && (err != ETIMEDOUT))
        dapl_dbg_log (DAPL_DBG_TYPE_ERR," %s %s\n", str, strerror(err));
 #endif 
 
@@ -357,24 +349,15 @@ dapl_convert_errno( IN int err, IN const char *str )
     {
        case EOVERFLOW  : return DAT_LENGTH_ERROR;
        case EACCES     : return DAT_PRIVILEGES_VIOLATION;
-       case ENXIO      : 
-       case ERANGE     : 
        case EPERM      : return DAT_PROTECTION_VIOLATION;                
-       case EINVAL     :
-        case EBADF     : 
-       case ENOENT     :
-       case ENOTSOCK   : return DAT_INVALID_HANDLE;
+       case EINVAL     : return DAT_INVALID_HANDLE;
        case EISCONN    : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_CONNECTED;
        case ECONNREFUSED : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_NOTREADY;
-       case ETIME      :           
        case ETIMEDOUT  : return DAT_TIMEOUT_EXPIRED;
        case ENETUNREACH: return DAT_INVALID_ADDRESS | DAT_INVALID_ADDRESS_UNREACHABLE;
        case EADDRINUSE : return DAT_CONN_QUAL_IN_USE;
        case EALREADY   : return DAT_INVALID_STATE | DAT_INVALID_STATE_EP_ACTCONNPENDING;
-        case ENOSPC    : 
-       case ENOMEM     :
-        case E2BIG     :
-        case EDQUOT    : return DAT_INSUFFICIENT_RESOURCES;
+       case ENOMEM     : return DAT_INSUFFICIENT_RESOURCES;
         case EAGAIN    : return DAT_QUEUE_EMPTY;
        case EINTR      : return DAT_INTERRUPTED_CALL;
        case EAFNOSUPPORT : return DAT_INVALID_ADDRESS | DAT_INVALID_ADDRESS_MALFORMED;
diff --git a/dapl/openib_scm/linux/openib_osd.h b/dapl/openib_scm/linux/openib_osd.h
new file mode 100644 (file)
index 0000000..235a82e
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+#include <endian.h>
+#include <netinet/in.h>
+
+#if __BYTE_ORDER == __BIG_ENDIAN
+#define htonll(x) (x)
+#define ntohll(x) (x)
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+#define htonll(x)  bswap_64(x)
+#define ntohll(x)  bswap_64(x)
+#endif
+
+#define DAPL_SOCKET int
+#define DAPL_INVALID_SOCKET -1
+#define DAPL_FD_SETSIZE 8192
+
+#define closesocket close
+
+#endif // OPENIB_OSD_H
diff --git a/dapl/openib_scm/windows/openib_osd.h b/dapl/openib_scm/windows/openib_osd.h
new file mode 100644 (file)
index 0000000..67c70ec
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef OPENIB_OSD_H
+#define OPENIB_OSD_H
+
+/* OS-dependent definitions for the openib_scm provider (Windows) */
+
+#ifndef FD_SETSIZE
+#define FD_SETSIZE 1024 /* Set before including winsock2 - see select help */
+#endif
+/* defined unconditionally so it also exists when FD_SETSIZE was preset */
+#define DAPL_FD_SETSIZE FD_SETSIZE
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <io.h>
+#include <fcntl.h>
+
+/* 64-bit host <-> network byte order conversion */
+#define ntohll _byteswap_uint64
+#define htonll _byteswap_uint64
+
+/* map the POSIX names used by common code onto the CRT equivalents */
+#define pipe(x) _pipe(x, 4096, _O_TEXT)
+#define read _read
+#define write _write
+#define DAPL_SOCKET SOCKET
+#define DAPL_INVALID_SOCKET INVALID_SOCKET
+
+/* allow casting to WSABUF: field order must stay length, then buffer */
+struct iovec
+{
+       u_long iov_len;
+       char FAR* iov_base;
+};
+
+/*
+ * writev replacement built on WSASend.
+ * Returns the number of bytes sent, or the non-zero WSASend result
+ * (SOCKET_ERROR) on failure.
+ */
+static int writev(DAPL_SOCKET s, struct iovec *vector, int count)
+{
+       DWORD len;      /* WSASend takes an LPDWORD, not an int pointer */
+       int ret;
+
+       ret = WSASend(s, (WSABUF *) vector, count, &len, 0, NULL, NULL);
+       return ret ? ret : (int) len;
+}
+
+#endif // OPENIB_OSD_H
+
index 6fef9af810f4ae49d378df6a4b3dda9500ca393c..ae029446afacf81963144f70e88768406a2befa7 100644 (file)
@@ -302,6 +302,15 @@ dapl_os_thread_create (
        IN  void                        *data,
        OUT DAPL_OS_THREAD              *thread_id );
 
+/*
+ * dapl_os_sleep_usec
+ *
+ * Sleep for usec microseconds using nanosleep().  The sleep is not
+ * restarted if nanosleep() returns early; the remaining time is ignored.
+ */
+STATIC _INLINE_ void
+dapl_os_sleep_usec(int usec)
+{
+       struct timespec sleep, remain;
+
+       /* split into seconds + nanoseconds: nanosleep() rejects tv_nsec
+        * values of one second or more, and usec * 1000 could overflow int */
+       sleep.tv_sec = usec / 1000000;
+       sleep.tv_nsec = (usec % 1000000) * 1000;
+       nanosleep(&sleep, &remain);
+}
 
 /*
  * Lock Functions