#define MAX_EP_ADDR 4
#define MAX_EP_MC 2
+enum acm_state {
+ ACM_INIT,
+ ACM_QUERY_ADDR,
+ ACM_ADDR_RESOLVED,
+ ACM_QUERY_ROUTE,
+ ACM_PROCESS_REQUEST,
+ ACM_READY
+};
+
enum acm_addr_prot
{
ACM_ADDR_PROT_ACM
ACM_ROUTE_PROT_SA
};
+/*
+ * Nested locking order: dest -> ep
+ */
struct acm_dest
{
uint8_t address[ACM_MAX_ADDRESS]; /* keep first */
union ibv_gid mgid;
DLIST_ENTRY req_queue;
uint32_t remote_qpn;
+ lock_t lock;
+ enum acm_state state;
+ atomic_t refcnt;
+ uint8_t addr_type;
};
struct acm_port
{
struct acm_device *dev;
DLIST_ENTRY ep_list;
+ lock_t lock;
int mad_portid;
int mad_agentid;
struct acm_dest sa_dest;
DLIST_ENTRY pending_queue;
DLIST_ENTRY active_queue;
DLIST_ENTRY wait_queue;
+ enum acm_state state;
};
struct acm_send_msg
va_end(args);
}
-static void acm_log_ep_addr(int level, const char *msg, uint16_t type, uint8_t *data)
+static void acm_log_addr(int level, const char *msg, uint16_t addr_type, uint8_t *addr)
{
struct ib_path_record *path;
char ip_addr[ACM_MAX_ADDRESS];
lock_acquire(&log_lock);
fprintf(flog, msg);
- switch (type) {
+ switch (addr_type) {
case ACM_EP_INFO_NAME:
- fprintf(flog, "%s\n", data);
+ fprintf(flog, "%s\n", addr);
break;
case ACM_EP_INFO_ADDRESS_IP:
- inet_ntop(AF_INET, data, ip_addr, ACM_MAX_ADDRESS);
+ inet_ntop(AF_INET, addr, ip_addr, ACM_MAX_ADDRESS);
fprintf(flog, "%s\n", ip_addr);
break;
case ACM_EP_INFO_ADDRESS_IP6:
- inet_ntop(AF_INET6, data, ip_addr, ACM_MAX_ADDRESS);
+ inet_ntop(AF_INET6, addr, ip_addr, ACM_MAX_ADDRESS);
fprintf(flog, "%s\n", ip_addr);
break;
case ACM_EP_INFO_PATH:
- path = (struct ib_path_record *) data;
+ path = (struct ib_path_record *) addr;
fprintf(flog, "path record, SLID 0x%x, DLID 0x%x\n",
ntohs(path->slid), ntohs(path->dlid));
break;
default:
- fprintf(flog, "unknown endpoint address 0x%x\n", type);
+ fprintf(flog, "unknown address 0x%x\n", addr_type);
}
lock_release(&log_lock);
}
{
struct acm_send_msg *msg;
- msg = (struct acm_send_msg *) zalloc(sizeof *msg);
+ msg = (struct acm_send_msg *) calloc(1, sizeof *msg);
if (!msg) {
acm_log(0, "ERROR - unable to allocate send buffer\n");
return NULL;
struct acm_ep *ep = msg->ep;
struct ibv_send_wr *bad_wr;
+ lock_acquire(&ep->lock);
if (ep->available_sends) {
acm_log(2, "posting send to QP\n");
ep->available_sends--;
acm_log(2, "no sends available, queuing message\n");
DListInsertTail(&msg->entry, &ep->pending_queue);
}
+ lock_release(&ep->lock);
}
static void acm_post_recv(struct acm_ep *ep, uint64_t address)
ibv_post_recv(ep->qp, &wr, &bad_wr);
}
+/* Caller must hold ep lock */
static void acm_send_available(struct acm_ep *ep)
{
struct acm_send_msg *msg;
ret = ibv_attach_mcast(ep->qp, &mc_rec->mgid, mc_rec->mlid);
if (ret) {
acm_log(0, "ERROR - unable to attach QP to multicast group\n");
+ } else {
+ dest->state = ACM_READY;
+ acm_log(1, "join successful\n");
}
- acm_log(1, "join successful\n");
} else {
acm_log(0, "ERROR - MGID in join response not found\n");
}
return -1;
}
-/*
- * Multicast groups are ordered lowest to highest preference.
- */
-static int
-acm_record_av(struct acm_dest *dest, struct acm_ep *ep,
- struct ibv_wc *wc, struct acm_resolve_rec *rec)
+static struct acm_dest *
+acm_alloc_dest(uint8_t addr_type, uint8_t *addr)
{
- int i, index;
+ struct acm_dest *dest;
- acm_log(2, "\n");
- for (i = min(rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
- index = acm_mc_index(ep, &rec->gid[i]);
- if (index >= 0) {
- acm_log(2, "selecting MC group at index %d\n", index);
- dest->av = ep->mc_dest[index].av;
- dest->av.dlid = wc->slid;
- dest->av.src_path_bits = wc->dlid_path_bits;
- dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
-
- dest->mgid = ep->mc_dest[index].mgid;
-
- dest->path = ep->mc_dest[index].path;
- dest->path.dgid = dest->av.grh.dgid;
- dest->path.dlid = htons(dest->av.dlid);
- return ACM_STATUS_SUCCESS;
- }
+ acm_log(1, "\n");
+ dest = calloc(1, sizeof *dest);
+ if (!dest) {
+ acm_log(0, "ERROR - unable to allocate dest\n");
+ return NULL;
}
- return ACM_STATUS_ENODATA;
+ memcpy(dest->address, addr, ACM_MAX_ADDRESS);
+ dest->addr_type = addr_type;
+ DListInit(&dest->req_queue);
+ atomic_set(&dest->refcnt, 1);
+ lock_init(&dest->lock);
+
+ return dest;
}
-/*
- * Record the source of a resolve request. Use the source QPN to see if
- * the remote service has relocated and we need to update our cache.
- */
+/* Caller must hold ep lock. */
static struct acm_dest *
-acm_record_src(struct acm_ep *ep, struct ibv_wc *wc, struct acm_resolve_rec *rec)
+acm_get_dest(struct acm_ep *ep, uint8_t addr_type, uint8_t *addr)
{
struct acm_dest *dest, **tdest;
- int ret;
acm_log(2, "\n");
- lock_acquire(&ep->lock);
- tdest = tfind(rec->src, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
- if (!tdest) {
- acm_log(2, "creating new dest\n");
- dest = zalloc(sizeof *dest);
- if (!dest) {
- acm_log(0, "ERROR - unable to allocate dest\n");
- goto unlock;
- }
-
- memcpy(dest->address, rec->src, ACM_MAX_ADDRESS);
- DListInit(&dest->req_queue);
- tsearch(dest, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
- } else {
+ tdest = tfind(addr, &ep->dest_map[addr_type - 1], acm_compare_dest);
+ if (tdest) {
dest = *tdest;
+ (void) atomic_inc(&dest->refcnt);
+ } else {
+ dest = NULL;
}
+ return dest;
+}
- if (dest->ah) {
- if (dest->remote_qpn == wc->src_qp)
- goto unlock;
+static void
+acm_put_dest(struct acm_dest *dest)
+{
+ if (atomic_dec(&dest->refcnt) == 0) {
+ free(dest);
+ }
+}
- ibv_destroy_ah(dest->ah); // TODO: ah could be in use
- dest->ah = NULL;
+static struct acm_dest *
+acm_acquire_dest(struct acm_ep *ep, uint8_t addr_type, uint8_t *addr)
+{
+ struct acm_dest *dest;
+
+ acm_log_addr(2, "acm_acquire_dest: ", addr_type, addr);
+ lock_acquire(&ep->lock);
+ dest = acm_get_dest(ep, addr_type, addr);
+ if (!dest) {
+ dest = acm_alloc_dest(addr_type, addr);
+ if (dest) {
+ tsearch(dest, &ep->dest_map[addr_type - 1], acm_compare_dest);
+ (void) atomic_inc(&dest->refcnt);
+ }
}
+ lock_release(&ep->lock);
+ return dest;
+}
- acm_log(2, "creating address handle\n");
- ret = acm_record_av(dest, ep, wc, rec);
- if (ret) {
- acm_log(0, "ERROR - failed to record av\n");
- goto err;
+/* Caller must hold ep lock. */
+//static void
+//acm_remove_dest(struct acm_ep *ep, struct acm_dest *dest)
+//{
+// acm_log_addr(2, "acm_remove_dest: ", dest->addr_type, dest->addr);
+// tdelete(dest->address, &ep->dest_map[dest->addr_type - 1], acm_compare_dest);
+// acm_put_dest(dest);
+//}
+
+/*
+ * Multicast groups are ordered lowest to highest preference.
+ */
+static uint8_t
+acm_record_acm_route(struct acm_ep *ep, struct acm_dest *dest, struct ibv_wc *wc,
+ struct acm_resolve_rec *rec)
+{
+ int i, index;
+
+ acm_log(2, "\n");
+ for (i = min(rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
+ index = acm_mc_index(ep, &rec->gid[i]);
+ if (index >= 0) {
+ goto found;
+ }
}
+ acm_log(0, "ERROR - no shared multicast groups\n");
+ dest->state = ACM_INIT;
+ return ACM_STATUS_ENODATA;
+
+found:
+ acm_log(2, "selecting MC group at index %d\n", index);
+ dest->av = ep->mc_dest[index].av;
+ dest->av.dlid = wc->slid;
+ dest->av.src_path_bits = wc->dlid_path_bits;
+ dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
+
+ dest->mgid = ep->mc_dest[index].mgid;
+
+ dest->path = ep->mc_dest[index].path;
+ dest->path.dgid = dest->av.grh.dgid;
+ dest->path.dlid = htons(dest->av.dlid);
+
dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
if (!dest->ah) {
acm_log(0, "ERROR - failed to create ah\n");
- goto err;
+ dest->state = ACM_INIT;
+ return ACM_STATUS_ENOMEM;
}
dest->remote_qpn = wc->src_qp;
+ dest->state = ACM_READY;
+ return ACM_STATUS_SUCCESS;
-unlock:
- lock_release(&ep->lock);
- return dest;
+}
-err:
- if (!tdest) {
- tdelete(dest->address, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
- free(dest);
- }
- lock_release(&ep->lock);
- return NULL;
+static void
+acm_record_acm_addr(struct acm_dest *dest, struct ibv_wc *wc)
+{
+ acm_log(2, "\n");
+ dest->path.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
+ dest->remote_qpn = wc->src_qp;
+ dest->state = ACM_ADDR_RESOLVED;
}
static void acm_init_resp_mad(struct acm_mad *resp, struct acm_mad *req)
resp->tid = req->tid;
}
-static int acm_validate_resolve_req(struct acm_mad *mad)
+static int acm_validate_addr_req(struct acm_mad *mad)
{
struct acm_resolve_rec *rec;
}
static void
-acm_process_resolve_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
+acm_send_addr_resp(struct acm_ep *ep, struct acm_dest *dest, struct acm_mad *mad)
{
struct acm_resolve_rec *rec, *resp_rec;
- struct acm_dest *dest;
struct acm_send_msg *msg;
struct acm_mad *resp_mad;
acm_log(2, "\n");
- if (acm_validate_resolve_req(mad)) {
- acm_log(0, "ERROR - invalid request\n");
- return;
- }
-
- rec = (struct acm_resolve_rec *) mad->data;
- dest = acm_record_src(ep, wc, rec);
- if (!dest) {
- acm_log(0, "ERROR - failed to record source\n");
- return;
- }
-
- if (acm_addr_index(ep, rec->dest, rec->dest_type) < 0) {
- acm_log(2, "no matching address - discarding\n");
- return;
- }
-
msg = acm_alloc_send(ep, dest, sizeof (*resp_mad));
if (!msg) {
acm_log(0, "ERROR - failed to allocate message\n");
return;
}
+ rec = (struct acm_resolve_rec *) mad->data;
resp_mad = (struct acm_mad *) msg->data;
resp_rec = (struct acm_resolve_rec *) resp_mad->data;
+
acm_init_resp_mad(resp_mad, mad);
resp_rec->dest_type = rec->src_type;
resp_rec->dest_length = rec->src_length;
memcpy(resp_rec->src, rec->dest, ACM_MAX_ADDRESS);
memcpy(resp_rec->gid, dest->mgid.raw, sizeof(union ibv_gid));
- acm_log(2, "sending resolve response\n");
- lock_acquire(&ep->lock);
acm_post_send(msg);
- lock_release(&ep->lock);
}
static int
-acm_client_resolve_resp(struct acm_ep *ep, struct acm_client *client,
- struct acm_resolve_msg *req_msg, struct acm_dest *dest, uint8_t status)
+acm_client_resolve_resp(struct acm_client *client, struct acm_resolve_msg *req_msg,
+ struct acm_dest *dest, uint8_t status)
{
struct acm_msg msg;
struct acm_resolve_msg *resp_msg = (struct acm_resolve_msg *) &msg;
resp_msg->hdr.status = status;
resp_msg->hdr.length = ACM_MSG_HDR_LENGTH;
memset(resp_msg->hdr.reserved, 0, sizeof(resp_msg->hdr.reserved));
- if (status)
- goto send_resp;
-
- resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
- resp_msg->data[0].flags = IB_PATH_FLAG_GMP |
- IB_PATH_FLAG_PRIMARY | IB_PATH_FLAG_BIDIRECTIONAL;
- resp_msg->data[0].type = ACM_EP_INFO_PATH;
- resp_msg->data[0].info.path = dest->path;
- if (req_msg->hdr.src_out) {
+ if (status == ACM_STATUS_SUCCESS) {
resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
- memcpy(&resp_msg->data[1], &req_msg->data[req_msg->hdr.src_out],
- ACM_MSG_EP_LENGTH);
+ resp_msg->data[0].flags = IB_PATH_FLAG_GMP |
+ IB_PATH_FLAG_PRIMARY | IB_PATH_FLAG_BIDIRECTIONAL;
+ resp_msg->data[0].type = ACM_EP_INFO_PATH;
+ resp_msg->data[0].info.path = dest->path;
+
+ if (req_msg->hdr.src_out) {
+ resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
+ memcpy(&resp_msg->data[1], &req_msg->data[req_msg->hdr.src_out],
+ ACM_MSG_EP_LENGTH);
+ }
}
-send_resp:
ret = send(client->sock, (char *) resp_msg, resp_msg->hdr.length, 0);
if (ret != resp_msg->hdr.length)
acm_log(0, "failed to send response\n");
return ret;
}
-static struct acm_dest *
-acm_record_dest(struct acm_ep *ep, struct ibv_wc *wc,
- struct acm_resolve_rec *req_rec, struct acm_resolve_rec *resp_rec)
+static void
+acm_complete_queued_req(struct acm_dest *dest, uint8_t status)
{
- struct acm_dest *dest, **tdest;
- int ret;
+ struct acm_request *client_req;
+ DLIST_ENTRY *entry;
- acm_log(2, "\n");
- lock_acquire(&ep->lock);
- tdest = tfind(req_rec->dest, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
- if (!tdest) {
- dest = NULL;
- goto unlock;
+ acm_log(2, "status %d\n", status);
+ lock_acquire(&dest->lock);
+ while (!DListEmpty(&dest->req_queue)) {
+ entry = dest->req_queue.Next;
+ DListRemove(entry);
+ client_req = container_of(entry, struct acm_request, entry);
+ lock_release(&dest->lock);
+ acm_log(2, "completing client request\n");
+ acm_client_resolve_resp(client_req->client,
+ (struct acm_resolve_msg *) &client_req->msg, dest, status);
+ lock_acquire(&dest->lock);
}
+ lock_release(&dest->lock);
+}
- dest = *tdest;
- if (dest->ah)
- goto unlock;
+static void
+acm_process_addr_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
+{
+ struct acm_resolve_rec *rec;
+ struct acm_dest *dest;
+ uint8_t status;
- acm_log(2, "creating address handle\n");
- ret = acm_record_av(dest, ep, wc, resp_rec);
- if (ret) {
- acm_log(0, "ERROR - failed to record av\n");
- goto unlock;
+ acm_log(2, "\n");
+ if (acm_validate_addr_req(mad)) {
+ acm_log(0, "ERROR - invalid request\n");
+ return;
}
- dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
- if (!dest->ah) {
- acm_log(0, "ERROR - failed to create ah\n");
- goto unlock;
+ rec = (struct acm_resolve_rec *) mad->data;
+ dest = acm_acquire_dest(ep, rec->src_type, rec->src);
+ if (!dest) {
+ acm_log(0, "ERROR - unable to add source\n");
+ return;
}
- dest->remote_qpn = wc->src_qp;
+ lock_acquire(&dest->lock);
+ switch (dest->state) {
+ case ACM_READY:
+ if (dest->remote_qpn == wc->src_qp) {
+ lock_release(&dest->lock);
+ break;
+ }
-unlock:
- lock_release(&ep->lock);
- return dest;
+ ibv_destroy_ah(dest->ah); // TODO: ah could be in use
+ /* fall through */
+ default:
+ acm_record_acm_addr(dest, wc);
+ status = acm_record_acm_route(ep, dest, wc, rec);
+ lock_release(&dest->lock);
+ if (status) {
+ acm_log(0, "ERROR - failed to record route\n");
+ goto put;
+ }
+ acm_complete_queued_req(dest, status);
+ }
+
+ if (acm_addr_index(ep, rec->dest, rec->dest_type) >= 0) {
+ acm_send_addr_resp(ep, dest, mad);
+ }
+put:
+ acm_put_dest(dest);
}
static void
-acm_process_resolve_resp(struct acm_ep *ep, struct ibv_wc *wc,
- struct acm_send_msg *msg, struct acm_mad *mad)
+acm_process_addr_resp(struct acm_send_msg *msg, struct ibv_wc *wc, struct acm_mad *mad)
{
struct acm_resolve_rec *req_rec, *resp_rec;
struct acm_dest *dest;
- struct acm_request *client_req;
- DLIST_ENTRY *entry;
uint8_t status;
- status = acm_class_status(mad->status);
+ if (mad) {
+ status = acm_class_status(mad->status);
+ resp_rec = (struct acm_resolve_rec *) mad->data;
+ } else {
+ status = ACM_STATUS_ETIMEDOUT;
+ resp_rec = NULL;
+ }
acm_log(2, "resp status 0x%x\n", status);
req_rec = (struct acm_resolve_rec *) ((struct acm_mad *) msg->data)->data;
- resp_rec = (struct acm_resolve_rec *) mad->data;
- dest = acm_record_dest(ep, wc, req_rec, resp_rec);
+ dest = acm_acquire_dest(msg->ep, req_rec->dest_type, req_rec->dest);
if (!dest) {
- acm_log(0, "ERROR - cannot record dest\n");
+ acm_log(0, "ERROR - cannot record addr resp\n");
return;
}
- if (!status && !dest->ah)
- status = ACM_STATUS_EINVAL;
-
- lock_acquire(&ep->lock);
- while (!DListEmpty(&dest->req_queue)) {
- entry = dest->req_queue.Next;
- DListRemove(entry);
- client_req = container_of(entry, struct acm_request, entry);
- lock_release(&ep->lock);
- acm_log(2, "completing queued client request\n");
- acm_client_resolve_resp(ep, client_req->client,
- (struct acm_resolve_msg *) &client_req->msg, dest, status);
- lock_acquire(&ep->lock);
+ lock_acquire(&dest->lock);
+ if (dest->state != ACM_QUERY_ADDR) {
+ lock_release(&dest->lock);
+ return;
}
- if (status) {
- acm_log(0, "resp failed 0x%x\n", status);
- tdelete(dest->address, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
+
+ if (!status) {
+ acm_record_acm_addr(dest, wc);
+ status = acm_record_acm_route(msg->ep, dest, wc, resp_rec);
+ } else {
+ dest->state = ACM_INIT;
}
- lock_release(&ep->lock);
+ lock_release(&dest->lock);
+
+ acm_complete_queued_req(dest, status);
+ acm_put_dest(dest);
}
static int acm_validate_recv(struct acm_mad *mad)
goto out;
}
acm_log(2, "found matching request\n");
- acm_process_resolve_resp(ep, wc, req, mad);
+ acm_process_addr_resp(req, wc, mad);
if (free)
acm_free_send(req);
} else {
acm_log(2, "unsolicited request\n");
- acm_process_resolve_req(ep, wc, mad);
+ acm_process_addr_req(ep, wc, mad);
free = 0;
}
acm_log(2, "\n");
len = sizeof(*umad) + sizeof(*mad);
- umad = (struct ib_user_mad *) zalloc(len);
+ umad = (struct ib_user_mad *) calloc(1, len);
if (!umad) {
acm_log(0, "ERROR - unable to allocate MAD for join\n");
return;
ep = container_of(ep_entry, struct acm_ep, entry);
acm_join_group(ep, &port_gid, 0, 0, 0, min_rate, min_mtu);
+
+ if ((ep->state = ep->mc_dest[0].state) != ACM_READY)
+ continue;
+
if (port->rate != min_rate || port->mtu != min_mtu)
acm_join_group(ep, &port_gid, 0, 0, 0, port->rate, port->mtu);
}
{
DLIST_ENTRY *entry;
struct acm_send_msg *msg;
- struct acm_mad *mad;
struct acm_resolve_rec *rec;
- struct acm_dest *dest, **tdest;
- struct acm_request *req;
- struct acm_ep *ep;
while (!DListEmpty(&timeout_list)) {
entry = timeout_list.Next;
DListRemove(entry);
msg = container_of(entry, struct acm_send_msg, entry);
- mad = (struct acm_mad *) msg->data;
+ rec = (struct acm_resolve_rec *) ((struct acm_mad *) msg->data)->data;
- rec = (struct acm_resolve_rec *) mad->data;
- ep = msg->ep;
-
- acm_log_ep_addr(0, "acm_process_timeouts: dest ", rec->dest_type, rec->dest);
- lock_acquire(&ep->lock);
- tdest = tfind(rec->dest, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
- if (!tdest) {
- acm_log(0, "destination already removed\n");
- lock_release(&ep->lock);
- continue;
- } else {
- dest = *tdest;
- }
-
- acm_log(2, "failing pending client requests\n");
- while (!DListEmpty(&dest->req_queue)) {
- entry = dest->req_queue.Next;
- DListRemove(entry);
-
- req = container_of(entry, struct acm_request, entry);
- lock_release(&ep->lock);
- acm_client_resolve_resp(ep, req->client,
- (struct acm_resolve_msg *) &req->msg, dest,
- ACM_STATUS_ETIMEDOUT);
- lock_acquire(&ep->lock);
- }
-
- acm_log(2, "resolve timed out, releasing destination\n");
- tdelete(dest->address, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
- lock_release(&ep->lock);
+ acm_log_addr(0, "acm_process_timeouts: dest ", rec->dest_type, rec->dest);
+ acm_process_addr_resp(msg, NULL, NULL);
}
}
return 0;
}
-static void acm_release_client(struct acm_client *client)
+static void acm_disconnect_client(struct acm_client *client)
{
lock_acquire(&client->lock);
shutdown(client->sock, SHUT_RDWR);
return ret;
}
-// TODO: process send/recv asynchronously
+/* SA queries are serialized to avoid swamping SA. */
static uint8_t acm_query_sa(struct acm_ep *ep, struct ib_path_record *path)
{
struct acm_port *port;
acm_log(2, "\n");
len = sizeof(*umad) + sizeof(*mad);
- umad = (struct ib_user_mad *) zalloc(len);
+ umad = (struct ib_user_mad *) calloc(1, len);
if (!umad) {
acm_log(0, "ERROR - unable to allocate MAD\n");
return ACM_STATUS_ENOMEM;
mad = (struct ib_sa_mad *) umad->data;
acm_init_path_query(mad, path);
+ lock_acquire(&port->lock);
ret = umad_send(port->mad_portid, port->mad_agentid, (void *) umad,
sizeof(*mad), timeout, retries);
if (ret) {
ret = ((uint8_t) ret) ? ret : -1;
}
out:
+ lock_release(&port->lock);
free(umad);
return (uint8_t) ret;
}
DLIST_ENTRY *dev_entry, *ep_entry;
int i;
- acm_log_ep_addr(2, "acm_get_ep: ", data->type, data->info.addr);
+ acm_log_addr(2, "acm_get_ep: ", data->type, data->info.addr);
for (dev_entry = dev_list.Next; dev_entry != &dev_list;
dev_entry = dev_entry->Next) {
ep_entry = ep_entry->Next) {
ep = container_of(ep_entry, struct acm_ep, entry);
+ if (ep->state != ACM_READY)
+ continue;
+
if (data->type == ACM_EP_INFO_PATH)
return ep; // TODO: check pkey
}
}
- acm_log_ep_addr(0, "acm_get_ep: could not find ", data->type, data->info.addr);
+ acm_log_addr(0, "acm_get_ep: could not find ", data->type, data->info.addr);
return NULL;
}
}
(void) atomic_inc(&client->refcnt);
- lock_acquire(&ep->lock);
status = acm_query_sa(ep, &msg->data[0].info.path);
- lock_release(&ep->lock);
resp:
return acm_client_query_resp(ep, client, msg, status);
int i;
acm_log(2, "\n");
- if (!ep->mc_dest[0].ah) {
- acm_log(0, "ERROR - multicast group not ready\n");
- return ACM_STATUS_ENOTCONN;
- }
-
msg = acm_alloc_send(ep, &ep->mc_dest[0], sizeof(struct acm_mad));
if (!msg) {
acm_log(0, "ERROR - cannot allocate send msg\n");
return 0;
}
-static int acm_select_src(struct acm_ep_addr_data *src, struct acm_ep_addr_data *dst)
+static int acm_svr_select_src(struct acm_ep_addr_data *src, struct acm_ep_addr_data *dst)
{
struct sockaddr_in6 addr;
socklen_t len;
return ACM_STATUS_SUCCESS;
}
+/* Caller must hold dest lock */
+static uint8_t
+acm_svr_queue_req(struct acm_dest *dest, struct acm_client *client,
+ struct acm_resolve_msg *msg)
+{
+ struct acm_request *req;
+
+ acm_log(2, "\n");
+ req = calloc(1, sizeof *req);
+ if (!req) {
+ acm_log(0, "ERROR - unable to queue client request\n");
+ return ACM_STATUS_ENOMEM;
+ }
+
+ (void) atomic_inc(&client->refcnt);
+ req->client = client;
+ memcpy(&req->msg, msg, sizeof(req->msg));
+ DListInsertTail(&req->entry, &dest->req_queue);
+ return ACM_STATUS_SUCCESS;
+}
+
static int
acm_svr_resolve(struct acm_client *client, struct acm_resolve_msg *msg)
{
struct acm_ep *ep;
- struct acm_dest *dest, **tdest;
- struct acm_request *req;
+ struct acm_dest *dest;
struct acm_ep_addr_data *saddr, *daddr;
uint8_t status;
+ int ret;
status = acm_svr_verify_resolve(msg, &saddr, &daddr);
if (status) {
acm_log(0, "misformatted or unsupported request\n");
- goto resp;
+ return acm_client_resolve_resp(client, msg, NULL, status);
}
- status = acm_select_src(saddr, daddr);
+ status = acm_svr_select_src(saddr, daddr);
if (status) {
acm_log(0, "unable to select suitable source address\n");
- goto resp;
+ return acm_client_resolve_resp(client, msg, NULL, status);
}
- acm_log_ep_addr(2, "acm_svr_resolve: source ", saddr->type, saddr->info.addr);
+ acm_log_addr(2, "acm_svr_resolve: source ", saddr->type, saddr->info.addr);
ep = acm_get_ep(saddr);
if (!ep) {
acm_log(0, "unknown local end point\n");
- status = ACM_STATUS_ESRCADDR;
- goto resp;
+ return acm_client_resolve_resp(client, msg, NULL, ACM_STATUS_ESRCADDR);
}
- acm_log_ep_addr(2, "acm_svr_resolve: dest ", daddr->type, daddr->info.addr);
- (void) atomic_inc(&client->refcnt);
- lock_acquire(&ep->lock);
- tdest = tfind(&daddr->info.addr, &ep->dest_map[daddr->type - 1], acm_compare_dest);
- dest = tdest ? *tdest : NULL;
- if (dest && dest->ah) {
- acm_log(2, "request satisfied from local cache\n");
- status = ACM_STATUS_SUCCESS;
- goto release;
- }
-
- req = zalloc(sizeof *req);
- if (!req) {
- acm_log(0, "ERROR - unable to allocate memory to queue client request\n");
- status = ACM_STATUS_ENOMEM;
- goto release;
- }
+ acm_log_addr(2, "acm_svr_resolve: dest ", daddr->type, daddr->info.addr);
+ dest = acm_acquire_dest(ep, daddr->type, daddr->info.addr);
if (!dest) {
- acm_log(2, "adding new destination\n");
- dest = zalloc(sizeof *dest);
- if (!dest) {
- acm_log(0, "ERROR - unable to allocate destination in client request\n");
- status = ACM_STATUS_ENOMEM;
- goto free_req;
- }
+ acm_log(0, "ERROR - unable to allocate destination in client request\n");
+ return acm_client_resolve_resp(client, msg, NULL, ACM_STATUS_ENOMEM);
+ }
- memcpy(dest->address, &daddr->info.addr, ACM_MAX_ADDRESS);
+ lock_acquire(&dest->lock);
+ switch (dest->state) {
+ case ACM_READY:
+ acm_log(2, "request satisfied from local cache\n");
+ status = ACM_STATUS_SUCCESS;
+ break;
+ case ACM_INIT:
acm_log(2, "sending resolve msg to dest\n");
status = acm_send_resolve(ep, saddr, daddr);
if (status) {
- acm_log(0, "ERROR - failure sending resolve request 0x%x\n", status);
- goto free_dest;
+ break;
}
-
- DListInit(&dest->req_queue);
- tsearch(dest, &ep->dest_map[daddr->type - 1], acm_compare_dest);
+ dest->state = ACM_QUERY_ADDR;
+ /* fall through */
+ default:
+ status = acm_svr_queue_req(dest, client, msg);
+ if (status) {
+ break;
+ }
+ ret = 0;
+ lock_release(&dest->lock);
+ goto put;
}
-
- acm_log(2, "queuing client request\n");
- req->client = client;
- memcpy(&req->msg, msg, sizeof(req->msg));
- DListInsertTail(&req->entry, &dest->req_queue);
- lock_release(&ep->lock);
- return 0;
-
-free_dest:
- free(dest);
- dest = NULL;
-free_req:
- free(req);
-release:
- lock_release(&ep->lock);
-resp:
- return acm_client_resolve_resp(ep, client, msg, dest, status);
+ lock_release(&dest->lock);
+ ret = acm_client_resolve_resp(client, msg, dest, status);
+put:
+ acm_put_dest(dest);
+ return ret;
}
static void acm_svr_receive(struct acm_client *client)
out:
if (ret)
- acm_release_client(client);
+ acm_disconnect_client(client);
}
static void acm_server(void)
port->port_num);
for (i = 0; i < port->pkey_cnt; i++) {
- ep = zalloc(sizeof *ep);
+ ep = calloc(1, sizeof *ep);
if (!ep)
break;
int ret;
acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
+ lock_init(&port->lock);
DListInit(&port->ep_list);
ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
if (ret)
}
size = sizeof(*dev) + sizeof(struct acm_port) * attr.phys_port_cnt;
- dev = (struct acm_device *) zalloc(size);
+ dev = (struct acm_device *) calloc(1, size);
if (!dev)
goto err1;