int retries;
};
+struct list_head {
+ struct list_head *prev;
+ struct list_head *next;
+ struct rdma_cm_id *id;
+};
+
+struct work_list {
+ pthread_mutex_t lock;
+ pthread_cond_t cond;
+ struct list_head list;
+};
+
+#define INIT_LIST(x) ((x)->prev = (x)->next = (x))
+
+static struct work_list req_work;
+static struct work_list disc_work;
static struct node *nodes;
static struct timeval times[STEP_CNT][2];
static int connections = 100;
-static int left[STEP_CNT];
+static volatile int started[STEP_CNT];
+static volatile int completed[STEP_CNT];
static struct ibv_qp_init_attr init_qp_attr;
static struct rdma_conn_param conn_param;
#define start_time(s) gettimeofday(×[s][0], NULL)
#define end_time(s) gettimeofday(×[s][1], NULL)
+static inline void __list_delete(struct list_head *list)
+{
+ struct list_head *prev, *next;
+ prev = list->prev;
+ next = list->next;
+ prev->next = next;
+ next->prev = prev;
+ INIT_LIST(list);
+}
+
+static inline int __list_empty(struct work_list *list)
+{
+ return list->list.next == &list->list;
+}
+
+static inline int list_empty(struct work_list *work_list)
+{
+ pthread_mutex_lock(&work_list->lock);
+ return work_list->list.next == &work_list->list;
+ pthread_mutex_unlock(&work_list->lock);
+}
+
+static inline struct list_head *__list_remove_head(struct work_list *work_list)
+{
+ struct list_head *list_item;
+
+ list_item = work_list->list.next;
+ __list_delete(list_item);
+ return list_item;
+}
+
+static inline struct list_head *list_remove_head(struct work_list *work_list)
+{
+ struct list_head *list_item;
+ pthread_mutex_lock(&work_list->lock);
+ list_item = __list_remove_head(work_list);
+ pthread_mutex_unlock(&work_list->lock);
+ return list_item;
+}
+
+static inline void list_add_tail(struct work_list *work_list, struct list_head *req)
+{
+ int empty;
+ pthread_mutex_lock(&work_list->lock);
+ empty = __list_empty(work_list);
+ req->prev = work_list->list.prev;
+ req->next = &work_list->list;
+ req->prev->next = work_list->list.prev = req;
+ pthread_mutex_unlock(&work_list->lock);
+ if (empty)
+ pthread_cond_signal(&work_list->cond);
+}
+
static int zero_time(struct timeval *t)
{
return !(t->tv_sec || t->tv_usec);
static void addr_handler(struct node *n)
{
end_perf(n, STEP_RESOLVE_ADDR);
- left[STEP_RESOLVE_ADDR]--;
+ completed[STEP_RESOLVE_ADDR]++;
}
static void route_handler(struct node *n)
{
end_perf(n, STEP_RESOLVE_ROUTE);
- left[STEP_RESOLVE_ROUTE]--;
+ completed[STEP_RESOLVE_ROUTE]++;
}
static void conn_handler(struct node *n)
{
end_perf(n, STEP_CONNECT);
- left[STEP_CONNECT]--;
+ completed[STEP_CONNECT]++;
}
static void disc_handler(struct node *n)
{
end_perf(n, STEP_DISCONNECT);
- left[STEP_DISCONNECT]--;
+ completed[STEP_DISCONNECT]++;
}
-static int req_handler(struct rdma_cm_id *id)
+static void __req_handler(struct rdma_cm_id *id)
{
int ret;
perror("failure accepting");
goto err;
}
- return 0;
+ return;
err:
printf("failing connection request\n");
rdma_reject(id, NULL, 0);
- return ret;
+ rdma_destroy_id(id);
+ return;
+}
+
+static void *req_handler_thread(void *arg)
+{
+ struct list_head *work;
+ do {
+ pthread_mutex_lock(&req_work.lock);
+ if (__list_empty(&req_work))
+ pthread_cond_wait(&req_work.cond, &req_work.lock);
+ work = __list_remove_head(&req_work);
+ pthread_mutex_unlock(&req_work.lock);
+ __req_handler(work->id);
+ free(work);
+ } while (1);
+ return NULL;
+}
+
+static void *disc_handler_thread(void *arg)
+{
+ struct list_head *work;
+ do {
+ pthread_mutex_lock(&disc_work.lock);
+ if (__list_empty(&disc_work))
+ pthread_cond_wait(&disc_work.cond, &disc_work.lock);
+ work = __list_remove_head(&disc_work);
+ pthread_mutex_unlock(&disc_work.lock);
+ rdma_disconnect(work->id);
+ rdma_destroy_id(work->id);
+ free(work);
+ } while (1);
+ return NULL;
}
static void cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
struct node *n = id->context;
+ struct list_head *request;
switch (event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
route_handler(n);
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
- if (req_handler(id)) {
- rdma_ack_cm_event(event);
+ request = malloc(sizeof *request);
+ if (!request) {
+ perror("out of memory accepting connect request");
+ rdma_reject(id, NULL, 0);
rdma_destroy_id(id);
- return;
+ } else {
+ INIT_LIST(request);
+ request->id = id;
+ list_add_tail(&req_work, request);
}
break;
case RDMA_CM_EVENT_ESTABLISHED:
break;
case RDMA_CM_EVENT_DISCONNECTED:
if (!n) {
- rdma_disconnect(id);
- rdma_ack_cm_event(event);
- rdma_destroy_id(id);
- return;
- }
- disc_handler(n);
+ request = malloc(sizeof *request);
+ if (!request) {
+ perror("out of memory queueing disconnect request, handling synchronously");
+ rdma_disconnect(id);
+ rdma_destroy_id(id);
+ } else {
+ INIT_LIST(request);
+ request->id = id;
+ list_add_tail(&disc_work, request);
+ }
+ } else
+ disc_handler(n);
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
/* Cleanup will occur after test completes. */
end_time(STEP_DESTROY);
}
-static int process_events(int *left)
+static void *process_events(void *arg)
{
struct rdma_cm_event *event;
int ret = 0;
- while ((!left || *left) && !ret) {
+ while (!ret) {
ret = rdma_get_cm_event(channel, &event);
if (!ret) {
cma_handler(event->id, event);
} else {
- perror("failure in rdma_get_cm_event in connect events");
+ perror("failure in rdma_get_cm_event in process_server_events");
ret = errno;
}
}
-
- return ret;
+ return NULL;
}
static int run_server(void)
{
+ pthread_t req_thread, disc_thread;
struct rdma_cm_id *listen_id;
int ret;
+ INIT_LIST(&req_work.list);
+ INIT_LIST(&disc_work.list);
+ ret = pthread_mutex_init(&req_work.lock, NULL);
+ if (ret) {
+ perror("initializing mutex for req work");
+ return ret;
+ }
+
+ ret = pthread_mutex_init(&disc_work.lock, NULL);
+ if (ret) {
+ perror("initializing mutex for disc work");
+ return ret;
+ }
+
+ ret = pthread_cond_init(&req_work.cond, NULL);
+ if (ret) {
+ perror("initializing cond for req work");
+ return ret;
+ }
+
+ ret = pthread_cond_init(&disc_work.cond, NULL);
+ if (ret) {
+ perror("initializing cond for disc work");
+ return ret;
+ }
+
+ ret = pthread_create(&req_thread, NULL, req_handler_thread, NULL);
+ if (ret) {
+ perror("failed to create req handler thread");
+ return ret;
+ }
+
+ ret = pthread_create(&disc_thread, NULL, disc_handler_thread, NULL);
+ if (ret) {
+ perror("failed to create disconnect handler thread");
+ return ret;
+ }
+
ret = rdma_create_id(channel, &listen_id, NULL, hints.ai_port_space);
if (ret) {
perror("listen request failed");
static int run_client(void)
{
+ pthread_t event_thread;
int i, ret;
ret = get_rdma_addr(src_addr, dst_addr, port, &hints, &rai);
conn_param.private_data = rai->ai_connect;
conn_param.private_data_len = rai->ai_connect_len;
+ ret = pthread_create(&event_thread, NULL, process_events, NULL);
+ if (ret) {
+ perror("failure creating event thread");
+ return ret;
+ }
+
if (src_addr) {
printf("binding source address\n");
start_time(STEP_BIND);
nodes[i].error = 1;
continue;
}
- left[STEP_RESOLVE_ADDR]++;
+ started[STEP_RESOLVE_ADDR]++;
}
- ret = process_events(&left[STEP_RESOLVE_ADDR]);
- if (ret)
- return ret;
+ while (started[STEP_RESOLVE_ADDR] != completed[STEP_RESOLVE_ADDR]) sched_yield();
end_time(STEP_RESOLVE_ADDR);
printf("resolving route\n");
nodes[i].error = 1;
continue;
}
- left[STEP_RESOLVE_ROUTE]++;
+ started[STEP_RESOLVE_ROUTE]++;
}
- ret = process_events(&left[STEP_RESOLVE_ROUTE]);
- if (ret)
- return ret;
+ while (started[STEP_RESOLVE_ROUTE] != completed[STEP_RESOLVE_ROUTE]) sched_yield();
end_time(STEP_RESOLVE_ROUTE);
printf("creating qp\n");
nodes[i].error = 1;
continue;
}
- left[STEP_CONNECT]++;
+ started[STEP_CONNECT]++;
}
- ret = process_events(&left[STEP_CONNECT]);
- if (ret)
- return ret;
+ while (started[STEP_CONNECT] != completed[STEP_CONNECT]) sched_yield();
end_time(STEP_CONNECT);
printf("disconnecting\n");
continue;
start_perf(&nodes[i], STEP_DISCONNECT);
rdma_disconnect(nodes[i].id);
- left[STEP_DISCONNECT]++;
+ started[STEP_DISCONNECT]++;
}
- ret = process_events(&left[STEP_DISCONNECT]);
- if (ret)
- return ret;
+ while (started[STEP_DISCONNECT] != completed[STEP_DISCONNECT]) sched_yield();
end_time(STEP_DISCONNECT);
return ret;