]> git.openfabrics.org - ~shefty/ibacm.git/commitdiff
ibacm: Centralize provider SA access in core.
authorSean Hefty <sean.hefty@intel.com>
Wed, 4 Jun 2014 18:39:36 +0000 (11:39 -0700)
committerSean Hefty <sean.hefty@intel.com>
Wed, 4 Jun 2014 18:39:36 +0000 (11:39 -0700)
This patch is the first of a series to centralize the SA access for all
providers.

Here is the description of the approach:
(1) A provider will call the core to allocate a request and initialize
    the request (including the callback function);
(2) Provider calls the core to send the SA request;
(3) The core SA handler thread will poll for any SA response;
(4) When a SA response is received, the core SA handler will find the
    matching request and call the corresponding provider callback
    function;
(5) The provider callback will handle the SA response and free the
    SA request.

The approach has the following advantages:
(1) No code duplication in the provider for SA access;
(2) No need for a separate thread in the provider to wait for SA response.

Signed-off-by: Kaike Wan <kaike.wan@intel.com>
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
include/infiniband/acm_prov.h
src/acm.c

index 9ec407e43678c47febcaf8d1223758fb88cd99ca..d803d957d12aab8136cf5743e117c5150aba6bd9 100644 (file)
@@ -31,6 +31,8 @@
 #define ACM_PROV_H
 
 #include <infiniband/acm.h>
+#include <infiniband/umad.h>
+#include <infiniband/umad_sa.h>
 
 #define ACM_PROV_VERSION          1
 
@@ -70,7 +72,7 @@ struct acm_provider {
                        void *port_context, void **ep_context);
        void    (*close_endpoint)(void *ep_context);
        int     (*add_address)(const struct acm_address *addr, void *ep_context, 
-                              void **addr_context);
+                       void **addr_context);
        void    (*remove_address)(void *addr_context, struct acm_address *addr);
        int     (*resolve)(void *addr_context, struct acm_msg *msg, uint64_t id);
        int     (*query)(void *addr_context, struct acm_msg *msg, uint64_t id);
@@ -102,4 +104,16 @@ extern enum ibv_rate acm_get_rate(uint8_t width, uint8_t speed);
 extern enum ibv_mtu acm_convert_mtu(int mtu);
 extern enum ibv_rate acm_convert_rate(int rate);
 
+struct acm_sa_mad {
+       void                    *context;
+       struct ib_user_mad      umad;
+       struct umad_sa_packet   sa_mad; /* must follow umad and be 64-bit aligned */
+};
+
+extern struct acm_sa_mad *
+acm_alloc_sa_mad(struct acm_endpoint *endpoint, void *context,
+                void (*handler)(struct acm_sa_mad *));
+extern void acm_free_sa_mad(struct acm_sa_mad *mad);
+extern int acm_send_sa_mad(struct acm_sa_mad *mad);
+
 #endif /* ACM_PROV_H */
index 5a0134931056f634455a79bade4de6febcce32d6..8fcf36dc15d6ca60dc615aabeb23ad198fefa867 100644 (file)
--- a/src/acm.c
+++ b/src/acm.c
@@ -55,6 +55,7 @@
 #include <netinet/in.h>
 #include <linux/netlink.h>
 #include <linux/rtnetlink.h>
+#include <poll.h>
 #include "acm_mad.h"
 #include "acm_util.h"
 
@@ -87,6 +88,12 @@ struct acmc_port {
        struct acm_port     port;
        struct acm_provider *prov; /* limit to 1 provider per port for now */
        void                *prov_port_context;
+       int                 mad_portid;
+       int                 mad_agentid;
+       struct ib_mad_addr  sa_addr;
+       DLIST_ENTRY         sa_pending;
+       DLIST_ENTRY         sa_wait;
+       int                 sa_credits;
        lock_t              lock;
        DLIST_ENTRY         ep_list;
        enum ibv_port_state state;
@@ -132,6 +139,13 @@ union socket_addr {
        struct sockaddr_in6 sin6;
 };
 
+struct acmc_sa_req {
+       DLIST_ENTRY             entry;
+       struct acmc_ep          *ep;
+       void                    (*resp_handler)(struct acm_sa_mad *);
+       struct acm_sa_mad       mad;
+};
+
 static char def_prov_name[ACM_PROV_NAME_SIZE] = "ibacmp";
 static DLIST_ENTRY provider_list;
 static struct acmc_prov *def_provider = NULL;
@@ -154,6 +168,16 @@ static int acm_ep_insert_addr(struct acmc_ep *ep, const char *name, uint8_t *add
                              size_t addr_len, uint8_t addr_type);
 static void acm_event_handler(struct acmc_device *dev);
 
+static struct sa_data {
+       int             timeout;
+       int             retries;
+       int             depth;
+       pthread_t       thread_id;
+       struct pollfd   *fds;
+       struct acmc_port **ports;
+       int             nfds;
+} sa = { 2000, 2, 1};
+
 /*
  * Service options - may be set through ibacm_opts.cfg file.
  */
@@ -1681,8 +1705,11 @@ static void acm_port_up(struct acmc_port *port)
                if (port->gid_cnt == 0)
                        port->base_gid = gid;
        }
+
        port->lid = attr.lid;
        port->lid_mask = 0xffff - ((1 << attr.lmc) - 1);
+       port->sa_addr.lid = attr.sm_lid;
+       port->sa_addr.sl = attr.sm_sl;
        port->state = IBV_PORT_ACTIVE;
        acm_assign_provider(port);
        dev_ctx = acm_acquire_prov_context(&port->dev->prov_dev_context_list, 
@@ -1830,6 +1857,21 @@ acm_open_port(struct acmc_port *port, struct acmc_device *dev, uint8_t port_num)
        port->port.port_num = port_num;
        lock_init(&port->lock);
        DListInit(&port->ep_list);
+       DListInit(&port->sa_pending);
+       DListInit(&port->sa_wait);
+       port->sa_credits = sa.depth;
+       port->sa_addr.qpn = htonl(1);
+       port->sa_addr.qkey = htonl(ACM_QKEY);
+
+       port->mad_portid = umad_open_port(dev->device.verbs->device->name, port_num);
+       if (port->mad_portid < 0)
+               acm_log(0, "ERROR - unable to open MAD port\n");
+
+       port->mad_agentid = umad_register(port->mad_portid,
+                                         IB_MGMT_CLASS_SA, 1, 1, NULL);
+       if (port->mad_agentid < 0)
+               acm_log(0, "ERROR - unable to register MAD client\n");
+
        port->prov = NULL;
        port->state = IBV_PORT_DOWN;
 }
@@ -2064,6 +2106,224 @@ static void acm_close_providers(void)
        }
 }
 
+static int acmc_init_sa_fds(void)
+{
+       struct acmc_device *dev;
+       DLIST_ENTRY *dev_entry;
+       int ret, p, i = 0;
+
+       for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+            dev_entry = dev_entry->Next) {
+               dev = container_of(dev_entry, struct acmc_device, entry);
+               sa.nfds += dev->port_cnt;
+       }
+
+       sa.fds = calloc(sa.nfds, sizeof(*sa.fds));
+       sa.ports = calloc(sa.nfds, sizeof(*sa.ports));
+       if (!sa.fds || !sa.ports)
+               return -ENOMEM;
+
+       for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+            dev_entry = dev_entry->Next) {
+               dev = container_of(dev_entry, struct acmc_device, entry);
+               for (p = 0; p < dev->port_cnt; p++) {
+                       sa.fds[i].fd = umad_get_fd(dev->port[p].mad_portid);
+                       sa.fds[i].events = POLLIN;
+                       ret = fcntl(sa.fds[i].fd, F_SETFL, O_NONBLOCK);
+                       if (ret)
+                               acm_log(0, "WARNDING - umad fd is blocking\n");
+
+                       sa.ports[i++] = &dev->port[p];
+               }
+       }
+
+       return 0;
+}
+
+struct acm_sa_mad *
+acm_alloc_sa_mad(struct acm_endpoint *endpoint, void *context,
+                void (*handler)(struct acm_sa_mad *))
+{
+       struct acmc_sa_req *req;
+
+       req = calloc(1, sizeof (*req));
+       if (!req) {
+               acm_log(0, "Error: failed to allocate sa request\n");
+               return NULL;
+       }
+
+       req->ep = container_of(endpoint, struct acmc_ep, endpoint);
+       req->mad.context = context;
+       req->resp_handler = handler;
+
+       acm_log(2, "%p\n", req);
+       return &req->mad;
+}
+
+void acm_free_sa_mad(struct acm_sa_mad *mad)
+{
+       struct acmc_sa_req *req;
+       req = container_of(mad, struct acmc_sa_req, mad);
+       acm_log(2, "%p\n", req);
+       free(req);
+}
+
+int acm_send_sa_mad(struct acm_sa_mad *mad)
+{
+       struct acmc_port *port;
+       struct acmc_sa_req *req;
+       int ret;
+
+       req = container_of(mad, struct acmc_sa_req, mad);
+       acm_log(2, "%p from %s\n", req, req->ep->addr_info[0].addr.id_string);
+
+       port = req->ep->port;
+       mad->umad.addr.qpn = port->sa_addr.qpn;
+       mad->umad.addr.qkey = port->sa_addr.qkey;
+       mad->umad.addr.lid = port->sa_addr.lid;
+       mad->umad.addr.sl = port->sa_addr.sl;
+       // TODO: mad->umad.addr.pkey_index = req->ep->?;
+
+       lock_acquire(&port->lock);
+       if (port->sa_credits && DListEmpty(&port->sa_wait)) {
+               ret = umad_send(port->mad_portid, port->mad_agentid, &mad->umad,
+                               sizeof mad->sa_mad, sa.timeout, sa.retries);
+               if (!ret) {
+                       port->sa_credits--;
+                       DListInsertTail(&req->entry, &port->sa_pending);
+               }
+       } else {
+               ret = 0;
+               DListInsertTail(&req->entry, &port->sa_wait);
+       }
+       lock_release(&port->lock);
+       return ret;
+}
+
+static void acmc_send_queued_req(struct acmc_port *port)
+{
+       struct acmc_sa_req *req;
+       int ret;
+
+       lock_acquire(&port->lock);
+       if (DListEmpty(&port->sa_wait) || !port->sa_credits) {
+               lock_release(&port->lock);
+               return;
+       }
+
+       req = container_of(port->sa_wait.Next, struct acmc_sa_req, entry);
+       DListRemove(&req->entry);
+       ret = umad_send(port->mad_portid, port->mad_agentid, &req->mad.umad,
+                       sizeof req->mad.sa_mad, sa.timeout, sa.retries);
+       if (!ret) {
+               port->sa_credits--;
+               DListInsertTail(&req->entry, &port->sa_pending);
+       }
+       lock_release(&port->lock);
+
+       if (ret) {
+               req->mad.umad.status = -ret;
+               req->resp_handler(&req->mad);
+       }
+}
+
+static void acmc_recv_mad(struct acmc_port *port)
+{
+       struct acmc_sa_req *req;
+       struct acm_sa_mad resp;
+       DLIST_ENTRY *entry;
+       int ret, len, found;
+       struct umad_hdr *hdr;
+
+       acm_log(2, "\n");
+       len = sizeof(resp.sa_mad);
+       ret = umad_recv(port->mad_portid, &resp.umad, &len, 0);
+       if (ret < 0) {
+               acm_log(1, "umad_recv error %d\n", ret);
+               return;
+       }
+
+       hdr = &resp.sa_mad.mad_hdr;
+       acm_log(2, "bv %x cls %x cv %x mtd %x st %d tid %llx at %x atm %x\n",
+               hdr->base_version, hdr->mgmt_class, hdr->class_version,
+               hdr->method, hdr->status, hdr->tid, hdr->attr_id, hdr->attr_mod);
+       found = 0;
+       lock_acquire(&port->lock);
+       for (entry = port->sa_pending.Next; entry != &port->sa_pending;
+            entry = entry->Next) {
+               req = container_of(entry, struct acmc_sa_req, entry);
+               /* The lower 32-bit of the tid is used for agentid in umad */
+               if (req->mad.sa_mad.mad_hdr.tid == (hdr->tid & 0xFFFFFFFF00000000)) {
+                       found = 1;
+                       DListRemove(entry);
+                       port->sa_credits++;
+                       break;
+               }
+       }
+       lock_release(&port->lock);
+
+       if (found) {
+               memcpy(&req->mad.umad, &resp.umad, sizeof(resp.umad) + len);
+               req->resp_handler(&req->mad);
+       }
+}
+
+static void *acm_sa_handler(void *context)
+{
+       int i, ret;
+
+       acm_log(0, "started\n");
+       ret = acmc_init_sa_fds();
+       if (ret) {
+               acm_log(0, "ERROR - failed to init fds\n");
+               return NULL;
+       }
+
+       if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) {
+               acm_log(0, "Error: failed to set cancel type \n");
+               return NULL;
+       }
+
+       if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) {
+               acm_log(0, "Error: failed to set cancel state\n");
+               return NULL;
+       }
+
+       for (;;) {
+               pthread_testcancel();
+               ret = poll(sa.fds, sa.nfds, -1);
+               if (ret < 0) {
+                       acm_log(0, "ERROR - sa poll error: %d\n", errno);
+                       continue;
+               }
+
+               for (i = 0; i < sa.nfds; i++) {
+                       if (!sa.fds[i].revents)
+                               continue;
+
+                       if (sa.fds[i].revents & POLLIN) {
+                               acmc_recv_mad(sa.ports[i]);
+                               acmc_send_queued_req(sa.ports[i]);
+                       }
+                       sa.fds[i].revents = 0;
+               }
+       }
+       return NULL;
+}
+
+static void acm_stop_sa_handler(void)
+{
+       if (pthread_cancel(sa.thread_id)) {
+               acm_log(0, "Error: failed to cancel sa resp thread \n");
+               return;
+       }
+
+       if (pthread_join(sa.thread_id, NULL)) {
+               acm_log(0, "Error: failed to join sa resp thread\n");
+               return;
+       }
+}
+
 static void acm_set_options(void)
 {
        FILE *f;
@@ -2092,6 +2352,12 @@ static void acm_set_options(void)
                        strcpy(prov_lib_path, value);
                else if (!stricmp("support_ips_in_addr_cfg", opt))
                        support_ips_in_addr_cfg = atoi(value);
+               else if (!stricmp("timeout", opt))
+                       sa.timeout = atoi(value);
+               else if (!stricmp("retries", opt))
+                       sa.retries = atoi(value);
+               else if (!stricmp("sa_depth", opt))
+                       sa.depth = atoi(value);
        }
 
        fclose(f);
@@ -2103,6 +2369,9 @@ static void acm_log_options(void)
        acm_log(0, "log level %d\n", log_level);
        acm_log(0, "lock file %s\n", lock_file);
        acm_log(0, "server_port %d\n", server_port);
+       acm_log(0, "timeout %d ms\n", sa.timeout);
+       acm_log(0, "retries %d\n", sa.retries);
+       acm_log(0, "sa depth %d\n", sa.depth);
        acm_log(0, "provider lib path %s\n", prov_lib_path);
        acm_log(0, "support IP's in ibacm_addr.cfg %d\n", support_ips_in_addr_cfg);
 }
@@ -2225,6 +2494,7 @@ int CDECL_FUNC main(int argc, char **argv)
        for (i = 0; i < ACM_MAX_COUNTER; i++)
                atomic_init(&counter[i]);
 
+       umad_init();
        if (acm_open_providers()) {
                acm_log(0, "ERROR - unable to open any providers\n");
                return -1;
@@ -2238,12 +2508,19 @@ int CDECL_FUNC main(int argc, char **argv)
        acm_log(1, "creating IP Netlink socket\n");
        acm_ipnl_create();
 
+       acm_log(1, "starting sa response receiving thread\n");
+       if (pthread_create(&sa.thread_id, NULL, acm_sa_handler, NULL)) {
+               acm_log(0, "Error: failed to create sa resp rcving thread");
+               return -1;
+       }
        acm_activate_devices();
        acm_log(1, "starting server\n");
        acm_server();
 
        acm_log(0, "shutting down\n");
        acm_close_providers();
+       acm_stop_sa_handler();
+       umad_done();
        fclose(flog);
        return 0;
 }