From d06dcfd25e5d37310d089bcb7f3d3d75fcece75a Mon Sep 17 00:00:00 2001 From: Arlin Davis Date: Tue, 6 Sep 2005 19:34:46 +0000 Subject: [PATCH] r3326: Changes to support async events. Also consolidated the uAT,uCM,uCQ threads into one processing thread. Signed-off-by: Arlin Davis Signed-off-by: James Lentini --- dapl/openib/dapl_ib_cm.c | 308 ++++------------------------- dapl/openib/dapl_ib_cq.c | 98 +++------- dapl/openib/dapl_ib_util.c | 384 ++++++++++++++++++++++++++++++++----- dapl/openib/dapl_ib_util.h | 34 ++-- 4 files changed, 410 insertions(+), 414 deletions(-) diff --git a/dapl/openib/dapl_ib_cm.c b/dapl/openib/dapl_ib_cm.c index 99a475f..dcdaf78 100644 --- a/dapl/openib/dapl_ib_cm.c +++ b/dapl/openib/dapl_ib_cm.c @@ -70,90 +70,6 @@ static inline uint64_t cpu_to_be64(uint64_t x) { return bswap_64(x); } static inline uint64_t cpu_to_be64(uint64_t x) { return x; } #endif -static int g_at_destroy; -static DAPL_OS_THREAD g_at_thread; -static int g_cm_destroy; -static DAPL_OS_THREAD g_cm_thread; -static DAPL_OS_LOCK g_cm_lock; -static struct dapl_llist_entry *g_cm_list; - -int dapli_cm_thread_init(void) -{ - DAT_RETURN dat_status; - - dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_init(%d)\n", getpid()); - - /* initialize cr_list lock */ - dapl_os_lock_init(&g_cm_lock); - - /* initialize CM list for listens on this HCA */ - dapl_llist_init_head(&g_cm_list); - - /* create thread to process inbound connect request */ - dat_status = dapl_os_thread_create(cm_thread, NULL, &g_cm_thread); - if (dat_status != DAT_SUCCESS) - { - dapl_dbg_log(DAPL_DBG_TYPE_ERR, - " cm_thread_init: failed to create thread\n"); - return 1; - } - return 0; -} - -void dapli_cm_thread_destroy(void) -{ - dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d)\n", getpid()); - - /* destroy cr_thread and lock */ - g_cm_destroy = 1; - pthread_kill( g_cm_thread, SIGUSR1 ); - dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) SIGUSR1 sent\n",getpid()); - while (g_cm_destroy) { - struct timespec sleep, remain; - sleep.tv_sec = 0; - sleep.tv_nsec = 10000000; /* 10 ms */ - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread_destroy: waiting for cm_thread\n"); - nanosleep (&sleep, &remain); - } - dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid()); -} - -int dapli_at_thread_init(void) -{ - DAT_RETURN dat_status; - - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid()); - - /* create thread to process AT async requests */ - dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread); - if (dat_status != DAT_SUCCESS) - { - dapl_dbg_log(DAPL_DBG_TYPE_ERR, - " at_thread_init: failed to create thread\n"); - return 1; - } - return 0; -} - -void dapli_at_thread_destroy(void) -{ - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid()); - - /* destroy cr_thread and lock */ - g_at_destroy = 1; - pthread_kill( g_at_thread, SIGUSR1 ); - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 sent\n",getpid()); - while (g_at_destroy) { - struct timespec sleep, remain; - sleep.tv_sec = 0; - sleep.tv_nsec = 10000000; /* 10 ms */ - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " at_thread_destroy: waiting for at_thread\n"); - nanosleep (&sleep, &remain); - } - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid()); -} void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num) { @@ -348,12 +264,6 @@ static void dapli_destroy_cm_id(struct dapl_cm_id *conn) if (conn->ep) conn->ep->cm_handle = IB_INVALID_HANDLE; - /* take off the CM thread work queue and free */ - dapl_os_lock( &g_cm_lock ); - dapl_llist_remove_entry(&g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry); - dapl_os_unlock(&g_cm_lock); - dapl_os_free(conn, sizeof(*conn)); } } @@ -426,8 +336,8 @@ static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id *conn, if (new_conn) { (void)dapl_os_memzero(new_conn, sizeof(*new_conn)); - dapl_os_lock_init(&new_conn->lock); new_conn->cm_id = event->cm_id; /* provided by uCM */ + event->cm_id->context = new_conn; /* update CM_ID context */ new_conn->sp = conn->sp; new_conn->hca = conn->hca; new_conn->service_id = conn->service_id; @@ -444,13 +354,6 @@ static struct dapl_cm_id * dapli_req_recv(struct dapl_cm_id *conn, event->param.req_rcvd.primary_path, sizeof(struct ib_sa_path_rec)); - /* put new CR on CM thread event work queue */ - dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&new_conn->entry); - dapl_os_lock( &g_cm_lock ); - dapl_llist_add_tail(&g_cm_list, - (DAPL_LLIST_ENTRY*)&new_conn->entry, new_conn); - dapl_os_unlock(&g_cm_lock); - dapl_dbg_log(DAPL_DBG_TYPE_CM, " passive_cb: " "REQ on HCA %p SP %p SID %d L_ID %d new_id %d p_data %p\n", new_conn->hca, new_conn->sp, @@ -521,18 +424,13 @@ static int dapli_cm_active_cb(struct dapl_cm_id *conn, if (conn->ep) conn->ep->cm_handle = IB_INVALID_HANDLE; - /* take off the CM thread work queue and free */ - dapl_os_lock( &g_cm_lock ); - dapl_llist_remove_entry(&g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry); - dapl_os_unlock(&g_cm_lock); dapl_os_free(conn, sizeof(*conn)); } return(destroy); } static int dapli_cm_passive_cb(struct dapl_cm_id *conn, - struct ib_cm_event *event) + struct ib_cm_event *event) { int destroy; struct dapl_cm_id *new_conn; @@ -541,9 +439,6 @@ static int dapli_cm_passive_cb(struct dapl_cm_id *conn, " passive_cb: conn %p id %d event %d\n", conn, conn->cm_id, event->event ); - if (conn->cm_id == 0) - return 0; - dapl_os_lock(&conn->lock); if (conn->destroy) { dapl_os_unlock(&conn->lock); @@ -608,155 +503,11 @@ static int dapli_cm_passive_cb(struct dapl_cm_id *conn, if (conn->ep) conn->ep->cm_handle = IB_INVALID_HANDLE; - /* take off the CM thread work queue and free */ - dapl_os_lock( &g_cm_lock ); - dapl_llist_remove_entry(&g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry); - dapl_os_unlock(&g_cm_lock); - dapl_os_free(conn, sizeof(*conn)); } return(destroy); } -/* something to catch the signal */ -static void ib_sig_handler(int signum) -{ - return; -} - -/* async CM processing thread */ -void cm_thread(void *arg) -{ - struct dapl_cm_id *conn, *next_conn; - struct ib_cm_event *event; - struct pollfd ufds; - sigset_t sigset; - - dapl_dbg_log (DAPL_DBG_TYPE_CM, - " cm_thread(%d,0x%x): ENTER: cm_fd %d\n", - getpid(), g_cm_thread, ib_cm_get_fd()); - - sigemptyset(&sigset); - sigaddset(&sigset, SIGUSR1); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - signal( SIGUSR1, ib_sig_handler); - - dapl_os_lock( &g_cm_lock ); - while (!g_cm_destroy) { - struct ib_cm_id *cm_id; - int ret; - - /* select for CM event, all events process via cm_fd */ - ufds.fd = ib_cm_get_fd(); - ufds.events = POLLIN; - ufds.revents = 0; - - dapl_os_unlock(&g_cm_lock); - ret = poll(&ufds, 1, -1); - if (ret <= 0) { - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread(%d): ERR %s poll\n", - getpid(),strerror(errno)); - dapl_os_lock(&g_cm_lock); - continue; - } - - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread: GET EVENT fd=%d n=%d\n", - ib_cm_get_fd(),ret); - - if (ib_cm_event_get_timed(0,&event)) { - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread: ERR %s event_get on %d\n", - strerror(errno), ib_cm_get_fd() ); - dapl_os_lock(&g_cm_lock); - continue; - } - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread: GET EVENT fd=%d woke\n",ib_cm_get_fd()); - dapl_os_lock(&g_cm_lock); - - /* set proper cm_id */ - if (event->event == IB_CM_REQ_RECEIVED || - event->event == IB_CM_SIDR_REQ_RECEIVED) - cm_id = event->param.req_rcvd.listen_id; - else - cm_id = event->cm_id; - - dapl_dbg_log (DAPL_DBG_TYPE_CM, - " cm_thread: EVENT event(%d) cm_id=%d (%d)\n", - event->event, event->cm_id, cm_id ); - - /* - * Walk cm_list looking for connection id in event - * no need to walk if uCM would provide context with event - */ - if (!dapl_llist_is_empty(&g_cm_list)) - next_conn = dapl_llist_peek_head(&g_cm_list); - else - next_conn = NULL; - - ret = 0; - while (next_conn) { - conn = next_conn; - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread: LIST cm %p c_id %d e_id %d)\n", - conn, conn->cm_id, cm_id ); - - next_conn = dapl_llist_next_entry( - &g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry ); - - if (cm_id == conn->cm_id) { - dapl_os_unlock(&g_cm_lock); - if (conn->sp) - ret = dapli_cm_passive_cb(conn,event); - else - ret = dapli_cm_active_cb(conn,event); - dapl_os_lock(&g_cm_lock); - break; - } - } - ib_cm_event_put(event); - if (ret) { - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " cm_thread: destroy cm_id %d\n",cm_id); - ib_cm_destroy_id(cm_id); - } - } - dapl_os_unlock(&g_cm_lock); - dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread(%d) EXIT, cm_list=%s\n", - getpid(),dapl_llist_is_empty(&g_cm_list) ? "EMPTY":"NOT EMPTY"); - g_cm_destroy = 0; -} - -/* async AT processing thread */ -void at_thread(void *arg) -{ - sigset_t sigset; - - dapl_dbg_log (DAPL_DBG_TYPE_CM, - " at_thread(%d,0x%x): ENTER: at_fd %d\n", - getpid(), g_at_thread, ib_at_get_fd()); - - sigemptyset(&sigset); - sigaddset(&sigset, SIGUSR1); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - signal(SIGUSR1, ib_sig_handler); - - while (!g_at_destroy) { - /* poll forever until callback or signal */ - if (ib_at_callback_get_timed(-1) < 0) { - dapl_dbg_log(DAPL_DBG_TYPE_CM, - " at_thread: SIG? ret=%s, destroy=%d\n", - strerror(errno), g_at_destroy ); - } - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n"); - } - dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid()); - g_at_destroy = 0; -} /************************ DAPL provider entry points **********************/ @@ -853,13 +604,6 @@ dapls_ib_connect ( conn->retries = 0; dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6)); - /* put on CM thread work queue */ - dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry); - dapl_os_lock( &g_cm_lock ); - dapl_llist_add_tail(&g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry, conn); - dapl_os_unlock(&g_cm_lock); - status = ib_at_route_by_ip( ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr, ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr, @@ -1019,13 +763,6 @@ dapls_ib_setup_conn_listener ( conn->hca = ia_ptr->hca_ptr; conn->service_id = ServiceID; - /* put on CM thread work queue */ - dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry); - dapl_os_lock( &g_cm_lock ); - dapl_llist_add_tail(&g_cm_list, - (DAPL_LLIST_ENTRY*)&conn->entry, conn); - dapl_os_unlock(&g_cm_lock); - dapl_dbg_log(DAPL_DBG_TYPE_EP, " setup_listener(conn=%p cm_id=%d)\n", sp_ptr->cm_srvc_handle,conn->cm_id); @@ -1345,8 +1082,6 @@ int dapls_ib_private_data_size ( return size; } -#ifndef SOCKET_CM - /* * Map all socket CM event codes to the DAT equivelent. */ @@ -1457,7 +1192,44 @@ dapls_ib_get_cm_event ( return ib_cm_event; } -#endif +void dapli_cm_event_cb() +{ + struct ib_cm_event *event; + + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_cm_event()\n"); + + /* process one CM event, fairness */ + if(!ib_cm_event_get_timed(0,&event)) { + struct dapl_cm_id *conn; + int ret; + dapl_dbg_log(DAPL_DBG_TYPE_CM, + " dapli_cm_event: EVENT=%p ID=%p CTX=%p\n", + event->event, event->cm_id, + event->cm_id->context); + + /* set proper conn from cm_id context*/ + conn = (struct dapl_cm_id*)event->cm_id->context; + + if (conn->sp) + ret = dapli_cm_passive_cb(conn,event); + else + ret = dapli_cm_active_cb(conn,event); + + ib_cm_event_put(event); + + if (ret) + ib_cm_destroy_id(conn->cm_id); + } +} + +void dapli_at_event_cb() +{ + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_at_event_cb()\n"); + + /* process one AT event, fairness */ + ib_at_callback_get_timed(0); +} + /* * Local variables: diff --git a/dapl/openib/dapl_ib_cq.c b/dapl/openib/dapl_ib_cq.c index ae32578..4380868 100644 --- a/dapl/openib/dapl_ib_cq.c +++ b/dapl/openib/dapl_ib_cq.c @@ -52,94 +52,40 @@ #include "dapl_evd_util.h" #include "dapl_ring_buffer_util.h" #include -#include -int dapli_cq_thread_init(struct dapl_hca *hca_ptr) +void dapli_cq_event_cb(struct _ib_hca_transport *hca) { - DAT_RETURN dat_status; - - dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr); - - /* create thread to process inbound connect request */ - dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr,&hca_ptr->ib_trans.cq_thread); - if (dat_status != DAT_SUCCESS) - { - dapl_dbg_log(DAPL_DBG_TYPE_ERR, - " cq_thread_init: failed to create thread\n"); - return 1; - } - return 0; -} - -void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr) -{ - dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr); - - /* destroy cr_thread and lock */ - hca_ptr->ib_trans.cq_destroy = 1; - pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1); - dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr); - while (hca_ptr->ib_trans.cq_destroy != 2) { - struct timespec sleep, remain; - sleep.tv_sec = 0; - sleep.tv_nsec = 10000000; /* 10 ms */ - dapl_dbg_log(DAPL_DBG_TYPE_UTIL, - " cq_thread_destroy: waiting for cq_thread\n"); - nanosleep (&sleep, &remain); - } - dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid()); - return; -} - -/* something to catch the signal */ -static void ib_cq_handler(int signum) -{ - return; -} - -void cq_thread( void *arg ) -{ - struct dapl_hca *hca_ptr = arg; - struct dapl_evd *evd_ptr; - struct ibv_cq *ibv_cq = NULL; - sigset_t sigset; - int status = 0; - - dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr); - - sigemptyset(&sigset); - sigaddset(&sigset,SIGUSR1); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - signal(SIGUSR1, ib_cq_handler); - - /* wait on DTO event, or signal to abort */ - while (!hca_ptr->ib_trans.cq_destroy) { - - struct pollfd cq_poll = { - .fd = hca_ptr->ib_hca_handle->cq_fd[0], + int i; + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," dapli_cq_event_cb(%p)\n", hca); + + /* check all comp events on this device */ + for(i=0;iib_ctx->num_comp;i++) { + struct dapl_evd *evd_ptr = NULL; + struct ibv_cq *ibv_cq = NULL; + struct pollfd cq_fd = { + .fd = hca->ib_ctx->cq_fd[i], .events = POLLIN, .revents = 0 }; - - status = poll(&cq_poll, 1, -1); - if ((status == 1) && - (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) { - + if ((poll(&cq_fd, 1, 0) == 1) && + (!ibv_get_cq_event(hca->ib_ctx, i, + &ibv_cq, (void*)&evd_ptr))) { + + /* + * TODO: ibv put event to protect against + * destroy CQ race conditions? + */ if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD)) continue; /* process DTO event via callback */ - dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle, + dapl_evd_dto_callback ( hca->ib_ctx, evd_ptr->ib_cq_handle, (void*)evd_ptr ); - } else { - - } - } - hca_ptr->ib_trans.cq_destroy = 2; - dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr); - return; + } + } } + /* * Map all verbs DTO completion codes to the DAT equivelent. * diff --git a/dapl/openib/dapl_ib_util.c b/dapl/openib/dapl_ib_util.c index 5cc3b60..09fb326 100644 --- a/dapl/openib/dapl_ib_util.c +++ b/dapl/openib/dapl_ib_util.c @@ -55,13 +55,14 @@ static const char rcsid[] = "$Id: $"; #include #include -#include -#include -#include -#include - -int g_dapl_loopback_connection = 0; +#include +int g_dapl_loopback_connection = 0; +int g_ib_destroy = 0; +int g_ib_pipe[2]; +DAPL_OS_THREAD g_ib_thread; +DAPL_OS_LOCK g_hca_lock; +struct dapl_llist_entry *g_hca_list; /* just get IP address, IPv4 only for now */ int dapli_get_hca_addr( struct dapl_hca *hca_ptr ) @@ -130,7 +131,18 @@ int dapli_get_hca_addr( struct dapl_hca *hca_ptr ) int32_t dapls_ib_init (void) { dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" ); - if (dapli_cm_thread_init() || dapli_at_thread_init()) + + /* initialize hca_list lock */ + dapl_os_lock_init(&g_hca_lock); + + /* initialize hca list for CQ events */ + dapl_llist_init_head(&g_hca_list); + + /* create pipe for waking up work thread */ + if (pipe(g_ib_pipe)) + return 1; + + if (dapli_ib_thread_init()) return 1; return 0; @@ -139,8 +151,7 @@ int32_t dapls_ib_init (void) int32_t dapls_ib_release (void) { dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" ); - dapli_at_thread_destroy(); - dapli_cm_thread_destroy(); + dapli_ib_thread_destroy(); return 0; } @@ -196,6 +207,7 @@ DAT_RETURN dapls_ib_open_hca ( ibv_get_device_name(hca_ptr->ib_trans.ib_dev) ); return DAT_INTERNAL_ERROR; } + hca_ptr->ib_trans.ib_ctx = hca_ptr->ib_hca_handle; /* set inline max with enviromment or default, get local lid and gid 0 */ hca_ptr->ib_trans.max_inline_send = @@ -223,19 +235,22 @@ DAT_RETURN dapls_ib_open_hca ( goto bail; } - /* one thread for each device open */ - if (dapli_cq_thread_init(hca_ptr)) { - dapl_dbg_log (DAPL_DBG_TYPE_ERR, - " open_hca: cq_thread_init failed for %s\n", - ibv_get_device_name(hca_ptr->ib_trans.ib_dev) ); - goto bail; - } - - /* initialize cq_lock and wait object */ - dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock); - dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object); - - dapl_dbg_log (DAPL_DBG_TYPE_UTIL, + /* initialize hca wait object for uAT event */ + dapl_os_wait_object_init(&hca_ptr->ib_trans.wait_object); + + /* + * Put new hca_transport on list for async and CQ event processing + * Wakeup work thread to add to polling list + */ + dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry); + dapl_os_lock( &g_hca_lock ); + dapl_llist_add_tail(&g_hca_list, + (DAPL_LLIST_ENTRY*)&hca_ptr->ib_trans.entry, + &hca_ptr->ib_trans.entry); + write(g_ib_pipe[1], "w", sizeof "w"); + dapl_os_unlock(&g_hca_lock); + + dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " open_hca: %s, port %d, %s %d.%d.%d.%d INLINE_MAX=%d\n", ibv_get_device_name(hca_ptr->ib_trans.ib_dev), hca_ptr->port_num, ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_family == AF_INET ? "AF_INET":"AF_INET6", @@ -245,7 +260,6 @@ DAT_RETURN dapls_ib_open_hca ( ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff, hca_ptr->ib_trans.max_inline_send ); - return DAT_SUCCESS; bail: @@ -276,16 +290,28 @@ DAT_RETURN dapls_ib_close_hca ( IN DAPL_HCA *hca_ptr ) dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p->%p\n", hca_ptr,hca_ptr->ib_hca_handle); - dapli_cq_thread_destroy(hca_ptr); - if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) { if (ibv_close_device(hca_ptr->ib_hca_handle)) return(dapl_convert_errno(errno,"ib_close_device")); hca_ptr->ib_hca_handle = IB_INVALID_HANDLE; } - - dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock); + /* + * Remove hca from async and CQ event processing list + * Wakeup work thread to remove from polling list + */ + hca_ptr->ib_trans.destroy = 1; + write(g_ib_pipe[1], "w", sizeof "w"); + + /* wait for thread to remove HCA references */ + while (hca_ptr->ib_trans.destroy != 2) { + struct timespec sleep, remain; + sleep.tv_sec = 0; + sleep.tv_nsec = 10000000; /* 10 ms */ + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " ib_thread_destroy: waiting on hca %p destroy\n"); + nanosleep (&sleep, &remain); + } return (DAT_SUCCESS); } @@ -432,31 +458,285 @@ DAT_RETURN dapls_ib_setup_async_callback ( IN void *context ) { - ib_hca_transport_t *hca_ptr; - - dapl_dbg_log (DAPL_DBG_TYPE_UTIL, - " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n", - ia_ptr, handler_type, evd_ptr, callback, context); - - hca_ptr = &ia_ptr->hca_ptr->ib_trans; - switch(handler_type) - { - case DAPL_ASYNC_UNAFILIATED: - hca_ptr->async_unafiliated = callback; - break; - case DAPL_ASYNC_CQ_ERROR: - hca_ptr->async_cq_error = callback; - break; - case DAPL_ASYNC_CQ_COMPLETION: - hca_ptr->async_cq = callback; - break; - case DAPL_ASYNC_QP_ERROR: - hca_ptr->async_qp_error = callback; - break; - default: - break; - } - return DAT_SUCCESS; + ib_hca_transport_t *hca_ptr; + + dapl_dbg_log (DAPL_DBG_TYPE_UTIL, + " setup_async_cb: ia %p type %d handle %p cb %p ctx %p\n", + ia_ptr, handler_type, evd_ptr, callback, context); + + hca_ptr = &ia_ptr->hca_ptr->ib_trans; + switch(handler_type) + { + case DAPL_ASYNC_UNAFILIATED: + hca_ptr->async_unafiliated = callback; + hca_ptr->async_un_ctx = context; + break; + case DAPL_ASYNC_CQ_ERROR: + hca_ptr->async_cq_error = callback; + hca_ptr->async_cq_ctx = context; + break; + case DAPL_ASYNC_CQ_COMPLETION: + hca_ptr->async_cq = callback; + hca_ptr->async_ctx = context; + break; + case DAPL_ASYNC_QP_ERROR: + hca_ptr->async_qp_error = callback; + hca_ptr->async_qp_ctx = context; + break; + default: + break; + } + return DAT_SUCCESS; } +int dapli_ib_thread_init(void) +{ + DAT_RETURN dat_status; + + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " ib_thread_init(%d)\n", getpid()); + + /* create thread to process inbound connect request */ + dat_status = dapl_os_thread_create(dapli_thread, NULL, &g_ib_thread); + if (dat_status != DAT_SUCCESS) + { + dapl_dbg_log(DAPL_DBG_TYPE_ERR, + " ib_thread_init: failed to create thread\n"); + return 1; + } + return 0; +} + +void dapli_ib_thread_destroy(void) +{ + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " ib_thread_destroy(%d)\n", getpid()); + + /* destroy ib_thread, wait for termination */ + g_ib_destroy = 1; + write(g_ib_pipe[1], "w", sizeof "w"); + while (g_ib_destroy != 2) { + struct timespec sleep, remain; + sleep.tv_sec = 0; + sleep.tv_nsec = 10000000; /* 10 ms */ + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " ib_thread_destroy: waiting for ib_thread\n"); + nanosleep(&sleep, &remain); + } + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " ib_thread_destroy(%d) exit\n",getpid()); +} + +void dapli_async_event_cb(struct _ib_hca_transport *hca) +{ + struct ibv_async_event event; + struct pollfd async_fd = { + .fd = hca->ib_ctx->async_fd, + .events = POLLIN, + .revents = 0 + }; + + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " dapli_async_event_cb(%p)\n",hca); + + if (hca->destroy) + return; + + if ((poll(&async_fd, 1, 0)==1) && + (!ibv_get_async_event(hca->ib_ctx, &event))) { + + switch (event.event_type) { + + case IBV_EVENT_CQ_ERR: + { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " dapli_async_event CQ ERR %d\n", + event.event_type); + + /* report up if async callback still setup */ + if (hca->async_cq_error) + hca->async_cq_error(hca->ib_ctx, + &event, + hca->async_cq_ctx); + break; + } + case IBV_EVENT_COMM_EST: + { + /* Received messages on connected QP before RTU */ + struct dapl_ep *ep_ptr = event.element.qp->qp_context; + + /* TODO: cannot process COMM_EST until ibv + * guarantees valid QP context for events. + * Race conditions exist with QP destroy call. + * For now, assume the RTU will arrive. + */ + dapl_dbg_log(DAPL_DBG_TYPE_UTIL, + " dapli_async_event COMM_EST (qp=%p)\n", + event.element.qp); + + if (!DAPL_BAD_HANDLE(ep_ptr, DAPL_MAGIC_EP) && + ep_ptr->cm_handle != IB_INVALID_HANDLE) + ib_cm_establish(ep_ptr->cm_handle->cm_id); + + break; + } + case IBV_EVENT_QP_FATAL: + case IBV_EVENT_QP_REQ_ERR: + case IBV_EVENT_QP_ACCESS_ERR: + case IBV_EVENT_QP_LAST_WQE_REACHED: + case IBV_EVENT_SRQ_ERR: + case IBV_EVENT_SRQ_LIMIT_REACHED: + case IBV_EVENT_SQ_DRAINED: + { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " dapli_async_event QP ERR %d\n", + event.event_type); + + /* report up if async callback still setup */ + if (hca->async_qp_error) + hca->async_qp_error(hca->ib_ctx, + &event, + hca->async_qp_ctx); + break; + } + case IBV_EVENT_PATH_MIG: + case IBV_EVENT_PATH_MIG_ERR: + case IBV_EVENT_DEVICE_FATAL: + case IBV_EVENT_PORT_ACTIVE: + case IBV_EVENT_PORT_ERR: + case IBV_EVENT_LID_CHANGE: + case IBV_EVENT_PKEY_CHANGE: + case IBV_EVENT_SM_CHANGE: + { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " dapli_async_event DEV ERR %d\n", + event.event_type); + + /* report up if async callback still setup */ + if (hca->async_unafiliated) + hca->async_unafiliated( + hca->ib_ctx, + &event, + hca->async_un_ctx); + break; + } + default: + { + dapl_dbg_log (DAPL_DBG_TYPE_WARN, + "--> DsEventCb: UNKNOWN\n"); + break; + } + } + ibv_put_async_event(&event); + } +} + + +/* work thread for uAT, uCM, CQ, and async events */ +void dapli_thread(void *arg) +{ + struct pollfd ufds[__FD_SETSIZE]; + struct _ib_hca_transport *uhca[__FD_SETSIZE]={NULL}; + struct _ib_hca_transport *hca; + int ret,idx,fds; + char rbuf[2]; + + dapl_dbg_log (DAPL_DBG_TYPE_UTIL, + " ib_thread(%d,0x%x): ENTER: pipe %d cm %d at %d\n", + getpid(), g_ib_thread, + g_ib_pipe[0], ib_cm_get_fd(), + ib_at_get_fd()); + + /* Poll across pipe, CM, AT never changes */ + dapl_os_lock( &g_hca_lock ); + + ufds[0].fd = g_ib_pipe[0]; /* pipe */ + ufds[0].events = POLLIN; + ufds[1].fd = ib_cm_get_fd(); /* uCM */ + ufds[1].events = POLLIN; + ufds[2].fd = ib_at_get_fd(); /* uAT */ + ufds[2].events = POLLIN; + + while (!g_ib_destroy) { + + /* build ufds after pipe, cm, at events */ + ufds[0].revents = 0; + ufds[1].revents = 0; + ufds[2].revents = 0; + idx=2; + + /* Walk HCA list and setup async and CQ events */ + if (!dapl_llist_is_empty(&g_hca_list)) + hca = dapl_llist_peek_head(&g_hca_list); + else + hca = NULL; + + while(hca) { + int i; + ufds[++idx].fd = hca->ib_ctx->async_fd; /* uASYNC */ + ufds[idx].events = POLLIN; + ufds[idx].revents = 0; + uhca[idx] = hca; + for (i=0;iib_ctx->num_comp;i++) { /* uCQ */ + ufds[++idx].fd = hca->ib_ctx->cq_fd[i]; + ufds[idx].events = POLLIN; + ufds[idx].revents = 0; + uhca[idx] = hca; + } + hca = dapl_llist_next_entry( + &g_hca_list, + (DAPL_LLIST_ENTRY*)&hca->entry); + } + + /* unlock, and setup poll */ + fds = idx+1; + dapl_os_unlock(&g_hca_lock); + ret = poll(ufds, fds, -1); + if (ret <= 0) { + dapl_dbg_log(DAPL_DBG_TYPE_WARN, + " ib_thread(%d): ERR %s poll\n", + getpid(),strerror(errno)); + dapl_os_lock(&g_hca_lock); + continue; + } + + /* check and process CQ and ASYNC events, each open device */ + for(idx=3;idxdestroy == 1) { + dapl_os_lock(&g_hca_lock); + dapl_llist_remove_entry( + &g_hca_list, + (DAPL_LLIST_ENTRY*) + &uhca[idx]->entry); + dapl_os_unlock(&g_hca_lock); + uhca[idx]->destroy = 2; + } + } + } + + /* CM and AT events */ + if (ufds[1].revents == POLLIN) + dapli_cm_event_cb(); + + if (ufds[2].revents == POLLIN) + dapli_at_event_cb(); + + dapl_os_lock(&g_hca_lock); + } + dapl_dbg_log(DAPL_DBG_TYPE_UTIL," ib_thread(%d) EXIT\n",getpid()); + g_ib_destroy = 2; + dapl_os_unlock(&g_hca_lock); +} diff --git a/dapl/openib/dapl_ib_util.h b/dapl/openib/dapl_ib_util.h index 6f976a7..867a275 100644 --- a/dapl/openib/dapl_ib_util.h +++ b/dapl/openib/dapl_ib_util.h @@ -231,18 +231,22 @@ typedef void (*ib_async_handler_t)( /* ib_hca_transport_t, specific to this implementation */ typedef struct _ib_hca_transport { - struct ibv_device *ib_dev; + struct ib_llist_entry entry; + int destroy; + struct ibv_device *ib_dev; + struct ibv_context *ib_ctx; ib_cq_handle_t ib_cq_empty; - DAPL_OS_LOCK cq_lock; DAPL_OS_WAIT_OBJECT wait_object; - int cq_destroy; - DAPL_OS_THREAD cq_thread; int max_inline_send; union ibv_gid gid; ib_async_handler_t async_unafiliated; + void *async_un_ctx; ib_async_handler_t async_cq_error; + void *async_ctx; ib_async_handler_t async_cq; + void *async_cq_ctx; ib_async_handler_t async_qp_error; + void *async_qp_ctx; } ib_hca_transport_t; @@ -252,21 +256,15 @@ typedef uint32_t ib_shm_transport_t; /* prototypes */ int32_t dapls_ib_init (void); int32_t dapls_ib_release (void); -void cm_thread (void *arg); -int dapli_cm_thread_init(void); -void dapli_cm_thread_destroy(void); -void at_thread (void *arg); -int dapli_at_thread_init(void); -void dapli_at_thread_destroy(void); -void cq_thread (void *arg); -int dapli_cq_thread_init(struct dapl_hca *hca_ptr); -void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr); - -int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid); -int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index, - union ibv_gid *gid); -int dapli_get_hca_addr(struct dapl_hca *hca_ptr); +void dapli_thread(void *arg); +int dapli_ib_thread_init(void); +void dapli_ib_thread_destroy(void); +int dapli_get_hca_addr(struct dapl_hca *hca_ptr); void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num); +void dapli_cm_event_cb(void); +void dapli_at_event_cb(void); +void dapli_cq_event_cb(struct _ib_hca_transport *hca); +void dapli_async_event_cb(struct _ib_hca_transport *hca); DAT_RETURN dapls_modify_qp_state ( IN ib_qp_handle_t qp_handle, -- 2.41.0