From f490dc7109f52e841df6e7d6d083537dc0b8b9a5 Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 4 Jun 2014 11:39:36 -0700 Subject: [PATCH] ibacm: Centralize provider SA access in core. This patch is the first of a series to centralize the SA access for all providers. Here is the description of the approach: (1) A provider will call the core to allocate a request and initialize the request (including the callback function); (2) Provider calls the core to send the SA request; (3) The core SA handler thread will poll for any SA response; (4) When a SA response is received, the core SA handler will find the matching request and call the corresponding provider callback function; (5) The provider callback will handle the SA response and free the SA request. The approach has the following advantages: (1) No code duplication in the provider for SA access; (2) No need for a separate thread in the provider to wait for SA response. Signed-off-by: Kaike Wan Signed-off-by: Sean Hefty --- include/infiniband/acm_prov.h | 16 +- src/acm.c | 277 ++++++++++++++++++++++++++++++++++ 2 files changed, 292 insertions(+), 1 deletion(-) diff --git a/include/infiniband/acm_prov.h b/include/infiniband/acm_prov.h index 9ec407e..d803d95 100644 --- a/include/infiniband/acm_prov.h +++ b/include/infiniband/acm_prov.h @@ -31,6 +31,8 @@ #define ACM_PROV_H #include +#include +#include #define ACM_PROV_VERSION 1 @@ -70,7 +72,7 @@ struct acm_provider { void *port_context, void **ep_context); void (*close_endpoint)(void *ep_context); int (*add_address)(const struct acm_address *addr, void *ep_context, - void **addr_context); + void **addr_context); void (*remove_address)(void *addr_context, struct acm_address *addr); int (*resolve)(void *addr_context, struct acm_msg *msg, uint64_t id); int (*query)(void *addr_context, struct acm_msg *msg, uint64_t id); @@ -102,4 +104,16 @@ extern enum ibv_rate acm_get_rate(uint8_t width, uint8_t speed); extern enum ibv_mtu acm_convert_mtu(int mtu); extern enum ibv_rate acm_convert_rate(int rate); +struct acm_sa_mad { + void *context; + struct ib_user_mad umad; + struct umad_sa_packet sa_mad; /* must follow umad and be 64-bit aligned */ +}; + +extern struct acm_sa_mad * +acm_alloc_sa_mad(struct acm_endpoint *endpoint, void *context, + void (*handler)(struct acm_sa_mad *)); +extern void acm_free_sa_mad(struct acm_sa_mad *mad); +extern int acm_send_sa_mad(struct acm_sa_mad *mad); + #endif /* ACM_PROV_H */ diff --git a/src/acm.c b/src/acm.c index 5a01349..8fcf36d 100644 --- a/src/acm.c +++ b/src/acm.c @@ -55,6 +55,7 @@ #include #include #include +#include #include "acm_mad.h" #include "acm_util.h" @@ -87,6 +88,12 @@ struct acmc_port { struct acm_port port; struct acm_provider *prov; /* limit to 1 provider per port for now */ void *prov_port_context; + int mad_portid; + int mad_agentid; + struct ib_mad_addr sa_addr; + DLIST_ENTRY sa_pending; + DLIST_ENTRY sa_wait; + int sa_credits; lock_t lock; DLIST_ENTRY ep_list; enum ibv_port_state state; @@ -132,6 +139,13 @@ union socket_addr { struct sockaddr_in6 sin6; }; +struct acmc_sa_req { + DLIST_ENTRY entry; + struct acmc_ep *ep; + void (*resp_handler)(struct acm_sa_mad *); + struct acm_sa_mad mad; +}; + static char def_prov_name[ACM_PROV_NAME_SIZE] = "ibacmp"; static DLIST_ENTRY provider_list; static struct acmc_prov *def_provider = NULL; @@ -154,6 +168,16 @@ static int acm_ep_insert_addr(struct acmc_ep *ep, const char *name, uint8_t *add size_t addr_len, uint8_t addr_type); static void acm_event_handler(struct acmc_device *dev); +static struct sa_data { + int timeout; + int retries; + int depth; + pthread_t thread_id; + struct pollfd *fds; + struct acmc_port **ports; + int nfds; +} sa = { 2000, 2, 1}; + /* * Service options - may be set through ibacm_opts.cfg file. */ @@ -1681,8 +1705,11 @@ static void acm_port_up(struct acmc_port *port) if (port->gid_cnt == 0) port->base_gid = gid; } + port->lid = attr.lid; port->lid_mask = 0xffff - ((1 << attr.lmc) - 1); + port->sa_addr.lid = attr.sm_lid; + port->sa_addr.sl = attr.sm_sl; port->state = IBV_PORT_ACTIVE; acm_assign_provider(port); dev_ctx = acm_acquire_prov_context(&port->dev->prov_dev_context_list, @@ -1830,6 +1857,21 @@ acm_open_port(struct acmc_port *port, struct acmc_device *dev, uint8_t port_num) port->port.port_num = port_num; lock_init(&port->lock); DListInit(&port->ep_list); + DListInit(&port->sa_pending); + DListInit(&port->sa_wait); + port->sa_credits = sa.depth; + port->sa_addr.qpn = htonl(1); + port->sa_addr.qkey = htonl(ACM_QKEY); + + port->mad_portid = umad_open_port(dev->device.verbs->device->name, port_num); + if (port->mad_portid < 0) + acm_log(0, "ERROR - unable to open MAD port\n"); + + port->mad_agentid = umad_register(port->mad_portid, + IB_MGMT_CLASS_SA, 1, 1, NULL); + if (port->mad_agentid < 0) + acm_log(0, "ERROR - unable to register MAD client\n"); + port->prov = NULL; port->state = IBV_PORT_DOWN; } @@ -2064,6 +2106,224 @@ static void acm_close_providers(void) } } +static int acmc_init_sa_fds(void) +{ + struct acmc_device *dev; + DLIST_ENTRY *dev_entry; + int ret, p, i = 0; + + for (dev_entry = dev_list.Next; dev_entry != &dev_list; + dev_entry = dev_entry->Next) { + dev = container_of(dev_entry, struct acmc_device, entry); + sa.nfds += dev->port_cnt; + } + + sa.fds = calloc(sa.nfds, sizeof(*sa.fds)); + sa.ports = calloc(sa.nfds, sizeof(*sa.ports)); + if (!sa.fds || !sa.ports) + return -ENOMEM; + + for (dev_entry = dev_list.Next; dev_entry != &dev_list; + dev_entry = dev_entry->Next) { + dev = container_of(dev_entry, struct acmc_device, entry); + for (p = 0; p < dev->port_cnt; p++) { + sa.fds[i].fd = umad_get_fd(dev->port[p].mad_portid); + sa.fds[i].events = POLLIN; + ret = fcntl(sa.fds[i].fd, F_SETFL, O_NONBLOCK); + if (ret) + acm_log(0, "WARNDING - umad fd is blocking\n"); + + sa.ports[i++] = &dev->port[p]; + } + } + + return 0; +} + +struct acm_sa_mad * +acm_alloc_sa_mad(struct acm_endpoint *endpoint, void *context, + void (*handler)(struct acm_sa_mad *)) +{ + struct acmc_sa_req *req; + + req = calloc(1, sizeof (*req)); + if (!req) { + acm_log(0, "Error: failed to allocate sa request\n"); + return NULL; + } + + req->ep = container_of(endpoint, struct acmc_ep, endpoint); + req->mad.context = context; + req->resp_handler = handler; + + acm_log(2, "%p\n", req); + return &req->mad; +} + +void acm_free_sa_mad(struct acm_sa_mad *mad) +{ + struct acmc_sa_req *req; + req = container_of(mad, struct acmc_sa_req, mad); + acm_log(2, "%p\n", req); + free(req); +} + +int acm_send_sa_mad(struct acm_sa_mad *mad) +{ + struct acmc_port *port; + struct acmc_sa_req *req; + int ret; + + req = container_of(mad, struct acmc_sa_req, mad); + acm_log(2, "%p from %s\n", req, req->ep->addr_info[0].addr.id_string); + + port = req->ep->port; + mad->umad.addr.qpn = port->sa_addr.qpn; + mad->umad.addr.qkey = port->sa_addr.qkey; + mad->umad.addr.lid = port->sa_addr.lid; + mad->umad.addr.sl = port->sa_addr.sl; + // TODO: mad->umad.addr.pkey_index = req->ep->?; + + lock_acquire(&port->lock); + if (port->sa_credits && DListEmpty(&port->sa_wait)) { + ret = umad_send(port->mad_portid, port->mad_agentid, &mad->umad, + sizeof mad->sa_mad, sa.timeout, sa.retries); + if (!ret) { + port->sa_credits--; + DListInsertTail(&req->entry, &port->sa_pending); + } + } else { + ret = 0; + DListInsertTail(&req->entry, &port->sa_wait); + } + lock_release(&port->lock); + return ret; +} + +static void acmc_send_queued_req(struct acmc_port *port) +{ + struct acmc_sa_req *req; + int ret; + + lock_acquire(&port->lock); + if (DListEmpty(&port->sa_wait) || !port->sa_credits) { + lock_release(&port->lock); + return; + } + + req = container_of(port->sa_wait.Next, struct acmc_sa_req, entry); + DListRemove(&req->entry); + ret = umad_send(port->mad_portid, port->mad_agentid, &req->mad.umad, + sizeof req->mad.sa_mad, sa.timeout, sa.retries); + if (!ret) { + port->sa_credits--; + DListInsertTail(&req->entry, &port->sa_pending); + } + lock_release(&port->lock); + + if (ret) { + req->mad.umad.status = -ret; + req->resp_handler(&req->mad); + } +} + +static void acmc_recv_mad(struct acmc_port *port) +{ + struct acmc_sa_req *req; + struct acm_sa_mad resp; + DLIST_ENTRY *entry; + int ret, len, found; + struct umad_hdr *hdr; + + acm_log(2, "\n"); + len = sizeof(resp.sa_mad); + ret = umad_recv(port->mad_portid, &resp.umad, &len, 0); + if (ret < 0) { + acm_log(1, "umad_recv error %d\n", ret); + return; + } + + hdr = &resp.sa_mad.mad_hdr; + acm_log(2, "bv %x cls %x cv %x mtd %x st %d tid %llx at %x atm %x\n", + hdr->base_version, hdr->mgmt_class, hdr->class_version, + hdr->method, hdr->status, hdr->tid, hdr->attr_id, hdr->attr_mod); + found = 0; + lock_acquire(&port->lock); + for (entry = port->sa_pending.Next; entry != &port->sa_pending; + entry = entry->Next) { + req = container_of(entry, struct acmc_sa_req, entry); + /* The lower 32-bit of the tid is used for agentid in umad */ + if (req->mad.sa_mad.mad_hdr.tid == (hdr->tid & 0xFFFFFFFF00000000)) { + found = 1; + DListRemove(entry); + port->sa_credits++; + break; + } + } + lock_release(&port->lock); + + if (found) { + memcpy(&req->mad.umad, &resp.umad, sizeof(resp.umad) + len); + req->resp_handler(&req->mad); + } +} + +static void *acm_sa_handler(void *context) +{ + int i, ret; + + acm_log(0, "started\n"); + ret = acmc_init_sa_fds(); + if (ret) { + acm_log(0, "ERROR - failed to init fds\n"); + return NULL; + } + + if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) { + acm_log(0, "Error: failed to set cancel type \n"); + return NULL; + } + + if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) { + acm_log(0, "Error: failed to set cancel state\n"); + return NULL; + } + + for (;;) { + pthread_testcancel(); + ret = poll(sa.fds, sa.nfds, -1); + if (ret < 0) { + acm_log(0, "ERROR - sa poll error: %d\n", errno); + continue; + } + + for (i = 0; i < sa.nfds; i++) { + if (!sa.fds[i].revents) + continue; + + if (sa.fds[i].revents & POLLIN) { + acmc_recv_mad(sa.ports[i]); + acmc_send_queued_req(sa.ports[i]); + } + sa.fds[i].revents = 0; + } + } + return NULL; +} + +static void acm_stop_sa_handler(void) +{ + if (pthread_cancel(sa.thread_id)) { + acm_log(0, "Error: failed to cancel sa resp thread \n"); + return; + } + + if (pthread_join(sa.thread_id, NULL)) { + acm_log(0, "Error: failed to join sa resp thread\n"); + return; + } +} + static void acm_set_options(void) { FILE *f; @@ -2092,6 +2352,12 @@ static void acm_set_options(void) strcpy(prov_lib_path, value); else if (!stricmp("support_ips_in_addr_cfg", opt)) support_ips_in_addr_cfg = atoi(value); + else if (!stricmp("timeout", opt)) + sa.timeout = atoi(value); + else if (!stricmp("retries", opt)) + sa.retries = atoi(value); + else if (!stricmp("sa_depth", opt)) + sa.depth = atoi(value); } fclose(f); @@ -2103,6 +2369,9 @@ static void acm_log_options(void) acm_log(0, "log level %d\n", log_level); acm_log(0, "lock file %s\n", lock_file); acm_log(0, "server_port %d\n", server_port); + acm_log(0, "timeout %d ms\n", sa.timeout); + acm_log(0, "retries %d\n", sa.retries); + acm_log(0, "sa depth %d\n", sa.depth); acm_log(0, "provider lib path %s\n", prov_lib_path); acm_log(0, "support IP's in ibacm_addr.cfg %d\n", support_ips_in_addr_cfg); } @@ -2225,6 +2494,7 @@ int CDECL_FUNC main(int argc, char **argv) for (i = 0; i < ACM_MAX_COUNTER; i++) atomic_init(&counter[i]); + umad_init(); if (acm_open_providers()) { acm_log(0, "ERROR - unable to open any providers\n"); return -1; @@ -2238,12 +2508,19 @@ int CDECL_FUNC main(int argc, char **argv) acm_log(1, "creating IP Netlink socket\n"); acm_ipnl_create(); + acm_log(1, "starting sa response receiving thread\n"); + if (pthread_create(&sa.thread_id, NULL, acm_sa_handler, NULL)) { + acm_log(0, "Error: failed to create sa resp rcving thread"); + return -1; + } acm_activate_devices(); acm_log(1, "starting server\n"); acm_server(); acm_log(0, "shutting down\n"); acm_close_providers(); + acm_stop_sa_handler(); + umad_done(); fclose(flog); return 0; } -- 2.41.0