#include <netinet/in.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
+#include <poll.h>
#include "acm_mad.h"
#include "acm_util.h"
struct acm_port port;
struct acm_provider *prov; /* limit to 1 provider per port for now */
void *prov_port_context;
+ int mad_portid;
+ int mad_agentid;
+ struct ib_mad_addr sa_addr;
+ DLIST_ENTRY sa_pending;
+ DLIST_ENTRY sa_wait;
+ int sa_credits;
lock_t lock;
DLIST_ENTRY ep_list;
enum ibv_port_state state;
struct sockaddr_in6 sin6;
};
+struct acmc_sa_req {
+ DLIST_ENTRY entry;
+ struct acmc_ep *ep;
+ void (*resp_handler)(struct acm_sa_mad *);
+ struct acm_sa_mad mad;
+};
+
static char def_prov_name[ACM_PROV_NAME_SIZE] = "ibacmp";
static DLIST_ENTRY provider_list;
static struct acmc_prov *def_provider = NULL;
size_t addr_len, uint8_t addr_type);
static void acm_event_handler(struct acmc_device *dev);
+static struct sa_data {
+ int timeout;
+ int retries;
+ int depth;
+ pthread_t thread_id;
+ struct pollfd *fds;
+ struct acmc_port **ports;
+ int nfds;
+} sa = { 2000, 2, 1};
+
/*
* Service options - may be set through ibacm_opts.cfg file.
*/
if (port->gid_cnt == 0)
port->base_gid = gid;
}
+
port->lid = attr.lid;
port->lid_mask = 0xffff - ((1 << attr.lmc) - 1);
+ port->sa_addr.lid = attr.sm_lid;
+ port->sa_addr.sl = attr.sm_sl;
port->state = IBV_PORT_ACTIVE;
acm_assign_provider(port);
dev_ctx = acm_acquire_prov_context(&port->dev->prov_dev_context_list,
port->port.port_num = port_num;
lock_init(&port->lock);
DListInit(&port->ep_list);
+ DListInit(&port->sa_pending);
+ DListInit(&port->sa_wait);
+ port->sa_credits = sa.depth;
+ port->sa_addr.qpn = htonl(1);
+ port->sa_addr.qkey = htonl(ACM_QKEY);
+
+ port->mad_portid = umad_open_port(dev->device.verbs->device->name, port_num);
+ if (port->mad_portid < 0)
+ acm_log(0, "ERROR - unable to open MAD port\n");
+
+ port->mad_agentid = umad_register(port->mad_portid,
+ IB_MGMT_CLASS_SA, 1, 1, NULL);
+ if (port->mad_agentid < 0)
+ acm_log(0, "ERROR - unable to register MAD client\n");
+
port->prov = NULL;
port->state = IBV_PORT_DOWN;
}
}
}
+static int acmc_init_sa_fds(void)
+{
+ struct acmc_device *dev;
+ DLIST_ENTRY *dev_entry;
+ int ret, p, i = 0;
+
+ for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+ dev_entry = dev_entry->Next) {
+ dev = container_of(dev_entry, struct acmc_device, entry);
+ sa.nfds += dev->port_cnt;
+ }
+
+ sa.fds = calloc(sa.nfds, sizeof(*sa.fds));
+ sa.ports = calloc(sa.nfds, sizeof(*sa.ports));
+ if (!sa.fds || !sa.ports)
+ return -ENOMEM;
+
+ for (dev_entry = dev_list.Next; dev_entry != &dev_list;
+ dev_entry = dev_entry->Next) {
+ dev = container_of(dev_entry, struct acmc_device, entry);
+ for (p = 0; p < dev->port_cnt; p++) {
+ sa.fds[i].fd = umad_get_fd(dev->port[p].mad_portid);
+ sa.fds[i].events = POLLIN;
+ ret = fcntl(sa.fds[i].fd, F_SETFL, O_NONBLOCK);
+ if (ret)
+ acm_log(0, "WARNDING - umad fd is blocking\n");
+
+ sa.ports[i++] = &dev->port[p];
+ }
+ }
+
+ return 0;
+}
+
+struct acm_sa_mad *
+acm_alloc_sa_mad(struct acm_endpoint *endpoint, void *context,
+ void (*handler)(struct acm_sa_mad *))
+{
+ struct acmc_sa_req *req;
+
+ req = calloc(1, sizeof (*req));
+ if (!req) {
+ acm_log(0, "Error: failed to allocate sa request\n");
+ return NULL;
+ }
+
+ req->ep = container_of(endpoint, struct acmc_ep, endpoint);
+ req->mad.context = context;
+ req->resp_handler = handler;
+
+ acm_log(2, "%p\n", req);
+ return &req->mad;
+}
+
+void acm_free_sa_mad(struct acm_sa_mad *mad)
+{
+ struct acmc_sa_req *req;
+ req = container_of(mad, struct acmc_sa_req, mad);
+ acm_log(2, "%p\n", req);
+ free(req);
+}
+
+int acm_send_sa_mad(struct acm_sa_mad *mad)
+{
+ struct acmc_port *port;
+ struct acmc_sa_req *req;
+ int ret;
+
+ req = container_of(mad, struct acmc_sa_req, mad);
+ acm_log(2, "%p from %s\n", req, req->ep->addr_info[0].addr.id_string);
+
+ port = req->ep->port;
+ mad->umad.addr.qpn = port->sa_addr.qpn;
+ mad->umad.addr.qkey = port->sa_addr.qkey;
+ mad->umad.addr.lid = port->sa_addr.lid;
+ mad->umad.addr.sl = port->sa_addr.sl;
+ // TODO: mad->umad.addr.pkey_index = req->ep->?;
+
+ lock_acquire(&port->lock);
+ if (port->sa_credits && DListEmpty(&port->sa_wait)) {
+ ret = umad_send(port->mad_portid, port->mad_agentid, &mad->umad,
+ sizeof mad->sa_mad, sa.timeout, sa.retries);
+ if (!ret) {
+ port->sa_credits--;
+ DListInsertTail(&req->entry, &port->sa_pending);
+ }
+ } else {
+ ret = 0;
+ DListInsertTail(&req->entry, &port->sa_wait);
+ }
+ lock_release(&port->lock);
+ return ret;
+}
+
+static void acmc_send_queued_req(struct acmc_port *port)
+{
+ struct acmc_sa_req *req;
+ int ret;
+
+ lock_acquire(&port->lock);
+ if (DListEmpty(&port->sa_wait) || !port->sa_credits) {
+ lock_release(&port->lock);
+ return;
+ }
+
+ req = container_of(port->sa_wait.Next, struct acmc_sa_req, entry);
+ DListRemove(&req->entry);
+ ret = umad_send(port->mad_portid, port->mad_agentid, &req->mad.umad,
+ sizeof req->mad.sa_mad, sa.timeout, sa.retries);
+ if (!ret) {
+ port->sa_credits--;
+ DListInsertTail(&req->entry, &port->sa_pending);
+ }
+ lock_release(&port->lock);
+
+ if (ret) {
+ req->mad.umad.status = -ret;
+ req->resp_handler(&req->mad);
+ }
+}
+
+static void acmc_recv_mad(struct acmc_port *port)
+{
+ struct acmc_sa_req *req;
+ struct acm_sa_mad resp;
+ DLIST_ENTRY *entry;
+ int ret, len, found;
+ struct umad_hdr *hdr;
+
+ acm_log(2, "\n");
+ len = sizeof(resp.sa_mad);
+ ret = umad_recv(port->mad_portid, &resp.umad, &len, 0);
+ if (ret < 0) {
+ acm_log(1, "umad_recv error %d\n", ret);
+ return;
+ }
+
+ hdr = &resp.sa_mad.mad_hdr;
+ acm_log(2, "bv %x cls %x cv %x mtd %x st %d tid %llx at %x atm %x\n",
+ hdr->base_version, hdr->mgmt_class, hdr->class_version,
+ hdr->method, hdr->status, hdr->tid, hdr->attr_id, hdr->attr_mod);
+ found = 0;
+ lock_acquire(&port->lock);
+ for (entry = port->sa_pending.Next; entry != &port->sa_pending;
+ entry = entry->Next) {
+ req = container_of(entry, struct acmc_sa_req, entry);
+ /* The lower 32-bit of the tid is used for agentid in umad */
+ if (req->mad.sa_mad.mad_hdr.tid == (hdr->tid & 0xFFFFFFFF00000000)) {
+ found = 1;
+ DListRemove(entry);
+ port->sa_credits++;
+ break;
+ }
+ }
+ lock_release(&port->lock);
+
+ if (found) {
+ memcpy(&req->mad.umad, &resp.umad, sizeof(resp.umad) + len);
+ req->resp_handler(&req->mad);
+ }
+}
+
+static void *acm_sa_handler(void *context)
+{
+ int i, ret;
+
+ acm_log(0, "started\n");
+ ret = acmc_init_sa_fds();
+ if (ret) {
+ acm_log(0, "ERROR - failed to init fds\n");
+ return NULL;
+ }
+
+ if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) {
+ acm_log(0, "Error: failed to set cancel type \n");
+ return NULL;
+ }
+
+ if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) {
+ acm_log(0, "Error: failed to set cancel state\n");
+ return NULL;
+ }
+
+ for (;;) {
+ pthread_testcancel();
+ ret = poll(sa.fds, sa.nfds, -1);
+ if (ret < 0) {
+ acm_log(0, "ERROR - sa poll error: %d\n", errno);
+ continue;
+ }
+
+ for (i = 0; i < sa.nfds; i++) {
+ if (!sa.fds[i].revents)
+ continue;
+
+ if (sa.fds[i].revents & POLLIN) {
+ acmc_recv_mad(sa.ports[i]);
+ acmc_send_queued_req(sa.ports[i]);
+ }
+ sa.fds[i].revents = 0;
+ }
+ }
+ return NULL;
+}
+
+static void acm_stop_sa_handler(void)
+{
+ if (pthread_cancel(sa.thread_id)) {
+ acm_log(0, "Error: failed to cancel sa resp thread \n");
+ return;
+ }
+
+ if (pthread_join(sa.thread_id, NULL)) {
+ acm_log(0, "Error: failed to join sa resp thread\n");
+ return;
+ }
+}
+
static void acm_set_options(void)
{
FILE *f;
strcpy(prov_lib_path, value);
else if (!stricmp("support_ips_in_addr_cfg", opt))
support_ips_in_addr_cfg = atoi(value);
+ else if (!stricmp("timeout", opt))
+ sa.timeout = atoi(value);
+ else if (!stricmp("retries", opt))
+ sa.retries = atoi(value);
+ else if (!stricmp("sa_depth", opt))
+ sa.depth = atoi(value);
}
fclose(f);
acm_log(0, "log level %d\n", log_level);
acm_log(0, "lock file %s\n", lock_file);
acm_log(0, "server_port %d\n", server_port);
+ acm_log(0, "timeout %d ms\n", sa.timeout);
+ acm_log(0, "retries %d\n", sa.retries);
+ acm_log(0, "sa depth %d\n", sa.depth);
acm_log(0, "provider lib path %s\n", prov_lib_path);
acm_log(0, "support IP's in ibacm_addr.cfg %d\n", support_ips_in_addr_cfg);
}
for (i = 0; i < ACM_MAX_COUNTER; i++)
atomic_init(&counter[i]);
+ umad_init();
if (acm_open_providers()) {
acm_log(0, "ERROR - unable to open any providers\n");
return -1;
acm_log(1, "creating IP Netlink socket\n");
acm_ipnl_create();
+ acm_log(1, "starting sa response receiving thread\n");
+ if (pthread_create(&sa.thread_id, NULL, acm_sa_handler, NULL)) {
+ acm_log(0, "Error: failed to create sa resp rcving thread");
+ return -1;
+ }
acm_activate_devices();
acm_log(1, "starting server\n");
acm_server();
acm_log(0, "shutting down\n");
acm_close_providers();
+ acm_stop_sa_handler();
+ umad_done();
fclose(flog);
return 0;
}