]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
librdmacm/rping: Persistent rping server.
authorSteve Wise <swise@opengridcomputing.com>
Fri, 7 Sep 2007 18:12:57 +0000 (11:12 -0700)
committerSean Hefty <sean.hefty@intel.com>
Fri, 7 Sep 2007 18:12:57 +0000 (11:12 -0700)
Support a rping server mode where the server handles many incoming
connections by creating threads to process each new rping session.

Signed-off-by: Steve Wise <swise@opengridcomputing.com>
examples/rping.c
man/rping.1

index 5098ebcf70dc5f3c9a28bfd8cb3d1ef3ea675102..983ce1c95fe794b1547504841b2f609e2ad2b837 100644 (file)
@@ -112,6 +112,7 @@ struct rping_rdma_info {
 struct rping_cb {
        int server;                     /* 0 iff client */
        pthread_t cqthread;
+       pthread_t persistent_server_thread;
        struct ibv_comp_channel *channel;
        struct ibv_cq *cq;
        struct ibv_pd *pd;
@@ -591,24 +592,26 @@ static void *cq_thread(void *arg)
        DEBUG_LOG("cq_thread started.\n");
 
        while (1) {     
+               pthread_testcancel();
+
                ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
                if (ret) {
                        fprintf(stderr, "Failed to get cq event!\n");
-                       exit(ret);
+                       pthread_exit(NULL);
                }
                if (ev_cq != cb->cq) {
                        fprintf(stderr, "Unkown CQ!\n");
-                       exit(-1);
+                       pthread_exit(NULL);
                }
                ret = ibv_req_notify_cq(cb->cq, 0);
                if (ret) {
                        fprintf(stderr, "Failed to set notify!\n");
-                       exit(ret);
+                       pthread_exit(NULL);
                }
                ret = rping_cq_event_handler(cb);
                ibv_ack_cq_events(cb->cq, 1);
                if (ret)
-                       exit(ret);
+                       pthread_exit(NULL);
        }
 }
 
@@ -748,13 +751,99 @@ static int rping_bind_server(struct rping_cb *cb)
                return ret;
        }
 
-       sem_wait(&cb->sem);
-       if (cb->state != CONNECT_REQUEST) {
-               fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
-                       cb->state);
-               return -1;
+       return 0;
+}
+
+static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
+{
+       struct rping_cb *cb = malloc(sizeof *cb);
+       if (!cb)
+               return NULL;
+       *cb = *listening_cb;
+       cb->child_cm_id->context = cb;
+       return cb;
+}
+
+static void free_cb(struct rping_cb *cb)
+{
+       free(cb);
+}
+
+static void *rping_persistent_server_thread(void *arg)
+{
+       struct rping_cb *cb = arg;
+       struct ibv_recv_wr *bad_wr;
+       int ret;
+
+       ret = rping_setup_qp(cb, cb->child_cm_id);
+       if (ret) {
+               fprintf(stderr, "setup_qp failed: %d\n", ret);
+               goto err0;
+       }
+
+       ret = rping_setup_buffers(cb);
+       if (ret) {
+               fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
+               goto err1;
+       }
+
+       ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
+       if (ret) {
+               fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
+               goto err2;
        }
 
+       pthread_create(&cb->cqthread, NULL, cq_thread, cb);
+
+       ret = rping_accept(cb);
+       if (ret) {
+               fprintf(stderr, "connect error %d\n", ret);
+               goto err3;
+       }
+
+       rping_test_server(cb);
+       rdma_disconnect(cb->child_cm_id);
+       rping_free_buffers(cb);
+       rping_free_qp(cb);
+       pthread_cancel(cb->cqthread);
+       pthread_join(cb->cqthread, NULL);
+       rdma_destroy_id(cb->child_cm_id);
+       free_cb(cb);
+       return NULL;
+err3:
+       pthread_cancel(cb->cqthread);
+       pthread_join(cb->cqthread, NULL);
+err2:
+       rping_free_buffers(cb);
+err1:
+       rping_free_qp(cb);
+err0:
+       free_cb(cb);
+       return NULL;
+}
+
+static int rping_run_persistent_server(struct rping_cb *listening_cb)
+{
+       int ret;
+       struct rping_cb *cb;
+
+       ret = rping_bind_server(listening_cb);
+       if (ret)
+               return ret;
+
+       while (1) {
+               sem_wait(&listening_cb->sem);
+               if (listening_cb->state != CONNECT_REQUEST) {
+                       fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+                               listening_cb->state);
+                       return -1;
+               }
+
+               cb = clone_cb(listening_cb);
+               if (!cb)
+                       return -1;
+               pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
+       }
        return 0;
 }
 
@@ -767,6 +856,13 @@ static int rping_run_server(struct rping_cb *cb)
        if (ret)
                return ret;
 
+       sem_wait(&cb->sem);
+       if (cb->state != CONNECT_REQUEST) {
+               fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
+                       cb->state);
+               return -1;
+       }
+
        ret = rping_setup_qp(cb, cb->child_cm_id);
        if (ret) {
                fprintf(stderr, "setup_qp failed: %d\n", ret);
@@ -987,6 +1083,7 @@ static void usage(char *name)
        printf("\t-C count\tping count times\n");
        printf("\t-a addr\t\taddress\n");
        printf("\t-p port\t\tport\n");
+       printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
 }
 
 int main(int argc, char *argv[])
@@ -994,6 +1091,7 @@ int main(int argc, char *argv[])
        struct rping_cb *cb;
        int op;
        int ret = 0;
+       int persistent_server = 0;
 
        cb = malloc(sizeof(*cb));
        if (!cb)
@@ -1007,13 +1105,16 @@ int main(int argc, char *argv[])
        sem_init(&cb->sem, 0, 0);
 
        opterr = 0;
-       while ((op=getopt(argc, argv, "a:p:C:S:t:scvVd")) != -1) {
+       while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
                switch (op) {
                case 'a':
                        cb->addr_str = optarg;
                        cb->addr = inet_addr(optarg);
                        DEBUG_LOG("ipaddr (%s)\n", optarg);
                        break;
+               case 'P':
+                       persistent_server = 1;
+                       break;
                case 'p':
                        cb->port = htons(atoi(optarg));
                        DEBUG_LOG("port %d\n", (int) atoi(optarg));
@@ -1089,9 +1190,12 @@ int main(int argc, char *argv[])
 
        pthread_create(&cb->cmthread, NULL, cm_thread, cb);
 
-       if (cb->server)
-               ret = rping_run_server(cb);
-       else
+       if (cb->server) {
+               if (persistent_server)
+                       ret = rping_run_persistent_server(cb);
+               else
+                       ret = rping_run_server(cb);
+       } else
                ret = rping_run_client(cb);
 
        DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
index 153436a4f87d7c839c424cb2f4361bff88e87c31..a2b7b6be604b0ad43fba147e9c7211233f9b65e2 100644 (file)
@@ -4,7 +4,7 @@ rping \- RDMA CM connection and RDMA ping-pong test.
 .SH SYNOPSIS
 .sp
 .nf
-\fIrping\fR -s [-v] [-V] [-d] [-a address] [-p port]
+\fIrping\fR -s [-v] [-V] [-d] [-P] [-a address] [-p port]
                [-C message_count] [-S message_size]
 \fIrping\fR -c [-v] [-V] [-d] -a address [-p port]
                [-C message_count] [-S message_size]
@@ -42,6 +42,10 @@ The number of messages to transfer over each connection.  (default infinite)
 .TP
 \-S message_size
 The size of each message transferred, in bytes.  (default 100)
+.TP
+\-P
+Run the server in persistent mode.  This allows multiple rping clients
+to connect to a single server instance. The server will run until killed.
 .SH "NOTES"
 Because this test maps RDMA resources to userspace, users must ensure
 that they have available system resources and permissions.  See the