struct rping_cb {
int server; /* 0 iff client */
pthread_t cqthread;
+ pthread_t persistent_server_thread;
struct ibv_comp_channel *channel;
struct ibv_cq *cq;
struct ibv_pd *pd;
DEBUG_LOG("cq_thread started.\n");
while (1) {
+ pthread_testcancel();
+
ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
if (ret) {
fprintf(stderr, "Failed to get cq event!\n");
- exit(ret);
+ pthread_exit(NULL);
}
if (ev_cq != cb->cq) {
fprintf(stderr, "Unkown CQ!\n");
- exit(-1);
+ pthread_exit(NULL);
}
ret = ibv_req_notify_cq(cb->cq, 0);
if (ret) {
fprintf(stderr, "Failed to set notify!\n");
- exit(ret);
+ pthread_exit(NULL);
}
ret = rping_cq_event_handler(cb);
ibv_ack_cq_events(cb->cq, 1);
if (ret)
- exit(ret);
+ pthread_exit(NULL);
}
}
return ret;
}
- sem_wait(&cb->sem);
- if (cb->state != CONNECT_REQUEST) {
- fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
- cb->state);
- return -1;
+ return 0;
+}
+
+static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
+{
+ struct rping_cb *cb = malloc(sizeof *cb);
+ if (!cb)
+ return NULL;
+ *cb = *listening_cb;
+ cb->child_cm_id->context = cb;
+ return cb;
+}
+
+static void free_cb(struct rping_cb *cb)
+{
+ free(cb);
+}
+
+static void *rping_persistent_server_thread(void *arg)
+{
+ struct rping_cb *cb = arg;
+ struct ibv_recv_wr *bad_wr;
+ int ret;
+
+ ret = rping_setup_qp(cb, cb->child_cm_id);
+ if (ret) {
+ fprintf(stderr, "setup_qp failed: %d\n", ret);
+ goto err0;
+ }
+
+ ret = rping_setup_buffers(cb);
+ if (ret) {
+ fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
+ goto err1;
+ }
+
+ ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
+ if (ret) {
+ fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
+ goto err2;
}
+ pthread_create(&cb->cqthread, NULL, cq_thread, cb);
+
+ ret = rping_accept(cb);
+ if (ret) {
+ fprintf(stderr, "connect error %d\n", ret);
+ goto err3;
+ }
+
+ rping_test_server(cb);
+ rdma_disconnect(cb->child_cm_id);
+ rping_free_buffers(cb);
+ rping_free_qp(cb);
+ pthread_cancel(cb->cqthread);
+ pthread_join(cb->cqthread, NULL);
+ rdma_destroy_id(cb->child_cm_id);
+ free_cb(cb);
+ return NULL;
+err3:
+ pthread_cancel(cb->cqthread);
+ pthread_join(cb->cqthread, NULL);
+err2:
+ rping_free_buffers(cb);
+err1:
+ rping_free_qp(cb);
+err0:
+ free_cb(cb);
+ return NULL;
+}
+
+static int rping_run_persistent_server(struct rping_cb *listening_cb)
+{
+ int ret;
+ struct rping_cb *cb;
+
+ ret = rping_bind_server(listening_cb);
+ if (ret)
+ return ret;
+
+ while (1) {
+ sem_wait(&listening_cb->sem);
+ if (listening_cb->state != CONNECT_REQUEST) {
+ fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+ listening_cb->state);
+ return -1;
+ }
+
+ cb = clone_cb(listening_cb);
+ if (!cb)
+ return -1;
+ pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
+ }
return 0;
}
if (ret)
return ret;
+ sem_wait(&cb->sem);
+ if (cb->state != CONNECT_REQUEST) {
+ fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+ cb->state);
+ return -1;
+ }
+
ret = rping_setup_qp(cb, cb->child_cm_id);
if (ret) {
fprintf(stderr, "setup_qp failed: %d\n", ret);
printf("\t-C count\tping count times\n");
printf("\t-a addr\t\taddress\n");
printf("\t-p port\t\tport\n");
+ printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
}
int main(int argc, char *argv[])
struct rping_cb *cb;
int op;
int ret = 0;
+ int persistent_server = 0;
cb = malloc(sizeof(*cb));
if (!cb)
sem_init(&cb->sem, 0, 0);
opterr = 0;
- while ((op=getopt(argc, argv, "a:p:C:S:t:scvVd")) != -1) {
+ while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
switch (op) {
case 'a':
cb->addr_str = optarg;
cb->addr = inet_addr(optarg);
DEBUG_LOG("ipaddr (%s)\n", optarg);
break;
+ case 'P':
+ persistent_server = 1;
+ break;
case 'p':
cb->port = htons(atoi(optarg));
DEBUG_LOG("port %d\n", (int) atoi(optarg));
pthread_create(&cb->cmthread, NULL, cm_thread, cb);
- if (cb->server)
- ret = rping_run_server(cb);
- else
+ if (cb->server) {
+ if (persistent_server)
+ ret = rping_run_persistent_server(cb);
+ else
+ ret = rping_run_server(cb);
+ } else
ret = rping_run_client(cb);
DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);