static int acmp_resolve(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
static int acmp_query(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
+static void acm_event_handler(struct acm_device *dev);
static struct acm_provider def_prov = {
.resolve = acmp_resolve,
{
fd_set readfds;
int i, n, ret;
+ struct acm_device *dev;
+ DLIST_ENTRY *dev_entry;
acm_log(0, "started\n");
acm_init_server();
}
}
+ for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+ dev_entry = dev_entry->Next) {
+ dev = container_of(dev_entry, struct acm_device, entry);
+ FD_SET(dev->verbs->async_fd, &readfds);
+ n = max(n, (int) dev->verbs->async_fd);
+ }
+
ret = select(n + 1, &readfds, NULL, NULL, NULL);
if (ret == SOCKET_ERROR) {
acm_log(0, "ERROR - server select error\n");
acm_svr_receive(&client_array[i]);
}
}
+
+ for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+ dev_entry = dev_entry->Next) {
+ dev = container_of(dev_entry, struct acm_device, entry);
+ if (FD_ISSET(dev->verbs->async_fd, &readfds)) {
+ acm_log(2, "handling event from %s\n",
+ dev->verbs->device->name);
+ acm_event_handler(dev);
+ }
+ }
}
}
acm_log(1, "%s %d is down\n", port->dev->verbs->device->name, port->port_num);
}
-/*
- * There is one event handler thread per device. This is the only thread that
- * modifies the port state or a port endpoint list. Other threads which access
- * those must synchronize against changes accordingly, but this thread only
- * needs to lock when making modifications.
- */
-static void CDECL_FUNC acm_event_handler(void *context)
+static void acm_event_handler(struct acm_device *dev)
{
- struct acm_device *dev = (struct acm_device *) context;
struct ibv_async_event event;
int i, ret;
struct acmp_port *prov_port;
- acm_log(1, "started\n");
- for (i = 0; i < dev->port_cnt; i++) {
- acm_port_up(&dev->port[i]);
- }
+ ret = ibv_get_async_event(dev->verbs, &event);
+ if (ret)
+ return;
- for (;;) {
- ret = ibv_get_async_event(dev->verbs, &event);
- if (ret)
- continue;
+ acm_log(2, "processing async event %s for %s\n",
+ ibv_event_type_str(event.event_type),
+ dev->verbs->device->name);
+ i = event.element.port_num - 1;
- acm_log(2, "processing async event %s\n",
- ibv_event_type_str(event.event_type));
- i = event.element.port_num - 1;
- switch (event.event_type) {
- case IBV_EVENT_PORT_ACTIVE:
- if (dev->port[i].state != IBV_PORT_ACTIVE)
- acm_port_up(&dev->port[i]);
- break;
- case IBV_EVENT_PORT_ERR:
- if (dev->port[i].state == IBV_PORT_ACTIVE)
- acm_port_down(&dev->port[i]);
- break;
- case IBV_EVENT_CLIENT_REREGISTER:
- if (dev->port[i].state == IBV_PORT_ACTIVE) {
- if ((prov_port = acmp_get_port_by_guid(dev->guid, dev->port[i].port_num)) != NULL) {
- acmp_port_join(prov_port);
- acm_log(1, "%s %d has reregistered\n",
- dev->verbs->device->name, i + 1);
- }
-
- }
- break;
- default:
- break;
+ switch (event.event_type) {
+ case IBV_EVENT_PORT_ACTIVE:
+ if (dev->port[i].state != IBV_PORT_ACTIVE)
+ acm_port_up(&dev->port[i]);
+ break;
+ case IBV_EVENT_PORT_ERR:
+ if (dev->port[i].state == IBV_PORT_ACTIVE)
+ acm_port_down(&dev->port[i]);
+ break;
+ case IBV_EVENT_CLIENT_REREGISTER:
+ if ((dev->port[i].state == IBV_PORT_ACTIVE) &&
+ (prov_port = acmp_get_port_by_guid(dev->guid, dev->port[i].port_num))) {
+ acmp_port_join(prov_port);
+ acm_log(1, "%s %d has reregistered\n",
+ dev->verbs->device->name, i + 1);
}
-
- ibv_ack_async_event(&event);
+ break;
+ default:
+ break;
}
+
+ ibv_ack_async_event(&event);
}
static void acm_activate_devices()
struct acm_device *dev;
DLIST_ENTRY *dev_entry;
struct acmp_device *pdev;
+ int i;
acm_log(1, "\n");
for (dev_entry = dev_list.Next; dev_entry != &dev_list;
dev_entry = dev_entry->Next) {
dev = container_of(dev_entry, struct acm_device, entry);
- beginthread(acm_event_handler, dev);
+ for (i = 0; i < dev->port_cnt; i++) {
+ acm_port_up(&dev->port[i]);
+ }
}
for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;