git.openfabrics.org - ~shefty/ibacm.git/commitdiff
ib/acm: add state tracking
author Sean Hefty <sean.hefty@intel.com>
Thu, 22 Apr 2010 17:16:04 +0000 (10:16 -0700)
committer Sean Hefty <sean.hefty@intel.com>
Thu, 22 Apr 2010 17:16:04 +0000 (10:16 -0700)
To support alternate address and route resolution protocols,
add state tracking to destinations.  This will allow for the
completion of address resolution separate from route
resolution.

As part of this change, we add locking to destinations,
rather than relying on the endpoint locking.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
linux/osd.h
src/acm.c
src/libacm.c
windows/osd.h

index fb6dc07e204c67cd79859afff3ce09b59ed35f09..6c74ad38c22a911cae0672c333a197be8ea26238 100644 (file)
 #endif\r
 #define ntohll(x) htonll(x)\r
 \r
-static inline void *zalloc(size_t size)\r
-{\r
-       void *buf;\r
-\r
-       if ((buf = malloc(size)))\r
-               memset(buf, 0, size);\r
-       return buf;\r
-}\r
-\r
 typedef struct { volatile int val; } atomic_t;\r
 #define atomic_inc(v) (__sync_fetch_and_add(&(v)->val, 1) + 1)\r
 #define atomic_dec(v) (__sync_fetch_and_sub(&(v)->val, 1) - 1)\r
@@ -100,19 +91,19 @@ static inline int event_wait(event_t *e, int timeout)
        return ret;\r
 }\r
 \r
-#define lock_t         pthread_mutex_t\r
-#define lock_init(x)   pthread_mutex_init(x, NULL)\r
-#define lock_acquire   pthread_mutex_lock\r
-#define lock_release   pthread_mutex_unlock\r
+#define lock_t       pthread_mutex_t\r
+#define lock_init(x) pthread_mutex_init(x, NULL)\r
+#define lock_acquire pthread_mutex_lock\r
+#define lock_release pthread_mutex_unlock\r
 \r
-#define osd_init()     0\r
+#define osd_init()  0\r
 #define osd_close()\r
 \r
-#define SOCKET int\r
-#define SOCKET_ERROR -1\r
+#define SOCKET         int\r
+#define SOCKET_ERROR   -1\r
 #define INVALID_SOCKET -1\r
 #define socket_errno() errno\r
-#define closesocket close\r
+#define closesocket    close\r
 \r
 static inline uint64_t time_stamp_us(void)\r
 {\r
index aedea19eec5b39ce3f7a28b41bb1d634bcf0363e..ce4385145731cb69227a615a08f5832ad88560cd 100644 (file)
--- a/src/acm.c
+++ b/src/acm.c
 #define MAX_EP_ADDR 4
 #define MAX_EP_MC   2
 
+enum acm_state {
+       ACM_INIT,
+       ACM_QUERY_ADDR,
+       ACM_ADDR_RESOLVED,
+       ACM_QUERY_ROUTE,
+       ACM_PROCESS_REQUEST,
+       ACM_READY
+};
+
 enum acm_addr_prot
 {
        ACM_ADDR_PROT_ACM
@@ -55,6 +64,9 @@ enum acm_route_prot
        ACM_ROUTE_PROT_SA
 };
 
+/*
+ * Nested locking order: dest -> ep
+ */
 struct acm_dest
 {
        uint8_t               address[ACM_MAX_ADDRESS]; /* keep first */
@@ -64,12 +76,17 @@ struct acm_dest
        union ibv_gid         mgid;
        DLIST_ENTRY           req_queue;
        uint32_t              remote_qpn;
+       lock_t                lock;
+       enum acm_state        state;
+       atomic_t              refcnt;
+       uint8_t               addr_type;
 };
 
 struct acm_port
 {
        struct acm_device   *dev;
        DLIST_ENTRY         ep_list;
+       lock_t              lock;
        int                 mad_portid;
        int                 mad_agentid;
        struct acm_dest     sa_dest;
@@ -116,6 +133,7 @@ struct acm_ep
        DLIST_ENTRY        pending_queue;
        DLIST_ENTRY        active_queue;
        DLIST_ENTRY        wait_queue;
+       enum acm_state     state;
 };
 
 struct acm_send_msg
@@ -187,7 +205,7 @@ static void acm_write(int level, const char *format, ...)
        va_end(args);
 }
 
-static void acm_log_ep_addr(int level, const char *msg, uint16_t type, uint8_t *data)
+static void acm_log_addr(int level, const char *msg, uint16_t addr_type, uint8_t *addr)
 {
        struct ib_path_record *path;
        char ip_addr[ACM_MAX_ADDRESS];
@@ -197,25 +215,25 @@ static void acm_log_ep_addr(int level, const char *msg, uint16_t type, uint8_t *
 
        lock_acquire(&log_lock);
        fprintf(flog, msg);
-       switch (type) {
+       switch (addr_type) {
        case ACM_EP_INFO_NAME:
-               fprintf(flog, "%s\n", data);
+               fprintf(flog, "%s\n", addr);
                break;
        case ACM_EP_INFO_ADDRESS_IP:
-               inet_ntop(AF_INET, data, ip_addr, ACM_MAX_ADDRESS);
+               inet_ntop(AF_INET, addr, ip_addr, ACM_MAX_ADDRESS);
                fprintf(flog, "%s\n", ip_addr);
                break;
        case ACM_EP_INFO_ADDRESS_IP6:
-               inet_ntop(AF_INET6, data, ip_addr, ACM_MAX_ADDRESS);
+               inet_ntop(AF_INET6, addr, ip_addr, ACM_MAX_ADDRESS);
                fprintf(flog, "%s\n", ip_addr);
                break;
        case ACM_EP_INFO_PATH:
-               path = (struct ib_path_record *) data;
+               path = (struct ib_path_record *) addr;
                fprintf(flog, "path record, SLID 0x%x, DLID 0x%x\n",
                        ntohs(path->slid), ntohs(path->dlid));
                break;
        default:
-               fprintf(flog, "unknown endpoint address 0x%x\n", type);
+               fprintf(flog, "unknown address 0x%x\n", addr_type);
        }
        lock_release(&log_lock);
 }
@@ -225,7 +243,7 @@ acm_alloc_send(struct acm_ep *ep, struct acm_dest *dest, size_t size)
 {
        struct acm_send_msg *msg;
 
-       msg = (struct acm_send_msg *) zalloc(sizeof *msg);
+       msg = (struct acm_send_msg *) calloc(1, sizeof *msg);
        if (!msg) {
                acm_log(0, "ERROR - unable to allocate send buffer\n");
                return NULL;
@@ -269,6 +287,7 @@ static void acm_post_send(struct acm_send_msg *msg)
        struct acm_ep *ep = msg->ep;
        struct ibv_send_wr *bad_wr;
 
+       lock_acquire(&ep->lock);
        if (ep->available_sends) {
                acm_log(2, "posting send to QP\n");
                ep->available_sends--;
@@ -278,6 +297,7 @@ static void acm_post_send(struct acm_send_msg *msg)
                acm_log(2, "no sends available, queuing message\n");
                DListInsertTail(&msg->entry, &ep->pending_queue);
        }
+       lock_release(&ep->lock);
 }
 
 static void acm_post_recv(struct acm_ep *ep, uint64_t address)
@@ -297,6 +317,7 @@ static void acm_post_recv(struct acm_ep *ep, uint64_t address)
        ibv_post_recv(ep->qp, &wr, &bad_wr);
 }
 
+/* Caller must hold ep lock */
 static void acm_send_available(struct acm_ep *ep)
 {
        struct acm_send_msg *msg;
@@ -464,8 +485,10 @@ static void acm_process_join_resp(struct acm_ep *ep, struct ib_user_mad *umad)
                ret = ibv_attach_mcast(ep->qp, &mc_rec->mgid, mc_rec->mlid);
                if (ret) {
                        acm_log(0, "ERROR - unable to attach QP to multicast group\n");
+               } else {
+                       dest->state = ACM_READY;
+                       acm_log(1, "join successful\n");
                }
-               acm_log(1, "join successful\n");
        } else {
                acm_log(0, "ERROR - MGID in join response not found\n");
        }
@@ -493,99 +516,134 @@ static int acm_addr_index(struct acm_ep *ep, uint8_t *addr, uint8_t addr_type)
        return -1;
 }
 
-/*
- * Multicast groups are ordered lowest to highest preference.
- */
-static int
-acm_record_av(struct acm_dest *dest, struct acm_ep *ep,
-       struct ibv_wc *wc, struct acm_resolve_rec *rec)
+static struct acm_dest *
+acm_alloc_dest(uint8_t addr_type, uint8_t *addr)
 {
-       int i, index;
+       struct acm_dest *dest;
 
-       acm_log(2, "\n");
-       for (i = min(rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
-               index = acm_mc_index(ep, &rec->gid[i]);
-               if (index >= 0) {
-                       acm_log(2, "selecting MC group at index %d\n", index);
-                       dest->av = ep->mc_dest[index].av;
-                       dest->av.dlid = wc->slid;
-                       dest->av.src_path_bits = wc->dlid_path_bits;
-                       dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
-                       
-                       dest->mgid = ep->mc_dest[index].mgid;
-
-                       dest->path = ep->mc_dest[index].path;
-                       dest->path.dgid = dest->av.grh.dgid;
-                       dest->path.dlid = htons(dest->av.dlid);
-                       return ACM_STATUS_SUCCESS;
-               }
+       acm_log(1, "\n");
+       dest = calloc(1, sizeof *dest);
+       if (!dest) {
+               acm_log(0, "ERROR - unable to allocate dest\n");
+               return NULL;
        }
 
-       return ACM_STATUS_ENODATA;
+       memcpy(dest->address, addr, ACM_MAX_ADDRESS);
+       dest->addr_type = addr_type;
+       DListInit(&dest->req_queue);
+       atomic_set(&dest->refcnt, 1);
+       lock_init(&dest->lock);
+
+       return dest;
 }
 
-/* 
- * Record the source of a resolve request.  Use the source QPN to see if
- * the remote service has relocated and we need to update our cache.
- */
+/* Caller must hold ep lock. */
 static struct acm_dest *
-acm_record_src(struct acm_ep *ep, struct ibv_wc *wc, struct acm_resolve_rec *rec)
+acm_get_dest(struct acm_ep *ep, uint8_t addr_type, uint8_t *addr)
 {
        struct acm_dest *dest, **tdest;
-       int ret;
 
        acm_log(2, "\n");
-       lock_acquire(&ep->lock);
-       tdest = tfind(rec->src, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
-       if (!tdest) {
-               acm_log(2, "creating new dest\n");
-               dest = zalloc(sizeof *dest);
-               if (!dest) {
-                       acm_log(0, "ERROR - unable to allocate dest\n");
-                       goto unlock;
-               }
-
-               memcpy(dest->address, rec->src, ACM_MAX_ADDRESS);
-               DListInit(&dest->req_queue);
-               tsearch(dest, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
-       } else {
+       tdest = tfind(addr, &ep->dest_map[addr_type - 1], acm_compare_dest);
+       if (tdest) {
                dest = *tdest;
+               (void) atomic_inc(&dest->refcnt);
+       } else {
+               dest = NULL;
        }
+       return dest;
+}
 
-       if (dest->ah) {
-               if (dest->remote_qpn == wc->src_qp)
-                       goto unlock;
+static void
+acm_put_dest(struct acm_dest *dest)
+{
+       if (atomic_dec(&dest->refcnt) == 0) {
+               free(dest);
+       }
+}
 
-               ibv_destroy_ah(dest->ah); // TODO: ah could be in use
-               dest->ah = NULL;
+static struct acm_dest *
+acm_acquire_dest(struct acm_ep *ep, uint8_t addr_type, uint8_t *addr)
+{
+       struct acm_dest *dest;
+
+       acm_log_addr(2, "acm_acquire_dest: ", addr_type, addr);
+       lock_acquire(&ep->lock);
+       dest = acm_get_dest(ep, addr_type, addr);
+       if (!dest) {
+               dest = acm_alloc_dest(addr_type, addr);
+               if (dest) {
+                       tsearch(dest, &ep->dest_map[addr_type - 1], acm_compare_dest);
+                       (void) atomic_inc(&dest->refcnt);
+               }
        }
+       lock_release(&ep->lock);
+       return dest;
+}
 
-       acm_log(2, "creating address handle\n");
-       ret = acm_record_av(dest, ep, wc, rec);
-       if (ret) {
-               acm_log(0, "ERROR - failed to record av\n");
-               goto err;
+/* Caller must hold ep lock. */
+//static void
+//acm_remove_dest(struct acm_ep *ep, struct acm_dest *dest)
+//{
+//     acm_log_addr(2, "acm_remove_dest: ", dest->addr_type, dest->addr);
+//     tdelete(dest->address, &ep->dest_map[dest->addr_type - 1], acm_compare_dest);
+//     acm_put_dest(dest);
+//}
+
+/*
+ * Multicast groups are ordered lowest to highest preference.
+ */
+static uint8_t
+acm_record_acm_route(struct acm_ep *ep, struct acm_dest *dest, struct ibv_wc *wc,
+       struct acm_resolve_rec *rec)
+{
+       int i, index;
+
+       acm_log(2, "\n");
+       for (i = min(rec->gid_cnt, ACM_MAX_GID_COUNT) - 1; i >= 0; i--) {
+               index = acm_mc_index(ep, &rec->gid[i]);
+               if (index >= 0) {
+                       goto found;
+               }
        }
 
+       acm_log(0, "ERROR - no shared multicast groups\n");
+       dest->state = ACM_INIT;
+       return ACM_STATUS_ENODATA;
+
+found:
+       acm_log(2, "selecting MC group at index %d\n", index);
+       dest->av = ep->mc_dest[index].av;
+       dest->av.dlid = wc->slid;
+       dest->av.src_path_bits = wc->dlid_path_bits;
+       dest->av.grh.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
+       
+       dest->mgid = ep->mc_dest[index].mgid;
+
+       dest->path = ep->mc_dest[index].path;
+       dest->path.dgid = dest->av.grh.dgid;
+       dest->path.dlid = htons(dest->av.dlid);
+
        dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
        if (!dest->ah) {
                acm_log(0, "ERROR - failed to create ah\n");
-               goto err;
+               dest->state = ACM_INIT;
+               return ACM_STATUS_ENOMEM;
        }
 
        dest->remote_qpn = wc->src_qp;
+       dest->state = ACM_READY;
+       return ACM_STATUS_SUCCESS;
 
-unlock:
-       lock_release(&ep->lock);
-       return dest;
+}
 
-err:
-       if (!tdest) {
-               tdelete(dest->address, &ep->dest_map[rec->src_type - 1], acm_compare_dest);
-               free(dest);
-       }
-       lock_release(&ep->lock);
-       return NULL;
+static void
+acm_record_acm_addr(struct acm_dest *dest, struct ibv_wc *wc)
+{
+       acm_log(2, "\n");
+       dest->path.dgid = ((struct ibv_grh *) (uintptr_t) wc->wr_id)->sgid;
+       dest->remote_qpn = wc->src_qp;
+       dest->state = ACM_ADDR_RESOLVED;
 }
 
 static void acm_init_resp_mad(struct acm_mad *resp, struct acm_mad *req)
@@ -599,7 +657,7 @@ static void acm_init_resp_mad(struct acm_mad *resp, struct acm_mad *req)
        resp->tid = req->tid;
 }
 
-static int acm_validate_resolve_req(struct acm_mad *mad)
+static int acm_validate_addr_req(struct acm_mad *mad)
 {
        struct acm_resolve_rec *rec;
 
@@ -618,39 +676,23 @@ static int acm_validate_resolve_req(struct acm_mad *mad)
 }
 
 static void
-acm_process_resolve_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
+acm_send_addr_resp(struct acm_ep *ep, struct acm_dest *dest, struct acm_mad *mad)
 {
        struct acm_resolve_rec *rec, *resp_rec;
-       struct acm_dest *dest;
        struct acm_send_msg *msg;
        struct acm_mad *resp_mad;
 
        acm_log(2, "\n");
-       if (acm_validate_resolve_req(mad)) {
-               acm_log(0, "ERROR - invalid request\n");
-               return;
-       }
-
-       rec = (struct acm_resolve_rec *) mad->data;
-       dest = acm_record_src(ep, wc, rec);
-       if (!dest) {
-               acm_log(0, "ERROR - failed to record source\n");
-               return;
-       }
-
-       if (acm_addr_index(ep, rec->dest, rec->dest_type) < 0) {
-               acm_log(2, "no matching address - discarding\n");
-               return;
-       }
-
        msg = acm_alloc_send(ep, dest, sizeof (*resp_mad));
        if (!msg) {
                acm_log(0, "ERROR - failed to allocate message\n");
                return;
        }
 
+       rec = (struct acm_resolve_rec *) mad->data;
        resp_mad = (struct acm_mad *) msg->data;
        resp_rec = (struct acm_resolve_rec *) resp_mad->data;
+
        acm_init_resp_mad(resp_mad, mad);
        resp_rec->dest_type = rec->src_type;
        resp_rec->dest_length = rec->src_length;
@@ -661,15 +703,12 @@ acm_process_resolve_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *ma
        memcpy(resp_rec->src, rec->dest, ACM_MAX_ADDRESS);
        memcpy(resp_rec->gid, dest->mgid.raw, sizeof(union ibv_gid));
 
-       acm_log(2, "sending resolve response\n");
-       lock_acquire(&ep->lock);
        acm_post_send(msg);
-       lock_release(&ep->lock);
 }
 
 static int
-acm_client_resolve_resp(struct acm_ep *ep, struct acm_client *client,
-       struct acm_resolve_msg *req_msg, struct acm_dest *dest, uint8_t status)
+acm_client_resolve_resp(struct acm_client *client, struct acm_resolve_msg *req_msg,
+       struct acm_dest *dest, uint8_t status)
 {
        struct acm_msg msg;
        struct acm_resolve_msg *resp_msg = (struct acm_resolve_msg *) &msg;
@@ -690,22 +729,21 @@ acm_client_resolve_resp(struct acm_ep *ep, struct acm_client *client,
        resp_msg->hdr.status = status;
        resp_msg->hdr.length = ACM_MSG_HDR_LENGTH;
        memset(resp_msg->hdr.reserved, 0, sizeof(resp_msg->hdr.reserved));
-       if (status)
-               goto send_resp;
-
-       resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
-       resp_msg->data[0].flags = IB_PATH_FLAG_GMP |
-               IB_PATH_FLAG_PRIMARY | IB_PATH_FLAG_BIDIRECTIONAL;
-       resp_msg->data[0].type = ACM_EP_INFO_PATH;
-       resp_msg->data[0].info.path = dest->path;
 
-       if (req_msg->hdr.src_out) {
+       if (status == ACM_STATUS_SUCCESS) {
                resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
-               memcpy(&resp_msg->data[1], &req_msg->data[req_msg->hdr.src_out],
-                       ACM_MSG_EP_LENGTH);
+               resp_msg->data[0].flags = IB_PATH_FLAG_GMP |
+                       IB_PATH_FLAG_PRIMARY | IB_PATH_FLAG_BIDIRECTIONAL;
+               resp_msg->data[0].type = ACM_EP_INFO_PATH;
+               resp_msg->data[0].info.path = dest->path;
+
+               if (req_msg->hdr.src_out) {
+                       resp_msg->hdr.length += ACM_MSG_EP_LENGTH;
+                       memcpy(&resp_msg->data[1], &req_msg->data[req_msg->hdr.src_out],
+                               ACM_MSG_EP_LENGTH);
+               }
        }
 
-send_resp:
        ret = send(client->sock, (char *) resp_msg, resp_msg->hdr.length, 0);
        if (ret != resp_msg->hdr.length)
                acm_log(0, "failed to send response\n");
@@ -718,85 +756,114 @@ release:
        return ret;
 }
 
-static struct acm_dest *
-acm_record_dest(struct acm_ep *ep, struct ibv_wc *wc,
-       struct acm_resolve_rec *req_rec, struct acm_resolve_rec *resp_rec)
+static void
+acm_complete_queued_req(struct acm_dest *dest, uint8_t status)
 {
-       struct acm_dest *dest, **tdest;
-       int ret;
+       struct acm_request *client_req;
+       DLIST_ENTRY *entry;
 
-       acm_log(2, "\n");
-       lock_acquire(&ep->lock);
-       tdest = tfind(req_rec->dest, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
-       if (!tdest) {
-               dest = NULL;
-               goto unlock;
+       acm_log(2, "status %d\n", status);
+       lock_acquire(&dest->lock);
+       while (!DListEmpty(&dest->req_queue)) {
+               entry = dest->req_queue.Next;
+               DListRemove(entry);
+               client_req = container_of(entry, struct acm_request, entry);
+               lock_release(&dest->lock);
+               acm_log(2, "completing client request\n");
+               acm_client_resolve_resp(client_req->client,
+                       (struct acm_resolve_msg *) &client_req->msg, dest, status);
+               lock_acquire(&dest->lock);
        }
+       lock_release(&dest->lock);
+}
 
-       dest = *tdest;
-       if (dest->ah)
-               goto unlock;
+static void
+acm_process_addr_req(struct acm_ep *ep, struct ibv_wc *wc, struct acm_mad *mad)
+{
+       struct acm_resolve_rec *rec;
+       struct acm_dest *dest;
+       uint8_t status;
 
-       acm_log(2, "creating address handle\n");
-       ret = acm_record_av(dest, ep, wc, resp_rec);
-       if (ret) {
-               acm_log(0, "ERROR - failed to record av\n");
-               goto unlock;
+       acm_log(2, "\n");
+       if (acm_validate_addr_req(mad)) {
+               acm_log(0, "ERROR - invalid request\n");
+               return;
        }
 
-       dest->ah = ibv_create_ah(ep->port->dev->pd, &dest->av);
-       if (!dest->ah) {
-               acm_log(0, "ERROR - failed to create ah\n");
-               goto unlock;
+       rec = (struct acm_resolve_rec *) mad->data;
+       dest = acm_acquire_dest(ep, rec->src_type, rec->src);
+       if (!dest) {
+               acm_log(0, "ERROR - unable to add source\n");
+               return;
        }
 
-       dest->remote_qpn = wc->src_qp;
+       lock_acquire(&dest->lock);
+       switch (dest->state) {
+       case ACM_READY:
+               if (dest->remote_qpn == wc->src_qp) {
+                       lock_release(&dest->lock);
+                       break;
+               }
 
-unlock:
-       lock_release(&ep->lock);
-       return dest;
+               ibv_destroy_ah(dest->ah); // TODO: ah could be in use
+               /* fall through */
+       default:
+               acm_record_acm_addr(dest, wc);
+               status = acm_record_acm_route(ep, dest, wc, rec);
+               lock_release(&dest->lock);
+               if (status) {
+                       acm_log(0, "ERROR - failed to record route\n");
+                       goto put;
+               }
+               acm_complete_queued_req(dest, status);
+       }
+
+       if (acm_addr_index(ep, rec->dest, rec->dest_type) >= 0) {
+               acm_send_addr_resp(ep, dest, mad);
+       }
+put:
+       acm_put_dest(dest);
 }
 
 static void
-acm_process_resolve_resp(struct acm_ep *ep, struct ibv_wc *wc,
-       struct acm_send_msg *msg, struct acm_mad *mad)
+acm_process_addr_resp(struct acm_send_msg *msg, struct ibv_wc *wc, struct acm_mad *mad)
 {
        struct acm_resolve_rec *req_rec, *resp_rec;
        struct acm_dest *dest;
-       struct acm_request *client_req;
-       DLIST_ENTRY *entry;
        uint8_t status;
 
-       status = acm_class_status(mad->status);
+       if (mad) {
+               status = acm_class_status(mad->status);
+               resp_rec = (struct acm_resolve_rec *) mad->data;
+       } else {
+               status = ACM_STATUS_ETIMEDOUT;
+               resp_rec = NULL;
+       }
        acm_log(2, "resp status 0x%x\n", status);
        req_rec = (struct acm_resolve_rec *) ((struct acm_mad *) msg->data)->data;
-       resp_rec = (struct acm_resolve_rec *) mad->data;
 
-       dest = acm_record_dest(ep, wc, req_rec, resp_rec);
+       dest = acm_acquire_dest(msg->ep, req_rec->dest_type, req_rec->dest);
        if (!dest) {
-               acm_log(0, "ERROR - cannot record dest\n");
+               acm_log(0, "ERROR - cannot record addr resp\n");
                return;
        }
 
-       if (!status && !dest->ah)
-               status = ACM_STATUS_EINVAL;
-
-       lock_acquire(&ep->lock);
-       while (!DListEmpty(&dest->req_queue)) {
-               entry = dest->req_queue.Next;
-               DListRemove(entry);
-               client_req = container_of(entry, struct acm_request, entry);
-               lock_release(&ep->lock);
-               acm_log(2, "completing queued client request\n");
-               acm_client_resolve_resp(ep, client_req->client,
-                       (struct acm_resolve_msg *) &client_req->msg, dest, status);
-               lock_acquire(&ep->lock);
+       lock_acquire(&dest->lock);
+       if (dest->state != ACM_QUERY_ADDR) {
+               lock_release(&dest->lock);
+               return;
        }
-       if (status) {
-               acm_log(0, "resp failed 0x%x\n", status);
-               tdelete(dest->address, &ep->dest_map[req_rec->dest_type - 1], acm_compare_dest);
+
+       if (!status) {
+               acm_record_acm_addr(dest, wc);
+               status = acm_record_acm_route(msg->ep, dest, wc, resp_rec);
+       } else {
+               dest->state = ACM_INIT;
        }
-       lock_release(&ep->lock);
+       lock_release(&dest->lock);
+
+       acm_complete_queued_req(dest, status);
+       acm_put_dest(dest);
 }
 
 static int acm_validate_recv(struct acm_mad *mad)
@@ -841,12 +908,12 @@ static void acm_process_recv(struct acm_ep *ep, struct ibv_wc *wc)
                        goto out;
                }
                acm_log(2, "found matching request\n");
-               acm_process_resolve_resp(ep, wc, req, mad);
+               acm_process_addr_resp(req, wc, mad);
                if (free)
                        acm_free_send(req);
        } else {
                acm_log(2, "unsolicited request\n");
-               acm_process_resolve_req(ep, wc, mad);
+               acm_process_addr_req(ep, wc, mad);
                free = 0;
        }
 
@@ -1018,7 +1085,7 @@ static void acm_join_group(struct acm_ep *ep, union ibv_gid *port_gid,
 
        acm_log(2, "\n");
        len = sizeof(*umad) + sizeof(*mad);
-       umad = (struct ib_user_mad *) zalloc(len);
+       umad = (struct ib_user_mad *) calloc(1, len);
        if (!umad) {
                acm_log(0, "ERROR - unable to allocate MAD for join\n");
                return;
@@ -1084,6 +1151,10 @@ static void acm_port_join(void *context)
 
                ep = container_of(ep_entry, struct acm_ep, entry);
                acm_join_group(ep, &port_gid, 0, 0, 0, min_rate, min_mtu);
+
+               if ((ep->state = ep->mc_dest[0].state) != ACM_READY)
+                       continue;
+
                if (port->rate != min_rate || port->mtu != min_mtu)
                        acm_join_group(ep, &port_gid, 0, 0, 0, port->rate, port->mtu);
        }
@@ -1122,49 +1193,17 @@ static void acm_process_timeouts(void)
 {
        DLIST_ENTRY *entry;
        struct acm_send_msg *msg;
-       struct acm_mad *mad;
        struct acm_resolve_rec *rec;
-       struct acm_dest *dest, **tdest;
-       struct acm_request *req;
-       struct acm_ep *ep;
        
        while (!DListEmpty(&timeout_list)) {
                entry = timeout_list.Next;
                DListRemove(entry);
 
                msg = container_of(entry, struct acm_send_msg, entry);
-               mad = (struct acm_mad *) msg->data;
+               rec = (struct acm_resolve_rec *) ((struct acm_mad *) msg->data)->data;
 
-               rec = (struct acm_resolve_rec *) mad->data;
-               ep = msg->ep;
-
-               acm_log_ep_addr(0, "acm_process_timeouts: dest ", rec->dest_type, rec->dest);
-               lock_acquire(&ep->lock);
-               tdest = tfind(rec->dest, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
-               if (!tdest) {
-                       acm_log(0, "destination already removed\n");
-                       lock_release(&ep->lock);
-                       continue;
-               } else {
-                       dest = *tdest;
-               }
-
-               acm_log(2, "failing pending client requests\n");
-               while (!DListEmpty(&dest->req_queue)) {
-                       entry = dest->req_queue.Next;
-                       DListRemove(entry);
-               
-                       req = container_of(entry, struct acm_request, entry);
-                       lock_release(&ep->lock);
-                       acm_client_resolve_resp(ep, req->client,
-                               (struct acm_resolve_msg *) &req->msg, dest,
-                               ACM_STATUS_ETIMEDOUT);
-                       lock_acquire(&ep->lock);
-               }
-
-               acm_log(2, "resolve timed out, releasing destination\n");
-               tdelete(dest->address, &ep->dest_map[rec->dest_type - 1], acm_compare_dest);
-               lock_release(&ep->lock);
+               acm_log_addr(0, "acm_process_timeouts: dest ", rec->dest_type, rec->dest);
+               acm_process_addr_resp(msg, NULL, NULL);
        }
 }
 
@@ -1281,7 +1320,7 @@ static int acm_listen(void)
        return 0;
 }
 
-static void acm_release_client(struct acm_client *client)
+static void acm_disconnect_client(struct acm_client *client)
 {
        lock_acquire(&client->lock);
        shutdown(client->sock, SHUT_RDWR);
@@ -1348,7 +1387,7 @@ release:
        return ret;
 }
 
-// TODO: process send/recv asynchronously
+/* SA queries are serialized to avoid swamping SA. */
 static uint8_t acm_query_sa(struct acm_ep *ep, struct ib_path_record *path)
 {
        struct acm_port *port;
@@ -1358,7 +1397,7 @@ static uint8_t acm_query_sa(struct acm_ep *ep, struct ib_path_record *path)
 
        acm_log(2, "\n");
        len = sizeof(*umad) + sizeof(*mad);
-       umad = (struct ib_user_mad *) zalloc(len);
+       umad = (struct ib_user_mad *) calloc(1, len);
        if (!umad) {
                acm_log(0, "ERROR - unable to allocate MAD\n");
                return ACM_STATUS_ENOMEM;
@@ -1375,6 +1414,7 @@ static uint8_t acm_query_sa(struct acm_ep *ep, struct ib_path_record *path)
        mad = (struct ib_sa_mad *) umad->data;
        acm_init_path_query(mad, path);
 
+       lock_acquire(&port->lock);
        ret = umad_send(port->mad_portid, port->mad_agentid, (void *) umad,
                sizeof(*mad), timeout, retries);
        if (ret) {
@@ -1396,6 +1436,7 @@ static uint8_t acm_query_sa(struct acm_ep *ep, struct ib_path_record *path)
                ret = ((uint8_t) ret) ? ret : -1;
        }
 out:
+       lock_release(&port->lock);
        free(umad);
        return (uint8_t) ret;
 }
@@ -1409,7 +1450,7 @@ acm_get_ep(struct acm_ep_addr_data *data)
        DLIST_ENTRY *dev_entry, *ep_entry;
        int i;
 
-       acm_log_ep_addr(2, "acm_get_ep: ", data->type, data->info.addr);
+       acm_log_addr(2, "acm_get_ep: ", data->type, data->info.addr);
        for (dev_entry = dev_list.Next; dev_entry != &dev_list;
                 dev_entry = dev_entry->Next) {
 
@@ -1425,6 +1466,9 @@ acm_get_ep(struct acm_ep_addr_data *data)
                                 ep_entry = ep_entry->Next) {
 
                                ep = container_of(ep_entry, struct acm_ep, entry);
+                               if (ep->state != ACM_READY)
+                                       continue;
+
                                if (data->type == ACM_EP_INFO_PATH)
                                        return ep; // TODO: check pkey
 
@@ -1435,7 +1479,7 @@ acm_get_ep(struct acm_ep_addr_data *data)
                }
        }
 
-       acm_log_ep_addr(0, "acm_get_ep: could not find ", data->type, data->info.addr);
+       acm_log_addr(0, "acm_get_ep: could not find ", data->type, data->info.addr);
        return NULL;
 }
 
@@ -1466,9 +1510,7 @@ acm_svr_query(struct acm_client *client, struct acm_resolve_msg *msg)
        }
 
        (void) atomic_inc(&client->refcnt);
-       lock_acquire(&ep->lock);
        status = acm_query_sa(ep, &msg->data[0].info.path);
-       lock_release(&ep->lock);
 
 resp:
        return acm_client_query_resp(ep, client, msg, status);
@@ -1484,11 +1526,6 @@ acm_send_resolve(struct acm_ep *ep, struct acm_ep_addr_data *saddr,
        int i;
 
        acm_log(2, "\n");
-       if (!ep->mc_dest[0].ah) {
-               acm_log(0, "ERROR - multicast group not ready\n");
-               return ACM_STATUS_ENOTCONN;
-       }
-
        msg = acm_alloc_send(ep, &ep->mc_dest[0], sizeof(struct acm_mad));
        if (!msg) {
                acm_log(0, "ERROR - cannot allocate send msg\n");
@@ -1520,7 +1557,7 @@ acm_send_resolve(struct acm_ep *ep, struct acm_ep_addr_data *saddr,
        return 0;
 }
 
-static int acm_select_src(struct acm_ep_addr_data *src, struct acm_ep_addr_data *dst)
+static int acm_svr_select_src(struct acm_ep_addr_data *src, struct acm_ep_addr_data *dst)
 {
        struct sockaddr_in6 addr;
        socklen_t len;
@@ -1640,90 +1677,91 @@ acm_svr_verify_resolve(struct acm_resolve_msg *msg,
        return ACM_STATUS_SUCCESS;
 }
 
+/* Caller must hold dest lock */
+static uint8_t
+acm_svr_queue_req(struct acm_dest *dest, struct acm_client *client,
+       struct acm_resolve_msg *msg)
+{
+       struct acm_request *req;
+
+       acm_log(2, "\n");
+       req = calloc(1, sizeof *req);
+       if (!req) {
+               acm_log(0, "ERROR - unable to queue client request\n");
+               return ACM_STATUS_ENOMEM;
+       }
+
+       (void) atomic_inc(&client->refcnt);
+       req->client = client;
+       memcpy(&req->msg, msg, sizeof(req->msg));
+       DListInsertTail(&req->entry, &dest->req_queue);
+       return ACM_STATUS_SUCCESS;
+}
+
 static int
 acm_svr_resolve(struct acm_client *client, struct acm_resolve_msg *msg)
 {
        struct acm_ep *ep;
-       struct acm_dest *dest, **tdest;
-       struct acm_request *req;
+       struct acm_dest *dest;
        struct acm_ep_addr_data *saddr, *daddr;
        uint8_t status;
+       int ret;
 
        status = acm_svr_verify_resolve(msg, &saddr, &daddr);
        if (status) {
                acm_log(0, "misformatted or unsupported request\n");
-               goto resp;
+               return acm_client_resolve_resp(client, msg, NULL, status);
        }
 
-       status = acm_select_src(saddr, daddr);
+       status = acm_svr_select_src(saddr, daddr);
        if (status) {
                acm_log(0, "unable to select suitable source address\n");
-               goto resp;
+               return acm_client_resolve_resp(client, msg, NULL, status);
        }
 
-       acm_log_ep_addr(2, "acm_svr_resolve: source ", saddr->type, saddr->info.addr);
+       acm_log_addr(2, "acm_svr_resolve: source ", saddr->type, saddr->info.addr);
        ep = acm_get_ep(saddr);
        if (!ep) {
                acm_log(0, "unknown local end point\n");
-               status = ACM_STATUS_ESRCADDR;
-               goto resp;
+               return acm_client_resolve_resp(client, msg, NULL, ACM_STATUS_ESRCADDR);
        }
 
-       acm_log_ep_addr(2, "acm_svr_resolve: dest ", daddr->type, daddr->info.addr);
-       (void) atomic_inc(&client->refcnt);
-       lock_acquire(&ep->lock);
-       tdest = tfind(&daddr->info.addr, &ep->dest_map[daddr->type - 1], acm_compare_dest);
-       dest = tdest ? *tdest : NULL;
-       if (dest && dest->ah) {
-               acm_log(2, "request satisfied from local cache\n");
-               status = ACM_STATUS_SUCCESS;
-               goto release;
-       }
-
-       req = zalloc(sizeof *req);
-       if (!req) {
-               acm_log(0, "ERROR - unable to allocate memory to queue client request\n");
-               status = ACM_STATUS_ENOMEM;
-               goto release;
-       }
+       acm_log_addr(2, "acm_svr_resolve: dest ", daddr->type, daddr->info.addr);
 
+       dest = acm_acquire_dest(ep, daddr->type, daddr->info.addr);
        if (!dest) {
-               acm_log(2, "adding new destination\n");
-               dest = zalloc(sizeof *dest);
-               if (!dest) {
-                       acm_log(0, "ERROR - unable to allocate destination in client request\n");
-                       status = ACM_STATUS_ENOMEM;
-                       goto free_req;
-               }
+               acm_log(0, "ERROR - unable to allocate destination in client request\n");
+               return acm_client_resolve_resp(client, msg, NULL, ACM_STATUS_ENOMEM);
+       }
 
-               memcpy(dest->address, &daddr->info.addr, ACM_MAX_ADDRESS);
+       lock_acquire(&dest->lock);
+       switch (dest->state) {
+       case ACM_READY:
+               acm_log(2, "request satisfied from local cache\n");
+               status = ACM_STATUS_SUCCESS;
+               break;
+       case ACM_INIT:
                acm_log(2, "sending resolve msg to dest\n");
                status = acm_send_resolve(ep, saddr, daddr);
                if (status) {
-                       acm_log(0, "ERROR - failure sending resolve request 0x%x\n", status);
-                       goto free_dest;
+                       break;
                }
-
-               DListInit(&dest->req_queue);
-               tsearch(dest, &ep->dest_map[daddr->type - 1], acm_compare_dest);
+               dest->state = ACM_QUERY_ADDR;
+               /* fall through */
+       default:
+               status = acm_svr_queue_req(dest, client, msg);
+               if (status) {
+                       break;
+               }
+               ret = 0;
+               lock_release(&dest->lock);
+               goto put;
        }
-
-       acm_log(2, "queuing client request\n");
-       req->client = client;
-       memcpy(&req->msg, msg, sizeof(req->msg));
-       DListInsertTail(&req->entry, &dest->req_queue);
-       lock_release(&ep->lock);
-       return 0;
-
-free_dest:
-       free(dest);
-       dest = NULL;
-free_req:
-       free(req);
-release:
-       lock_release(&ep->lock);
-resp:
-       return acm_client_resolve_resp(ep, client, msg, dest, status);
+       lock_release(&dest->lock);
+       ret = acm_client_resolve_resp(client, msg, dest, status);
+put:
+       acm_put_dest(dest);
+       return ret;
 }
 
 static void acm_svr_receive(struct acm_client *client)
@@ -1758,7 +1796,7 @@ static void acm_svr_receive(struct acm_client *client)
 
 out:
        if (ret)
-               acm_release_client(client);
+               acm_disconnect_client(client);
 }
 
 static void acm_server(void)
@@ -2085,7 +2123,7 @@ static void acm_activate_port(struct acm_port *port)
                port->port_num);
 
        for (i = 0; i < port->pkey_cnt; i++) {
-               ep = zalloc(sizeof *ep);
+               ep = calloc(1, sizeof *ep);
                if (!ep)
                        break;
 
@@ -2168,6 +2206,7 @@ static void acm_init_port(struct acm_port *port)
        int ret;
 
        acm_log(1, "%s %d\n", port->dev->verbs->device->name, port->port_num);
+       lock_init(&port->lock);
        DListInit(&port->ep_list);
        ret = ibv_query_port(port->dev->verbs, port->port_num, &attr);
        if (ret)
@@ -2222,7 +2261,7 @@ static void acm_open_dev(struct ibv_device *ibdev)
        }
 
        size = sizeof(*dev) + sizeof(struct acm_port) * attr.phys_port_cnt;
-       dev = (struct acm_device *) zalloc(size);
+       dev = (struct acm_device *) calloc(1, size);
        if (!dev)
                goto err1;
 
index bda083e82965d27a77660dd30176f8d61e06d83c..357ac53813a125c6638d9a46c5cd72cafdd25c2e 100644 (file)
@@ -104,7 +104,7 @@ static int acm_format_resp(struct acm_resolve_msg *msg,
        addr_cnt = (msg->hdr.length - ACM_MSG_HDR_LENGTH) /
                sizeof(struct acm_ep_addr_data);
        path_data = (struct ib_path_data *)
-               zalloc(addr_cnt * sizeof(struct ib_path_data));
+               calloc(1, addr_cnt * sizeof(struct ib_path_data));
        if (!path_data)
                return -1;
 
index dd90da4f7e57e9da8786ab7bd27a243f14ad82e6..10e5e18a89f0cf30f39219944f2d903d6e9ea305 100644 (file)
 #define LIB_DESTRUCTOR\r
 #define CDECL_FUNC __cdecl\r
 \r
-static __inline void *zalloc(size_t size)\r
-{\r
-       void *buf;\r
-\r
-       if ((buf = malloc(size)))\r
-               memset(buf, 0, size);\r
-       return buf;\r
-}\r
-\r
 typedef struct { volatile LONG val; } atomic_t;\r
 #define atomic_inc(v) InterlockedIncrement(&(v)->val)\r
 #define atomic_dec(v) InterlockedDecrement(&(v)->val)\r
 #define atomic_get(v) ((v)->val)\r
 #define atomic_set(v, s) ((v)->val = s)\r
 \r
-#define event_t                        HANDLE\r
-#define event_init(e)  *(e) = CreateEvent(NULL, FALSE, FALSE, NULL)\r
-#define event_signal(e)        SetEvent(*(e))\r
+#define event_t          HANDLE\r
+#define event_init(e)    *(e) = CreateEvent(NULL, FALSE, FALSE, NULL)\r
+#define event_signal(e)  SetEvent(*(e))\r
 #define event_wait(e, t) WaitForSingleObject(*(e), t)  \r
 \r
-#define lock_t                 CRITICAL_SECTION\r
-#define lock_init              InitializeCriticalSection\r
-#define lock_acquire   EnterCriticalSection\r
-#define lock_release   LeaveCriticalSection\r
+#define lock_t       CRITICAL_SECTION\r
+#define lock_init    InitializeCriticalSection\r
+#define lock_acquire EnterCriticalSection\r
+#define lock_release LeaveCriticalSection\r
 \r
 static __inline int osd_init()\r
 {\r
@@ -75,7 +66,7 @@ static __inline void osd_close()
        WSACleanup();\r
 }\r
 \r
-#define stricmp _stricmp\r
+#define stricmp  _stricmp\r
 #define strnicmp _strnicmp\r
 \r
 #define socket_errno WSAGetLastError\r