static const char rcsid[] = "$Id: $";
#endif
+#include "openib_osd.h"
#include "dapl.h"
#include "dapl_adapter_util.h"
#include "dapl_ib_util.h"
+#include "dapl_osd.h"
#include <stdlib.h>
-#include <netinet/tcp.h>
-#include <sys/poll.h>
-#include <fcntl.h>
-
-#include <sys/ioctl.h> /* for IOCTL's */
-#include <sys/types.h> /* for socket(2) and related bits and pieces */
-#include <sys/socket.h> /* for socket(2) */
-#include <net/if.h> /* for struct ifreq */
-#include <net/if_arp.h> /* for ARPHRD_INFINIBAND */
-#include <arpa/inet.h> /* for inet_ntoa */
int g_dapl_loopback_connection = 0;
-int g_ib_pipe[2];
struct rdma_event_channel *g_cm_events = NULL;
ib_thread_state_t g_ib_thread_state = 0;
DAPL_OS_THREAD g_ib_thread;
DAPL_OS_LOCK g_hca_lock;
struct dapl_llist_entry *g_hca_list;
+#if defined(_WIN64) || defined(_WIN32)
+#include "..\..\..\..\..\etc\user\comp_channel.cpp"
+#include "..\..\..\..\..\etc\user\dlist.c"
+
+#define getipaddr_netdev(x,y,z) -1
+struct ibvw_windata windata;
+
+static int dapls_os_init(void)
+{
+ return ibvw_get_windata(&windata, IBVW_WINDATA_VERSION);
+}
+
+static void dapls_os_release(void)
+{
+ if (windata.comp_mgr)
+ ibvw_release_windata(&windata, IBVW_WINDATA_VERSION);
+ windata.comp_mgr = NULL;
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ channel->comp_channel.Milliseconds = 0;
+ return 0;
+}
+
+static int dapls_config_cm_channel(struct rdma_event_channel *channel)
+{
+ channel->channel.Milliseconds = 0;
+ return 0;
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+ verbs->channel.Milliseconds = 0;
+ return 0;
+}
+
+static int dapls_thread_signal(void)
+{
+ CompManagerCancel(windata.comp_mgr);
+ return 0;
+}
+#else // _WIN64 || WIN32
+int g_ib_pipe[2];
+
+static int dapls_os_init(void)
+{
+ /* create pipe for waking up work thread */
+ return pipe(g_ib_pipe);
+}
+
+static void dapls_os_release(void)
+{
+ /* close pipe? */
+}
+
/* Get IP address using network device name */
static int getipaddr_netdev(char *name, char *addr, int addr_len)
{
return ret;
}
+static int dapls_config_fd(int fd)
+{
+ int opts;
+
+ opts = fcntl(fd, F_GETFL);
+ if (opts < 0 || fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " dapls_config_fd: fcntl on fd %d ERR %d %s\n",
+ fd, opts, strerror(errno));
+ return errno;
+ }
+
+ return 0;
+}
+
+static int dapls_config_comp_channel(struct ibv_comp_channel *channel)
+{
+ return dapls_config_fd(channel->fd);
+}
+
+static int dapls_config_cm_channel(struct rdma_event_channel *channel)
+{
+ return dapls_config_fd(channel->fd);
+}
+
+static int dapls_config_verbs(struct ibv_context *verbs)
+{
+ return dapls_config_fd(verbs->async_fd);
+}
+
+static int dapls_thread_signal(void)
+{
+ return write(g_ib_pipe[1], "w", sizeof "w");
+}
+#endif
+
/* Get IP address using network name, address, or device name */
static int getipaddr(char *name, char *addr, int len)
{
/* initialize hca list for CQ events */
dapl_llist_init_head(&g_hca_list);
- /* create pipe for waking up work thread */
- if (pipe(g_ib_pipe))
+ if (dapls_os_init())
return 1;
return 0;
dapli_ib_thread_destroy();
if (g_cm_events != NULL)
rdma_destroy_event_channel(g_cm_events);
+ dapls_os_release();
return 0;
}
*/
DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA *hca_ptr)
{
- long opts;
struct rdma_cm_id *cm_id = NULL;
union ibv_gid *gid;
int ret;
dapl_os_unlock(&g_hca_lock);
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " open_hca: RDMA channel created(%p,%d)\n",
- g_cm_events, g_cm_events->fd);
+ " open_hca: RDMA channel created (%p)\n", g_cm_events);
dat_status = dapli_ib_thread_init();
if (dat_status != DAT_SUCCESS)
/* keep reference to IB device and cm_id */
hca_ptr->ib_trans.cm_id = cm_id;
hca_ptr->ib_hca_handle = cm_id->verbs;
+ dapls_config_verbs(cm_id->verbs);
hca_ptr->port_num = cm_id->port_num;
gid = &cm_id->route.addr.addr.ibaddr.sgid;
DAPL_DBG_TYPE_UTIL,
" open_hca: ctx=%p port=%d GID subnet %016llx id %016llx\n",
cm_id->verbs,cm_id->port_num,
- (unsigned long long)bswap_64(gid->global.subnet_prefix),
- (unsigned long long)bswap_64(gid->global.interface_id));
+ (unsigned long long)ntohll(gid->global.subnet_prefix),
+ (unsigned long long)ntohll(gid->global.interface_id));
/* set inline max with env or default, get local lid and gid 0 */
if (hca_ptr->ib_hca_handle->device->transport_type
strerror(errno));
goto bail;
}
- dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
- " open_hca: CQ channel created(fd=%d)\n",
- hca_ptr->ib_trans.ib_cq->fd);
+ dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " open_hca: CQ channel created\n");
- opts = fcntl(hca_ptr->ib_trans.ib_cq->fd, F_GETFL); /* uCQ */
- if (opts < 0 || fcntl(hca_ptr->ib_trans.ib_cq->fd,
- F_SETFL, opts | O_NONBLOCK) < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " open_hca: fcntl on ib_cq->fd %d ERR %d %s\n",
- hca_ptr->ib_trans.ib_cq->fd, opts,
- strerror(errno));
+ if (dapls_config_comp_channel(hca_ptr->ib_trans.ib_cq)) {
goto bail;
}
-
+
/*
* Put new hca_transport on list for async and CQ event processing
* Wakeup work thread to add to polling list
dapl_llist_add_tail(&g_hca_list,
(DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry,
&hca_ptr->ib_trans.entry);
- if (write(g_ib_pipe[1], "w", sizeof "w") == -1)
+ if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" open_hca: thread wakeup error = %s\n",
strerror(errno));
* Wakeup work thread to remove from polling list
*/
hca_ptr->ib_trans.destroy = 1;
- if (write(g_ib_pipe[1], "w", sizeof "w") == -1)
+ if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" destroy: thread wakeup error = %s\n",
strerror(errno));
/* wait for thread to remove HCA references */
while (hca_ptr->ib_trans.destroy != 2) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 10000000; /* 10 ms */
- if (write(g_ib_pipe[1], "w", sizeof "w") == -1)
+ if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" destroy: thread wakeup error = %s\n",
strerror(errno));
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread_destroy: wait on hca %p destroy\n");
- nanosleep (&sleep, &remain);
+ dapl_os_sleep_usec(10000);
}
bail:
return (DAT_SUCCESS);
DAT_RETURN dapli_ib_thread_init(void)
{
- long opts;
DAT_RETURN dat_status;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " ib_thread_init(%d)\n", getpid());
+ " ib_thread_init(%d)\n", dapl_os_getpid());
dapl_os_lock(&g_hca_lock);
if (g_ib_thread_state != IB_THREAD_INIT) {
}
/* uCMA events non-blocking */
- opts = fcntl(g_cm_events->fd, F_GETFL); /* uCMA */
- if (opts < 0 || fcntl(g_cm_events->fd,
- F_SETFL, opts | O_NONBLOCK) < 0) {
+ if (dapls_config_cm_channel(g_cm_events)) {
dapl_os_unlock(&g_hca_lock);
return(dapl_convert_errno(errno, "create_thread ERR: cm_fd"));
}
/* wait for thread to start */
dapl_os_lock(&g_hca_lock);
while (g_ib_thread_state != IB_THREAD_RUN) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread_init: waiting for ib_thread\n");
dapl_os_unlock(&g_hca_lock);
- nanosleep (&sleep, &remain);
+ dapl_os_sleep_usec(2000);
dapl_os_lock(&g_hca_lock);
}
dapl_os_unlock(&g_hca_lock);
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " ib_thread_init(%d) exit\n",getpid());
+ " ib_thread_init(%d) exit\n",dapl_os_getpid());
return DAT_SUCCESS;
}
int retries = 10;
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " ib_thread_destroy(%d)\n", getpid());
+ " ib_thread_destroy(%d)\n", dapl_os_getpid());
/*
* wait for async thread to terminate.
* pthread_join would be the correct method
goto bail;
g_ib_thread_state = IB_THREAD_CANCEL;
- if (write(g_ib_pipe[1], "w", sizeof "w") == -1)
+ if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" destroy: thread wakeup error = %s\n",
strerror(errno));
while ((g_ib_thread_state != IB_THREAD_EXIT) && (retries--)) {
- struct timespec sleep, remain;
- sleep.tv_sec = 0;
- sleep.tv_nsec = 2000000; /* 2 ms */
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread_destroy: waiting for ib_thread\n");
- if (write(g_ib_pipe[1], "w", sizeof "w") == -1)
+ if (dapls_thread_signal() == -1)
dapl_log(DAPL_DBG_TYPE_UTIL,
" destroy: thread wakeup error = %s\n",
strerror(errno));
dapl_os_unlock( &g_hca_lock );
- nanosleep(&sleep, &remain);
+ dapl_os_sleep_usec(2000);
dapl_os_lock( &g_hca_lock );
}
dapl_os_unlock( &g_hca_lock );
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
- " ib_thread_destroy(%d) exit\n",getpid());
+ " ib_thread_destroy(%d) exit\n",dapl_os_getpid());
}
void dapli_async_event_cb(struct _ib_hca_transport *hca)
{
struct ibv_async_event event;
- struct pollfd async_fd = {
- .fd = hca->cm_id->verbs->async_fd,
- .events = POLLIN,
- .revents = 0
- };
dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " async_event(%p)\n",hca);
if (hca->destroy)
return;
- if ((poll(&async_fd, 1, 0)==1) &&
- (!ibv_get_async_event(hca->cm_id->verbs, &event))) {
+ if (!ibv_get_async_event(hca->cm_id->verbs, &event)) {
switch (event.event_type) {
case IBV_EVENT_CQ_ERR:
}
}
+#if defined(_WIN64) || defined(_WIN32)
+/* work thread for uAT, uCM, CQ, and async events */
+void dapli_thread(void *arg)
+{
+ struct _ib_hca_transport *hca;
+ struct _ib_hca_transport *uhca[8];
+ COMP_CHANNEL *channel;
+ int ret, idx, cnt;
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d,0x%x): ENTER: \n",
+ dapl_os_getpid(), g_ib_thread);
+
+ dapl_os_lock(&g_hca_lock);
+ for (g_ib_thread_state = IB_THREAD_RUN;
+ g_ib_thread_state == IB_THREAD_RUN; dapl_os_lock(&g_hca_lock)) {
+
+ idx = 0;
+ hca = dapl_llist_is_empty(&g_hca_list) ? NULL :
+ dapl_llist_peek_head(&g_hca_list);
+
+ while (hca) {
+ uhca[idx++] = hca;
+ hca = dapl_llist_next_entry(&g_hca_list,
+ (DAPL_LLIST_ENTRY *) &hca->entry);
+ }
+ cnt = idx;
+
+ dapl_os_unlock(&g_hca_lock);
+ ret = CompManagerPoll(windata.comp_mgr, INFINITE, &channel);
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+ " ib_thread(%d) poll_event 0x%x\n",
+ dapl_os_getpid(), ret);
+
+ dapli_cma_event_cb();
+
+ /* check and process CQ and ASYNC events, per device */
+ for (idx = 0; idx < cnt; idx++) {
+ if (uhca[idx]->destroy == 1) {
+ dapl_os_lock(&g_hca_lock);
+ dapl_llist_remove_entry(&g_hca_list,
+ (DAPL_LLIST_ENTRY*) &uhca[idx]->entry);
+ dapl_os_unlock(&g_hca_lock);
+ uhca[idx]->destroy = 2;
+ } else {
+ dapli_cq_event_cb(uhca[idx]);
+ dapli_async_event_cb(uhca[idx]);
+ }
+ }
+ }
+
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " ib_thread(%d) EXIT\n", dapl_os_getpid());
+ g_ib_thread_state = IB_THREAD_EXIT;
+ dapl_os_unlock(&g_hca_lock);
+}
+#else // _WIN64 || WIN32
/* work thread for uAT, uCM, CQ, and async events */
void dapli_thread(void *arg)
{
dapl_dbg_log (DAPL_DBG_TYPE_UTIL,
" ib_thread(%d,0x%x): ENTER: pipe %d ucma %d\n",
- getpid(), g_ib_thread, g_ib_pipe[0], g_cm_events->fd);
+ dapl_os_getpid(), g_ib_thread, g_ib_pipe[0], g_cm_events->fd);
/* Poll across pipe, CM, AT never changes */
dapl_os_lock( &g_hca_lock );
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread(%d) poll_fd: hca[%d]=%p, async=%d"
" pipe=%d cm=%d cq=d\n",
- getpid(), hca, ufds[idx-1].fd,
+ dapl_os_getpid(), hca, ufds[idx-1].fd,
ufds[0].fd, ufds[1].fd, ufds[idx].fd);
hca = dapl_llist_next_entry(
if (ret <= 0) {
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread(%d): ERR %s poll\n",
- getpid(),strerror(errno));
+ dapl_os_getpid(),strerror(errno));
dapl_os_lock(&g_hca_lock);
continue;
}
dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
" ib_thread(%d) poll_event: "
" async=0x%x pipe=0x%x cm=0x%x cq=0x%x\n",
- getpid(), ufds[idx-1].revents, ufds[0].revents,
+ dapl_os_getpid(), ufds[idx-1].revents, ufds[0].revents,
ufds[1].revents, ufds[idx].revents);
/* uCMA events */
dapl_os_lock(&g_hca_lock);
}
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid());
+ dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",dapl_os_getpid());
g_ib_thread_state = IB_THREAD_EXIT;
dapl_os_unlock(&g_hca_lock);
}
+#endif
/*
* dapls_set_provider_specific_attr