uint8_t addr_type;
};
+struct acmc_prov_context {
+ DLIST_ENTRY entry;
+ atomic_t refcnt;
+ struct acm_provider *prov;
+ void *context;
+};
+
struct acmc_device;
struct acmc_port {
struct acmc_device *dev;
+ struct acm_port port;
struct acm_provider *prov; /* limit to 1 provider per port for now */
+ void *prov_port_context;
lock_t lock;
DLIST_ENTRY ep_list;
enum ibv_port_state state;
int gid_cnt;
uint16_t lid;
uint16_t lid_mask;
- uint8_t port_num;
};
struct acmp_device;
struct acmp_port {
- struct acmp_device *dev;
+ struct acmp_device *dev;
+ const struct acm_port *port;
DLIST_ENTRY ep_list;
lock_t lock;
int mad_portid;
};
struct acmc_device {
- struct ibv_context *verbs;
- uint64_t guid;
+ struct acm_device device;
DLIST_ENTRY entry;
+ DLIST_ENTRY prov_dev_context_list;
int port_cnt;
struct acmc_port port[0];
};
struct acmp_device {
struct ibv_context *verbs;
+ const struct acm_device *device;
struct ibv_comp_channel *channel;
struct ibv_pd *pd;
uint64_t guid;
DLIST_ENTRY entry;
+ pthread_t comp_thread_id;
int port_cnt;
struct acmp_port port[0];
};
struct acmc_ep {
struct acmc_port *port;
struct acm_endpoint endpoint;
+ void *prov_ep_context;
struct acmc_addr addr_info[MAX_EP_ADDR];
lock_t lock;
DLIST_ENTRY entry;
int mc_cnt;
uint16_t pkey_index;
uint16_t pkey;
- struct acm_endpoint *endpoint;
+ const struct acm_endpoint *endpoint;
lock_t lock;
struct acmp_send_queue resolve_queue;
struct acmp_send_queue sa_queue;
struct acm_msg msg;
};
-static int acmp_resolve(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
-static int acmp_query(struct acm_endpoint *ep, struct acm_msg *msg, uint64_t id);
-static void acm_event_handler(struct acmc_device *dev);
+static int acmp_open_dev(const struct acm_device *device, void **dev_context);
+static void acmp_close_dev(void *dev_context);
+static int acmp_open_port(const struct acm_port *port, void *dev_context,
+ void **port_context);
+static void acmp_close_port(void *port_context);
+static int acmp_open_endpoint(const struct acm_endpoint *endpoint,
+ void *port_context, void **ep_context);
+static void acmp_close_endpoint(void *ep_context);
+static int acmp_resolve(void *ep_context, struct acm_msg *msg, uint64_t id);
+static int acmp_query(void *ep_context, struct acm_msg *msg, uint64_t id);
+static int acmp_handle_event(void *port_context, enum ibv_event_type type);
static struct acm_provider def_prov = {
+ .open_device = acmp_open_dev,
+ .close_device = acmp_close_dev,
+ .open_port = acmp_open_port,
+ .close_port = acmp_close_port,
+ .open_endpoint = acmp_open_endpoint,
+ .close_endpoint = acmp_close_endpoint,
.resolve = acmp_resolve,
.query = acmp_query,
+ .handle_event = acmp_handle_event,
};
union socket_addr {
static DLIST_ENTRY dev_list;
static DLIST_ENTRY acmp_dev_list;
+static lock_t acmp_dev_lock;
static atomic_t tid;
static DLIST_ENTRY timeout_list;
static struct acmc_ep *acm_find_ep(struct acmc_port *port, uint16_t pkey);
static int acm_ep_insert_addr(struct acmc_ep *ep, const char *name, uint8_t *addr,
size_t addr_len, uint8_t addr_type);
+static void acm_event_handler(struct acmc_device *dev);
/*
* Service options - may be set through ibacm_opts.cfg file.
return ((gid->global.subnet_prefix | gid->global.interface_id) == 0);
}
+static struct acmc_prov_context *
+acm_alloc_prov_context(struct acm_provider *prov)
+{
+ struct acmc_prov_context *ctx;
+
+ ctx = calloc(1, sizeof(*ctx));
+ if (!ctx) {
+ acm_log(0, "Error: failed to allocate prov context\n");
+ return NULL;
+ }
+ atomic_set(&ctx->refcnt, 1);
+ ctx->prov = prov;
+ return ctx;
+}
+
+static struct acmc_prov_context *
+acm_get_prov_context(DLIST_ENTRY *list, struct acm_provider *prov)
+{
+ DLIST_ENTRY *entry;
+ struct acmc_prov_context *ctx;
+
+ for (entry = list->Next; entry != list; entry = entry->Next) {
+ ctx = container_of(entry, struct acmc_prov_context, entry);
+ if (ctx->prov == prov) {
+ return ctx;
+ }
+ }
+
+ return NULL;
+}
+
+static struct acmc_prov_context *
+acm_acquire_prov_context(DLIST_ENTRY *list, struct acm_provider *prov)
+{
+ struct acmc_prov_context *ctx;
+
+ ctx = acm_get_prov_context(list, prov);
+ if (!ctx) {
+ ctx = acm_alloc_prov_context(prov);
+ if (!ctx) {
+ acm_log(0, "Error -- failed to allocate provider context\n");
+ return NULL;
+ }
+ DListInsertTail(&ctx->entry, list);
+ } else {
+ atomic_inc(&ctx->refcnt);
+ }
+
+ return ctx;
+}
+
+static void
+acm_release_prov_context(struct acmc_prov_context *ctx)
+{
+ if (atomic_dec(&ctx->refcnt) <= 0) {
+ DListRemove(&ctx->entry);
+ free(ctx);
+ }
+}
+
static int acmp_compare_dest(const void *dest1, const void *dest2)
{
return memcmp(dest1, dest2, ACM_MAX_ADDRESS);
}
static struct acm_address *
-acm_addr_lookup(struct acm_endpoint *endpoint, uint8_t *addr, uint8_t addr_type)
+acm_addr_lookup(const struct acm_endpoint *endpoint, uint8_t *addr, uint8_t addr_type)
{
struct acmc_ep *ep;
int i;
acmp_complete_send((struct acmp_send_msg *) (uintptr_t) wc->wr_id);
}
-static void CDECL_FUNC acmp_comp_handler(void *context)
+static void *acmp_comp_handler(void *context)
{
struct acmp_device *dev = (struct acmp_device *) context;
struct acmp_ep *ep;
int cnt;
acm_log(1, "started\n");
+
+ if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL)) {
+ acm_log(0, "Error: failed to set cancel type for dev %s\n",
+ dev->verbs->device->name);
+ pthread_exit(NULL);
+ }
+
+ if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL)) {
+ acm_log(0, "Error: failed to set cancel state for dev %s\n",
+ dev->verbs->device->name);
+ pthread_exit(NULL);
+ }
while (1) {
+ pthread_testcancel();
ibv_get_cq_event(dev->channel, &cq, (void *) &ep);
cnt = 0;
ibv_ack_cq_events(cq, cnt);
}
+
+ return NULL;
}
static void acmp_format_mgid(union ibv_gid *mgid, uint16_t pkey, uint8_t tos,
umad->addr.path_bits = port->sa_dest.av.src_path_bits;
acm_log(0, "%s %d pkey 0x%x, sl 0x%x, rate 0x%x, mtu 0x%x\n",
- ep->port->dev->verbs->device->name, ep->port->port_num,
- ep->pkey, sl, rate, mtu);
+ ep->port->dev->verbs->device->name,
+ ep->port->port_num, ep->pkey, sl, rate, mtu);
ep->mc_dest[ep->mc_cnt].state = ACMP_INIT;
mad = (struct ib_sa_mad *) umad->data;
acmp_init_join(mad, port_gid, ep->pkey, tos, tclass, sl, rate, mtu);
acm_log(1, "join for %s complete\n", ep->id_string);
}
-static void acmp_port_join(struct acmp_port *port)
+static int acmp_port_join(void *port_context)
{
struct acmp_ep *ep;
DLIST_ENTRY *ep_entry;
+ struct acmp_port *port = port_context;
acm_log(1, "device %s port %d\n", port->dev->verbs->device->name,
port->port_num);
for (ep_entry = port->ep_list.Next; ep_entry != &port->ep_list;
ep_entry = ep_entry->Next) {
-
ep = container_of(ep_entry, struct acmp_ep, entry);
acmp_ep_join(ep);
}
acm_log(1, "joins for device %s port %d complete\n",
port->dev->verbs->device->name, port->port_num);
+
+ return 0;
+}
+
+static int acmp_handle_event(void *port_context, enum ibv_event_type type)
+{
+ int ret = 0;
+
+ acm_log(2, "event %s\n", ibv_event_type_str(type));
+
+ switch (type) {
+ case IBV_EVENT_CLIENT_REREGISTER:
+ ret = acmp_port_join(port_context);
+ break;
+ default:
+ break;
+ }
+ return ret;
}
static void acmp_process_timeouts(void)
}
}
+/* While the device/port/ep will not be freed, we need to be careful of
+ * their addition while walking the link lists. Therefore, we need to acquire
+ * the appropriate locks.
+ */
static void CDECL_FUNC acmp_retry_handler(void *context)
{
struct acmp_device *dev;
event_wait(&timeout_event, -1);
next_expire = -1;
+ lock_acquire(&acmp_dev_lock);
for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;
- dev_entry = dev_entry->Next) {
+ dev_entry = dev_entry->Next) {
dev = container_of(dev_entry, struct acmp_device, entry);
+ lock_release(&acmp_dev_lock);
for (i = 0; i < dev->port_cnt; i++) {
port = &dev->port[i];
+ lock_acquire(&port->lock);
for (ep_entry = port->ep_list.Next;
- ep_entry != &port->ep_list;
- ep_entry = ep_entry->Next) {
+ ep_entry != &port->ep_list;
+ ep_entry = ep_entry->Next) {
ep = container_of(ep_entry, struct acmp_ep, entry);
+ lock_release(&port->lock);
lock_acquire(&ep->lock);
if (!DListEmpty(&ep->wait_queue))
acmp_process_wait_queue(ep, &next_expire);
lock_release(&ep->lock);
+ lock_acquire(&port->lock);
}
+ lock_release(&port->lock);
}
+ lock_acquire(&acmp_dev_lock);
}
+ lock_release(&acmp_dev_lock);
acmp_process_timeouts();
wait = (int) (next_expire - time_stamp_ms());
uint8_t i;
if (!ib_any_gid(&path->sgid)) {
- return (acm_gid_index(port->dev->verbs, port->port_num,
- port->gid_cnt, &path->sgid) < port->gid_cnt);
+ return (acm_gid_index(port->dev->device.verbs,
+ port->port.port_num, port->gid_cnt,
+ &path->sgid) < port->gid_cnt);
}
if (path->slid) {
return 1;
}
- if (acm_gid_index(port->dev->verbs, port->port_num, port->gid_cnt,
- &path->dgid) < port->gid_cnt) {
+ if (acm_gid_index(port->dev->device.verbs, port->port.port_num,
+ port->gid_cnt, &path->dgid) < port->gid_cnt) {
return 1;
}
for (i = 0; i < port->gid_cnt; i++) {
- ibv_query_gid(port->dev->verbs, port->port_num, i, &gid);
+ ibv_query_gid(port->dev->device.verbs, port->port.port_num,
+ i, &gid);
if (gid.global.subnet_prefix == path->dgid.global.subnet_prefix) {
return 1;
}
return acm_query_response(client->index, msg, ACM_STATUS_ESRCADDR);
}
- return ep->port->prov->query(&ep->endpoint, msg, client->index);
+ return ep->port->prov->query(ep->prov_ep_context, msg, client->index);
}
static int
-acmp_query(struct acm_endpoint *endpoint, struct acm_msg *msg, uint64_t id)
+acmp_query(void *ep_context, struct acm_msg *msg, uint64_t id)
{
struct acmp_request *req;
struct acmp_send_msg *sa_msg;
struct ib_sa_mad *mad;
- struct acmp_ep *ep = endpoint->prov_context;
+ struct acmp_ep *ep = ep_context;
uint8_t status;
if (ep->state != ACMP_READY) {
ACM_STATUS_ESRCADDR);
}
- return ep->port->prov->resolve(&ep->endpoint, msg, client->index);
+ return ep->port->prov->resolve(ep->prov_ep_context, msg, client->index);
}
static int
ACM_STATUS_ESRCADDR);
}
- return ep->port->prov->resolve(&ep->endpoint, msg, client->index);
+ return ep->port->prov->resolve(ep->prov_ep_context, msg, client->index);
}
static int
}
static int
-acmp_resolve(struct acm_endpoint *endpoint, struct acm_msg *msg, uint64_t id)
+acmp_resolve(void *ep_context, struct acm_msg *msg, uint64_t id)
{
- struct acmp_ep *ep = endpoint->prov_context;
+ struct acmp_ep *ep = ep_context;
if (ep->state != ACMP_READY)
return acm_resolve_response(id, msg, NULL, ACM_STATUS_ENODATA);
for (dev_entry = dev_list.Next; dev_entry != &dev_list;
dev_entry = dev_entry->Next) {
dev = container_of(dev_entry, struct acmc_device, entry);
- FD_SET(dev->verbs->async_fd, &readfds);
- n = max(n, (int) dev->verbs->async_fd);
+ FD_SET(dev->device.verbs->async_fd, &readfds);
+ n = max(n, (int) dev->device.verbs->async_fd);
}
ret = select(n + 1, &readfds, NULL, NULL, NULL);
for (dev_entry = dev_list.Next; dev_entry != &dev_list;
dev_entry = dev_entry->Next) {
dev = container_of(dev_entry, struct acmc_device, entry);
- if (FD_ISSET(dev->verbs->async_fd, &readfds)) {
+ if (FD_ISSET(dev->device.verbs->async_fd, &readfds)) {
acm_log(2, "handling event from %s\n",
- dev->verbs->device->name);
+ dev->device.verbs->device->name);
acm_event_handler(dev);
}
}
dev = container_of(dev_entry, struct acmc_device, entry);
- ret = ibv_query_device(dev->verbs, &dev_attr);
+ ret = ibv_query_device(dev->device.verbs, &dev_attr);
if (ret)
continue;
for (*port = 1; *port <= dev_attr.phys_port_cnt; (*port)++) {
- ret = ibv_query_port(dev->verbs, *port, &port_attr);
+ ret = ibv_query_port(dev->device.verbs, *port, &port_attr);
if (ret)
continue;
for (i = 0; i < port_attr.gid_tbl_len; i++) {
- ret = ibv_query_gid(dev->verbs, *port, i, &gid);
+ ret = ibv_query_gid(dev->device.verbs, *port, i, &gid);
if (ret || !gid.global.interface_id)
break;
dev = acm_get_device_from_gid(gid, &port_num);
if (dev && ep->port->dev == dev
- && ep->port->port_num == port_num && ep->endpoint.pkey == pkey) {
+ && ep->port->port.port_num == port_num && ep->endpoint.pkey == pkey) {
if (!acm_ep_insert_addr(ep, ip_str, addr, addr_len, addr_type)) {
acm_log(0, "Added %s %s %d 0x%x from %s\n", ip_str,
- dev->verbs->device->name, port_num, pkey,
+ dev->device.verbs->device->name, port_num, pkey,
ifname);
}
}
int port;
size_t addr_len;
- dev_name = ep->port->dev->verbs->device->name;
+ dev_name = ep->port->dev->device.verbs->device->name;
acm_log(1, "device %s, port %d, pkey 0x%x\n",
- dev_name, ep->port->port_num, ep->endpoint.pkey);
+ dev_name, ep->port->port.port_num, ep->endpoint.pkey);
acm_get_system_ips(ep);
pkey = 0xFFFF;
}
- if (!stricmp(dev_name, dev) && (ep->port->port_num == (uint8_t) port) &&
+ if (!stricmp(dev_name, dev) &&
+ (ep->port->port.port_num == (uint8_t) port) &&
(ep->endpoint.pkey == pkey)) {
acm_log(1, "assigning %s\n", name);
if (acm_ep_insert_addr(ep, name, addr, addr_len, type)) {
}
}
-static int acmp_add_addr(struct acm_endpoint *endpoint, struct acm_address *addr)
+static int acmp_add_addr(void *ep_context, struct acm_address *addr)
{
- struct acmp_ep *ep = endpoint->prov_context;
+ struct acmp_ep *ep = ep_context;
struct acmp_dest *dest;
acm_log(2, "\n");
- addr->prov_context = ep;
+ addr->prov_addr = ep;
if (loopback_prot != ACMP_LOOPBACK_PROT_LOCAL)
return 0;
struct acmp_device *dev;
DLIST_ENTRY *dev_entry;
- acm_log(1, "dev 0xllx port %d pkey 0x%x\n",
- endpoint->dev_guid, endpoint->port_num, endpoint->pkey);
+ acm_log(1, "dev 0x%llx port %d pkey 0x%x\n",
+ endpoint->port->dev->dev_guid, endpoint->port->port_num,
+ endpoint->pkey);
for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;
dev_entry = dev_entry->Next) {
dev = container_of(dev_entry, struct acmp_device, entry);
- if (dev->guid == endpoint->dev_guid)
- return &dev->port[endpoint->port_num - 1];
+ if (dev->guid == endpoint->port->dev->dev_guid)
+ return &dev->port[endpoint->port->port_num - 1];
}
return NULL;
DLIST_ENTRY *entry;
acm_log(1, "dev 0xllx port %d pkey 0x%x\n",
- endpoint->dev_guid, endpoint->port_num, endpoint->pkey);
+ endpoint->port->dev->dev_guid, endpoint->port->port_num, endpoint->pkey);
for (entry = port->ep_list.Next; entry != &port->ep_list;
entry = entry->Next) {
ep = container_of(entry, struct acmp_ep, entry);
return res;
}
-static void acmp_ep_down(struct acm_endpoint *endpoint)
+static void acmp_close_endpoint(void *ep_context)
{
- struct acmp_ep *ep = endpoint->prov_context;
- acm_log(1, "%s %d pkey 0x%04x\n", ep->port->dev->verbs->device->name,
- endpoint->port_num, endpoint->pkey);
+ struct acmp_ep *ep = ep_context;
+
+ acm_log(1, "%s %d pkey 0x%04x\n",
+ ep->port->dev->verbs->device->name,
+ ep->port->port_num, ep->pkey);
lock_acquire(&ep->lock);
ep->endpoint = NULL;
static void acm_ep_down(struct acmc_ep *ep)
{
- acm_log(1, "%s %d pkey 0x%04x\n", ep->port->dev->verbs->device->name,
- ep->port->port_num, ep->endpoint.pkey);
- acmp_ep_down(&ep->endpoint);
+ acm_log(1, "%s %d pkey 0x%04x\n",
+ ep->port->dev->device.verbs->device->name,
+ ep->port->port.port_num, ep->endpoint.pkey);
+ if (ep->prov_ep_context)
+ ep->port->prov->close_endpoint(ep->prov_ep_context);
free(ep);
}
return NULL;
ep->port = port;
- ep->endpoint.dev_guid = port->dev->guid;
- ep->endpoint.port_num = port->port_num;
+ ep->endpoint.port = &port->port;
ep->endpoint.pkey = pkey;
lock_init(&ep->lock);
return ep;
}
-static void acmp_ep_up(struct acm_endpoint *endpoint)
+static int acmp_open_endpoint(const struct acm_endpoint *endpoint,
+ void *port_context, void **ep_context)
{
- struct acmp_port *port;
+ struct acmp_port *port = port_context;
struct acmp_ep *ep;
struct ibv_qp_init_attr init_attr;
struct ibv_qp_attr attr;
int ret, sq_size;
- acm_log(1, "\n");
- if (endpoint->prov_context) {
- acm_log(2, "endpoint for pkey 0x%x already exists\n", endpoint->pkey);
- return;
- }
-
- port = acmp_get_port(endpoint);
- if (!port) {
- acm_log(0, "ERROR - failed to acquire dev 0xllx port %d pkey 0x%x\n",
- endpoint->dev_guid, endpoint->port_num, endpoint->pkey);
- return;
- }
-
- ep = acmp_get_ep(port, endpoint);
+ ep = acmp_get_ep(port, (struct acm_endpoint *) endpoint);
if (ep) {
acm_log(2, "endpoint for pkey 0x%x already exists\n", endpoint->pkey);
lock_acquire(&ep->lock);
- ep->endpoint = endpoint;
+ ep->endpoint = (struct acm_endpoint *) endpoint;
lock_release(&ep->lock);
- endpoint->prov_context = ep;
- return;
+ *ep_context = (void *) ep;
+ return 0;
}
acm_log(2, "creating endpoint for pkey 0x%x\n", endpoint->pkey);
- ep = acmp_alloc_ep(port, endpoint);
+ ep = acmp_alloc_ep(port, (struct acm_endpoint *) endpoint);
if (!ep)
- return;
+ return -1;
- endpoint->prov_context = ep;
- sprintf(ep->id_string, "%s-%d-0x%x", port->dev->verbs->device->name,
+ sprintf(ep->id_string, "%s-%d-0x%x",
+ port->dev->verbs->device->name,
port->port_num, endpoint->pkey);
sq_size = resolve_depth + sa_depth + send_depth;
attr.qp_state = IBV_QPS_INIT;
attr.port_num = port->port_num;
- attr.pkey_index = acmp_get_pkey_index(endpoint);
+ attr.pkey_index = acmp_get_pkey_index((struct acm_endpoint *) endpoint);
attr.qkey = ACM_QKEY;
ret = ibv_modify_qp(ep->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_QKEY);
lock_release(&port->lock);
acmp_ep_preload(ep);
acmp_ep_join(ep);
- return;
+ *ep_context = (void *) ep;
+ return 0;
err2:
ibv_destroy_qp(ep->qp);
ibv_destroy_cq(ep->cq);
err0:
free(ep);
+ return -1;
}
static void acm_ep_up(struct acmc_port *port, uint16_t pkey)
goto err;
}
+ if (port->prov->open_endpoint(&ep->endpoint, port->prov_port_context,
+ &ep->prov_ep_context)) {
+ acm_log(0, "Error -- failed to open prov endpoint\n");
+ goto err;
+ }
+
lock_acquire(&port->lock);
DListInsertHead(&ep->entry, &port->ep_list);
lock_release(&port->lock);
- acmp_ep_up(&ep->endpoint);
for (i = 0; i < MAX_EP_ADDR; i++) {
if (ep->addr_info[i].addr.type)
- acmp_add_addr(&ep->endpoint, &ep->addr_info[i].addr);
+ acmp_add_addr(ep->prov_ep_context, &ep->addr_info[i].addr);
}
return;
free(ep);
}
-static struct acmp_port * acmp_get_port_by_guid(uint64_t guid, int port_num)
-{
- struct acm_endpoint ep;
-
- memset(&ep, 0, sizeof(ep));
- ep.dev_guid = guid;
- ep.port_num = port_num;
-
- return acmp_get_port(&ep);
-}
-
-static void acmp_port_up(struct acmp_port *port, struct ibv_port_attr *attr)
+static void acmp_port_up(struct acmp_port *port)
{
+ struct ibv_port_attr attr;
union ibv_gid gid;
uint16_t pkey, sm_lid;
int i, ret;
acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
- port->mtu = attr->active_mtu;
- port->rate = acm_get_rate(attr->active_width, attr->active_speed);
- if (attr->subnet_timeout >= 8)
- port->subnet_timeout = 1 << (attr->subnet_timeout - 8);
+ ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
+ if (ret) {
+ acm_log(0, "ERROR - unable to get port attribute\n");
+ return;
+ }
+
+ port->mtu = attr.active_mtu;
+ port->rate = acm_get_rate(attr.active_width, attr.active_speed);
+ if (attr.subnet_timeout >= 8)
+ port->subnet_timeout = 1 << (attr.subnet_timeout - 8);
for (port->gid_cnt = 0;; port->gid_cnt++) {
- ret = ibv_query_gid(port->dev->verbs, port->port_num, port->gid_cnt, &gid);
+ ret = ibv_query_gid(port->dev->verbs, port->port_num,
+ port->gid_cnt, &gid);
if (ret || !gid.global.interface_id)
break;
port->base_gid = gid;
}
- port->lid = attr->lid;
- port->lid_mask = 0xffff - ((1 << attr->lmc) - 1);
+ port->lid = attr.lid;
+ port->lid_mask = 0xffff - ((1 << attr.lmc) - 1);
port->sa_dest.av.src_path_bits = 0;
- port->sa_dest.av.dlid = attr->sm_lid;
- port->sa_dest.av.sl = attr->sm_sl;
+ port->sa_dest.av.dlid = attr.sm_lid;
+ port->sa_dest.av.sl = attr.sm_sl;
port->sa_dest.av.port_num = port->port_num;
port->sa_dest.remote_qpn = 1;
- sm_lid = htons(attr->sm_lid);
+ sm_lid = htons(attr.sm_lid);
acmp_set_dest_addr(&port->sa_dest, ACM_ADDRESS_LID,
(uint8_t *) &sm_lid, sizeof(sm_lid));
atomic_set(&port->sa_dest.refcnt, 1);
port->sa_dest.state = ACMP_READY;
- for (i = 0; i < attr->pkey_tbl_len; i++) {
+ for (i = 0; i < attr.pkey_tbl_len; i++) {
ret = ibv_query_pkey(port->dev->verbs, port->port_num, i, &pkey);
if (ret)
continue;
union ibv_gid gid;
uint16_t pkey;
int i, ret;
- struct acmp_port *prov_port;
+ struct acmc_prov_context *dev_ctx;
- acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
- ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
+ acm_log(1, "%s %d\n", port->dev->device.verbs->device->name,
+ port->port.port_num);
+ ret = ibv_query_port(port->dev->device.verbs, port->port.port_num,
+ &attr);
if (ret) {
acm_log(0, "ERROR - unable to get port state\n");
return;
}
for (port->gid_cnt = 0;; port->gid_cnt++) {
- ret = ibv_query_gid(port->dev->verbs, port->port_num, port->gid_cnt, &gid);
+ ret = ibv_query_gid(port->dev->device.verbs, port->port.port_num,
+ port->gid_cnt, &gid);
if (ret || !gid.global.interface_id)
break;
}
port->lid = attr.lid;
port->lid_mask = 0xffff - ((1 << attr.lmc) - 1);
port->state = IBV_PORT_ACTIVE;
- if ((prov_port = acmp_get_port_by_guid(port->dev->guid, port->port_num))) {
- acmp_port_up(prov_port, &attr);
- } else {
- acm_log(0, "Error -- failed to find prov port\n");
+ dev_ctx = acm_acquire_prov_context(&port->dev->prov_dev_context_list,
+ port->prov);
+ if (!dev_ctx) {
+ acm_log(0, "Error -- failed to acquire dev context\n");
return;
}
+ if (atomic_get(&dev_ctx->refcnt) == 1) {
+ if (port->prov->open_device(&port->dev->device, &dev_ctx->context)) {
+ acm_log(0, "Error -- failed to open the prov device\n");
+ goto err1;
+ }
+ }
+
+ if (port->prov->open_port(&port->port, dev_ctx->context,
+ &port->prov_port_context)) {
+ acm_log(0, "Error -- failed to open the prov port\n");
+ goto err1;
+ }
+
for (i = 0; i < attr.pkey_tbl_len; i++) {
- ret = ibv_query_pkey(port->dev->verbs, port->port_num, i, &pkey);
+ ret = ibv_query_pkey(port->dev->device.verbs,
+ port->port.port_num, i, &pkey);
if (ret)
continue;
pkey = ntohs(pkey);
acm_ep_up(port, pkey);
}
-
- acm_log(1, "%s %d is up\n", port->dev->verbs->device->name, port->port_num);
+ return;
+err1:
+ acm_release_prov_context(dev_ctx);
}
static void acmp_port_down(struct acmp_port *port)
{
struct ibv_port_attr attr;
int ret;
- struct acmp_port *prov_port;
DLIST_ENTRY *entry;
struct acmc_ep *ep;
+ struct acmc_prov_context *dev_ctx;
- acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
- ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
+ acm_log(1, "%s %d\n", port->dev->device.verbs->device->name,
+ port->port.port_num);
+ ret = ibv_query_port(port->dev->device.verbs, port->port.port_num, &attr);
if (!ret && attr.state == IBV_PORT_ACTIVE) {
acm_log(1, "port active\n");
return;
}
lock_release(&port->lock);
- if ((prov_port = acmp_get_port_by_guid(port->dev->guid, port->port_num))) {
- acmp_port_down(prov_port);
- } else {
- acm_log(0, "Error -- failed to find prov port\n");
- return;
+ if (port->prov_port_context) {
+ port->prov->close_port(port->prov_port_context);
+ port->prov_port_context = NULL;
+ dev_ctx = acm_get_prov_context(&port->dev->prov_dev_context_list,
+ port->prov);
+ if (dev_ctx) {
+ port->prov->close_device(dev_ctx->context);
+ acm_release_prov_context(dev_ctx);
+ }
}
- acm_log(1, "%s %d is down\n", port->dev->verbs->device->name, port->port_num);
+
+ acm_log(1, "%s %d is down\n", port->dev->device.verbs->device->name,
+ port->port.port_num);
}
static void acm_event_handler(struct acmc_device *dev)
{
struct ibv_async_event event;
int i, ret;
- struct acmp_port *prov_port;
- ret = ibv_get_async_event(dev->verbs, &event);
+ ret = ibv_get_async_event(dev->device.verbs, &event);
if (ret)
return;
acm_log(2, "processing async event %s for %s\n",
ibv_event_type_str(event.event_type),
- dev->verbs->device->name);
+ dev->device.verbs->device->name);
i = event.element.port_num - 1;
switch (event.event_type) {
break;
case IBV_EVENT_CLIENT_REREGISTER:
if ((dev->port[i].state == IBV_PORT_ACTIVE) &&
- (prov_port = acmp_get_port_by_guid(dev->guid, dev->port[i].port_num))) {
- acmp_port_join(prov_port);
+ dev->port[i].prov_port_context) {
+ dev->port[i].prov->handle_event(dev->port[i].prov_port_context,
+ event.event_type);
acm_log(1, "%s %d has reregistered\n",
- dev->verbs->device->name, i + 1);
+ dev->device.verbs->device->name, i + 1);
}
break;
default:
{
struct acmc_device *dev;
DLIST_ENTRY *dev_entry;
- struct acmp_device *pdev;
int i;
acm_log(1, "\n");
acm_port_up(&dev->port[i]);
}
}
+}
- for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;
- dev_entry = dev_entry->Next) {
+static int acmp_open_port(const struct acm_port *cport, void *dev_context,
+ void **port_context)
+{
+ struct acmp_device *dev = dev_context;
+ struct acmp_port *port;
- pdev = container_of(dev_entry, struct acmp_device, entry);
- beginthread(acmp_comp_handler, pdev);
+ if (cport->port_num < 1 || cport->port_num > dev->port_cnt) {
+ acm_log(0, "Error: port_num %d is out of range (max %d)\n",
+ cport->port_num, dev->port_cnt);
+ return -1;
}
-}
-static void
-acmp_open_port(struct acmp_port *port, struct acmp_device *dev, uint8_t port_num)
-{
- acm_log(1, "%s %d\n", dev->verbs->device->name, port_num);
- port->dev = dev;
- port->port_num = port_num;
- lock_init(&port->lock);
- DListInit(&port->ep_list);
- acmp_init_dest(&port->sa_dest, ACM_ADDRESS_LID, NULL, 0);
+ port = &dev->port[cport->port_num - 1];
+ port->port = cport;
- port->mad_portid = umad_open_port(dev->verbs->device->name, port->port_num);
+ port->mad_portid = umad_open_port(dev->verbs->device->name,
+ port->port_num);
if (port->mad_portid < 0) {
acm_log(0, "ERROR - unable to open MAD port\n");
- return;
+ return -1;
}
port->mad_agentid = umad_register(port->mad_portid,
}
port->state = IBV_PORT_DOWN;
- return;
+ acmp_port_up(port);
+ *port_context = port;
+ return 0;
err:
umad_close_port(port->mad_portid);
+ return -1;
}
-static void
-acm_open_port(struct acmc_port *port, struct acmc_device *dev, uint8_t port_num)
+static void acmp_close_port(void *port_context)
+{
+ struct acmp_port *port = port_context;
+
+ acmp_port_down(port);
+ umad_unregister(port->mad_portid, port->mad_agentid);
+ port->mad_agentid = -1;
+ umad_close_port(port->mad_portid);
+ port->mad_portid = -1;
+ port->port = NULL;
+ port->state = IBV_PORT_DOWN;
+}
+
+static void acmp_init_port(struct acmp_port *port, struct acmp_device *dev,
+ uint8_t port_num)
{
acm_log(1, "%s %d\n", dev->verbs->device->name, port_num);
port->dev = dev;
port->port_num = port_num;
lock_init(&port->lock);
DListInit(&port->ep_list);
+ acmp_init_dest(&port->sa_dest, ACM_ADDRESS_LID, NULL, 0);
+ port->state = IBV_PORT_DOWN;
+}
+
+static void
+acm_open_port(struct acmc_port *port, struct acmc_device *dev, uint8_t port_num)
+{
+ acm_log(1, "%s %d\n", dev->device.verbs->device->name, port_num);
+ port->dev = dev;
+ port->port.dev = &dev->device;
+ port->port.port_num = port_num;
+ lock_init(&port->lock);
+ DListInit(&port->ep_list);
port->prov = &def_prov;
port->state = IBV_PORT_DOWN;
}
-static void acmp_open_dev(struct ibv_device *ibdev, struct ibv_context *verbs,
- struct ibv_device_attr *attr)
+static int acmp_open_dev(const struct acm_device *device, void **dev_context)
{
struct acmp_device *dev;
size_t size;
- int i;
+ struct ibv_device_attr attr;
+ int i, ret;
+ DLIST_ENTRY *dev_entry;
+ struct ibv_context *verbs;
- acm_log(1, "%s\n", ibdev->name);
+ acm_log(1, "dev_guid 0x%llx %s\n", device->dev_guid,
+ device->verbs->device->name);
+
+ for (dev_entry = acmp_dev_list.Next; dev_entry != &acmp_dev_list;
+ dev_entry = dev_entry->Next) {
+ dev = container_of(dev_entry, struct acmp_device, entry);
+
+ if (dev->guid == device->dev_guid) {
+ acm_log(2, "dev_guid 0x%llx already exits\n",
+ device->dev_guid);
+ *dev_context = dev;
+ dev->device = device;
+ return 0;
+ }
+ }
+
+ /* We need to release the core device structure when device close is
+ * called. But this provider does not support dynamic add/removal of
+ * devices/ports/endpoints. To avoid use-after-free issues, we open
+ * our own verbs context, rather than using the one in the core
+ * device structure.
+ */
+ verbs = ibv_open_device(device->verbs->device);
+ if (!verbs) {
+ acm_log(0, "ERROR - opening device %s\n",
+ device->verbs->device->name);
+ goto err;
+ }
+
+ ret = ibv_query_device(verbs, &attr);
+ if (ret) {
+ acm_log(0, "ERROR - ibv_query_device (%s) %d\n",
+ verbs->device->name, ret);
+ goto err;
+ }
- size = sizeof(*dev) + sizeof(struct acmp_port) * attr->phys_port_cnt;
+ size = sizeof(*dev) + sizeof(struct acmp_port) * attr.phys_port_cnt;
dev = (struct acmp_device *) calloc(1, size);
if (!dev)
- return;
+ goto err;
dev->verbs = verbs;
- dev->guid = ibv_get_device_guid(ibdev);
- dev->port_cnt = attr->phys_port_cnt;
+ dev->device = device;
+ dev->port_cnt = attr.phys_port_cnt;
dev->pd = ibv_alloc_pd(dev->verbs);
if (!dev->pd) {
}
for (i = 0; i < dev->port_cnt; i++) {
- acmp_open_port(&dev->port[i], dev, i + 1);
+ acmp_init_port(&dev->port[i], dev, i + 1);
}
+ if (pthread_create(&dev->comp_thread_id, NULL, acmp_comp_handler, dev)) {
+ acm_log(0, "Error -- failed to create the comp thread for dev %s",
+ dev->verbs->device->name);
+ goto err3;
+ }
+
+ lock_acquire(&acmp_dev_lock);
DListInsertHead(&dev->entry, &acmp_dev_list);
+ lock_release(&acmp_dev_lock);
+ dev->guid = device->dev_guid;
+ *dev_context = dev;
- acm_log(1, "%s opened\n", ibdev->name);
- return;
+ acm_log(1, "%s opened\n", dev->verbs->device->name);
+ return 0;
+err3:
+ ibv_destroy_comp_channel(dev->channel);
err2:
ibv_dealloc_pd(dev->pd);
err1:
free(dev);
+err:
+ return -1;
+}
+
+static void acmp_close_dev(void *dev_context)
+{
+ struct acmp_device *dev = dev_context;
+
+ acm_log(1, "dev_guid 0x%llx\n", dev->device->dev_guid);
+ dev->device = NULL;
}
static void acm_open_dev(struct ibv_device *ibdev)
if (!dev)
goto err1;
- dev->verbs = verbs;
- dev->guid = ibv_get_device_guid(ibdev);
+ dev->device.verbs = verbs;
+ dev->device.dev_guid = ibv_get_device_guid(ibdev);
dev->port_cnt = attr.phys_port_cnt;
+ DListInit(&dev->prov_dev_context_list);
for (i = 0; i < dev->port_cnt; i++) {
acm_open_port(&dev->port[i], dev, i + 1);
DListInsertHead(&dev->entry, &dev_list);
- acmp_open_dev(ibdev, verbs, &attr);
-
acm_log(1, "%s opened\n", ibdev->name);
return;
acm_log(0, "ERROR - no devices\n");
return -1;
}
- if (DListEmpty(&acmp_dev_list)) {
- acm_log(0, "ERROR - no provider devices\n");
- return -1;
- }
return 0;
}
atomic_init(&wait_cnt);
DListInit(&dev_list);
DListInit(&acmp_dev_list);
+ lock_init(&acmp_dev_lock);
DListInit(&timeout_list);
event_init(&timeout_event);
for (i = 0; i < ACM_MAX_COUNTER; i++)