git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Optimize synchronization to improve performance
author: Sean Hefty <sean.hefty@intel.com>
Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
committer: Sean Hefty <sean.hefty@intel.com>
Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
Hotspot performance analysis using VTune showed pthread_mutex_unlock()
as the most significant hotspot when transferring small messages using
rstream.  To reduce the impact of using pthread mutexes, replace them
with a custom lock built using an atomic variable and a semaphore.
When there's no contention for the lock (which is the expected case
for nonblocking sockets), the synchronization is reduced to
incrementing and decrementing an atomic variable.

A test that acquired and released a lock 2 billion times reported that
the custom lock was roughly 20% faster than using the mutex:
26.6 seconds versus 33.0 seconds.

Unfortunately, further analysis showed that using the custom lock
provided a minimal performance gain on rstream itself, and simply
moved the hotspot to the custom unlock call.  The hotspot is likely
a result of some other interaction, rather than caused by slowness
in releasing a lock.  However, we keep the custom lock based on
the results of the direct lock tests that were done.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/cma.h
src/rsocket.c

index 91528c0569b56f9fb8979f1b049c5e5963a841ab..f28020ed0e083b1abfe3e918bbf7652b40578eba 100644 (file)
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include <errno.h>
 #include <endian.h>
 #include <byteswap.h>
+#include <semaphore.h>
 
 #include <rdma/rdma_cma.h>
 
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+       sem_t sem;
+       volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+       sem_init(&lock->sem, 0, 0);
+       lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+       sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+       if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+               sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+       if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+               sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 static inline int ERR(int err)
 {
index 775e9b07c9b864528a61246551c5f52ea24d195e..2ffde9bb224e434de64049963c1202f0ac6c8320 100644 (file)
@@ -141,11 +141,10 @@ enum rs_state {
 
 struct rsocket {
        struct rdma_cm_id *cm_id;
-       pthread_mutex_t   slock;
-       pthread_mutex_t   rlock;
-       pthread_mutex_t   cq_lock;
-       pthread_cond_t    cq_cond;
-       int               cq_busy;
+       fastlock_t        slock;
+       fastlock_t        rlock;
+       fastlock_t        cq_lock;
+       fastlock_t        cq_wait_lock;
 
        int               opts;
        long              fd_flags;
@@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        rs->index = -1;
        rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
        rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-       pthread_mutex_init(&rs->slock, NULL);
-       pthread_mutex_init(&rs->rlock, NULL);
-       pthread_mutex_init(&rs->cq_lock, NULL);
-       pthread_cond_init(&rs->cq_cond, NULL);
+       fastlock_init(&rs->slock);
+       fastlock_init(&rs->rlock);
+       fastlock_init(&rs->cq_lock);
+       fastlock_init(&rs->cq_wait_lock);
        return rs;
 }
 
@@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
 
-       pthread_cond_destroy(&rs->cq_cond);
-       pthread_mutex_destroy(&rs->cq_lock);
-       pthread_mutex_destroy(&rs->rlock);
-       pthread_mutex_destroy(&rs->slock);
+       fastlock_destroy(&rs->cq_wait_lock);
+       fastlock_destroy(&rs->cq_lock);
+       fastlock_destroy(&rs->rlock);
+       fastlock_destroy(&rs->slock);
        free(rs);
 }
 
@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
        return ret;
 }
 
-static void rs_signal_cq_waiters(struct rsocket *rs)
-{
-       pthread_mutex_lock(&rs->cq_lock);
-       pthread_cond_signal(&rs->cq_cond);
-       pthread_mutex_unlock(&rs->cq_lock);
-}
-
 /*
  * Although we serialize rsend and rrecv calls with respect to themselves,
  * both calls may run simultaneously and need to poll the CQ for completions.
@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
  * which could be stalled until the remote process calls rrecv.  This should
  * not block rrecv from receiving data from the remote side however.
  *
- * Perform a quick test before trying to acquire any locks.  Also note that
- * busy may be set to 1 by another thread once it's been reset to 0.
+ * We handle this by using two locks.  The cq_lock protects against polling
+ * the CQ and processing completions.  The cq_wait_lock serializes access to
+ * waiting on the CQ.
  */
 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
 {
        int ret;
 
-       pthread_mutex_lock(&rs->cq_lock);
-       if (rs->cq_busy && nonblock) {
-               pthread_mutex_unlock(&rs->cq_lock);
-               return ERR(EWOULDBLOCK);
-       }
-
-       while (rs->cq_busy && !test(rs)) {
-               pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
-
-               if (test(rs)) {
-                       pthread_mutex_unlock(&rs->cq_lock);
-                       return 0;
-               }
-       }
-
-       rs->cq_busy = 1;
-       pthread_mutex_unlock(&rs->cq_lock);
-
+       fastlock_acquire(&rs->cq_lock);
        do {
                rs_update_credits(rs);
                ret = rs_poll_cq(rs);
@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
                        rs->cq_armed = 1;
                } else {
                        rs_update_credits(rs);
-                       rs_signal_cq_waiters(rs);
+                       fastlock_acquire(&rs->cq_wait_lock);
+                       fastlock_release(&rs->cq_lock);
+
                        ret = rs_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
+                       fastlock_acquire(&rs->cq_lock);
                }
        } while (!ret);
 
        rs_update_credits(rs);
-       rs->cq_busy = 0;
-       rs_signal_cq_waiters(rs);
+       fastlock_release(&rs->cq_lock);
        return ret;
 }
 
@@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        return ret;
                }
        }
-       pthread_mutex_lock(&rs->rlock);
+       fastlock_acquire(&rs->rlock);
        if (!rs_have_rdata(rs)) {
                ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
                if (ret && errno != ECONNRESET)
@@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        }
        rs->rbuf_bytes_avail += len - left;
 out:
-       pthread_mutex_unlock(&rs->rlock);
+       fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
 }
 
@@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                }
        }
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }