]> git.openfabrics.org - ~shefty/ibacm.git/commitdiff
ibacm: Eliminate device event handler thread.
authorKaike Wan <kaike.wan@intel.com>
Wed, 16 Apr 2014 05:18:53 +0000 (22:18 -0700)
committerSean Hefty <sean.hefty@intel.com>
Wed, 16 Apr 2014 06:33:36 +0000 (23:33 -0700)
The event handler thread creates synchronization issues
with the client request, especially when address/endpoint/port
is removed or shut down. This patch moves the device event
handling code into acm_server() so that event and client
reqest are serialized to avoid any potential synchronization
issues.  We also lose the overhead of an additional thread.

Signed-off-by: Kaike Wan <kaike.wan@intel.com>
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/acm.c

index 779aa0f2852f971af276c138b6ad08f8fc62ae4b..19973f31c4eb345f229fbe150815c631cb0d614a 100644 (file)
--- a/src/acm.c
+++ b/src/acm.c
@@ -246,6 +246,7 @@ struct acm_request {
 
 static int acmp_resolve(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
 static int acmp_query(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
+static void acm_event_handler(struct acm_device *dev);
 
 static struct acm_provider def_prov = {
        .resolve = acmp_resolve,
@@ -2764,6 +2765,8 @@ static void acm_server(void)
 {
        fd_set readfds;
        int i, n, ret;
+       struct acm_device *dev;
+       DLIST_ENTRY *dev_entry;
 
        acm_log(0, "started\n");
        acm_init_server();
@@ -2788,6 +2791,13 @@ static void acm_server(void)
                        }
                }
 
+               for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+                    dev_entry = dev_entry->Next) {
+                       dev = container_of(dev_entry, struct acm_device, entry);
+                       FD_SET(dev->verbs->async_fd, &readfds);
+                       n = max(n, (int) dev->verbs->async_fd);
+               }
+
                ret = select(n + 1, &readfds, NULL, NULL, NULL);
                if (ret == SOCKET_ERROR) {
                        acm_log(0, "ERROR - server select error\n");
@@ -2807,6 +2817,16 @@ static void acm_server(void)
                                acm_svr_receive(&client_array[i]);
                        }
                }
+
+               for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+                    dev_entry = dev_entry->Next) {
+                       dev = container_of(dev_entry, struct acm_device, entry);
+                       if (FD_ISSET(dev->verbs->async_fd, &readfds)) {
+                               acm_log(2, "handling event from %s\n", 
+                                       dev->verbs->device->name);
+                               acm_event_handler(dev);
+                       }
+               }
        }
 }
 
@@ -3895,57 +3915,43 @@ static void acm_port_down(struct acm_port *port)
        acm_log(1, "%s %d is down\n", port->dev->verbs->device->name, port->port_num);
 }
 
-/*
- * There is one event handler thread per device.  This is the only thread that
- * modifies the port state or a port endpoint list.  Other threads which access
- * those must synchronize against changes accordingly, but this thread only
- * needs to lock when making modifications.
- */
-static void CDECL_FUNC acm_event_handler(void *context)
+static void acm_event_handler(struct acm_device *dev)
 {
-       struct acm_device *dev = (struct acm_device *) context;
        struct ibv_async_event event;
        int i, ret;
        struct acmp_port *prov_port;
 
-       acm_log(1, "started\n");
-       for (i = 0; i < dev->port_cnt; i++) {
-               acm_port_up(&dev->port[i]);
-       }
+       ret = ibv_get_async_event(dev->verbs, &event);
+       if (ret)
+               return;
 
-       for (;;) {
-               ret = ibv_get_async_event(dev->verbs, &event);
-               if (ret)
-                       continue;
+       acm_log(2, "processing async event %s for %s\n",
+               ibv_event_type_str(event.event_type), 
+               dev->verbs->device->name);
+       i = event.element.port_num - 1;
 
-               acm_log(2, "processing async event %s\n",
-                       ibv_event_type_str(event.event_type));
-               i = event.element.port_num - 1;
-               switch (event.event_type) {
-               case IBV_EVENT_PORT_ACTIVE:
-                       if (dev->port[i].state != IBV_PORT_ACTIVE)
-                               acm_port_up(&dev->port[i]);
-                       break;
-               case IBV_EVENT_PORT_ERR:
-                       if (dev->port[i].state == IBV_PORT_ACTIVE)
-                               acm_port_down(&dev->port[i]);
-                       break;
-               case IBV_EVENT_CLIENT_REREGISTER:
-                       if (dev->port[i].state == IBV_PORT_ACTIVE) {
-                               if ((prov_port = acmp_get_port_by_guid(dev->guid, dev->port[i].port_num)) != NULL)  {
-                                       acmp_port_join(prov_port);
-                                       acm_log(1, "%s %d has reregistered\n",
-                                                       dev->verbs->device->name, i + 1);
-                               }
-                               
-                       }
-                       break;
-               default:
-                       break;
+       switch (event.event_type) {
+       case IBV_EVENT_PORT_ACTIVE:
+               if (dev->port[i].state != IBV_PORT_ACTIVE)
+                       acm_port_up(&dev->port[i]);
+               break;
+       case IBV_EVENT_PORT_ERR:
+               if (dev->port[i].state == IBV_PORT_ACTIVE)
+                       acm_port_down(&dev->port[i]);
+               break;
+       case IBV_EVENT_CLIENT_REREGISTER:
+               if ((dev->port[i].state == IBV_PORT_ACTIVE) &&
+                   (prov_port = acmp_get_port_by_guid(dev->guid, dev->port[i].port_num))) {
+                       acmp_port_join(prov_port);
+                       acm_log(1, "%s %d has reregistered\n",
+                               dev->verbs->device->name, i + 1);
                }
-
-               ibv_ack_async_event(&event);
+               break;
+       default:
+               break;
        }
+
+       ibv_ack_async_event(&event);
 }
 
 static void acm_activate_devices()
@@ -3953,13 +3959,16 @@ static void acm_activate_devices()
        struct acm_device *dev;
        DLIST_ENTRY *dev_entry;
        struct acmp_device *pdev;
+       int i;
 
        acm_log(1, "\n");
        for (dev_entry = dev_list.Next; dev_entry != &dev_list;
                dev_entry = dev_entry->Next) {
 
                dev = container_of(dev_entry, struct acm_device, entry);
-               beginthread(acm_event_handler, dev);
+               for (i = 0; i < dev->port_cnt; i++) {
+                       acm_port_up(&dev->port[i]);
+               }
        }
 
        for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;