]> git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Optimize synchronization to improve performance
authorSean Hefty <sean.hefty@intel.com>
Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
committerSean Hefty <sean.hefty@intel.com>
Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
Performance analysis using VTune showed that pthread_mutex_unlock()
is the single biggest contributor to increasing latency for 64-byte
transfers.  Unlocked was followed by get_sw_cqe(), then
__pthread_mutex_lock().  Replace the use of mutexes with an atomic
and a semaphore.  When there's no contention for the lock (which
would usually be the case when using nonblocking sockets), the
code simply increments and decrements an atomic varible.  Semaphores
are only used when contention occurs.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
src/cma.h
src/rsocket.c

index 91528c0569b56f9fb8979f1b049c5e5963a841ab..f28020ed0e083b1abfe3e918bbf7652b40578eba 100644 (file)
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include <errno.h>
 #include <endian.h>
 #include <byteswap.h>
+#include <semaphore.h>
 
 #include <rdma/rdma_cma.h>
 
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+       sem_t sem;
+       volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+       sem_init(&lock->sem, 0, 0);
+       lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+       sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+       if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+               sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+       if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+               sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 static inline int ERR(int err)
 {
index 775e9b07c9b864528a61246551c5f52ea24d195e..6d43373398993fa13e963b794754b654fc8af3fb 100644 (file)
@@ -141,11 +141,10 @@ enum rs_state {
 
 struct rsocket {
        struct rdma_cm_id *cm_id;
-       pthread_mutex_t   slock;
-       pthread_mutex_t   rlock;
-       pthread_mutex_t   cq_lock;
-       pthread_cond_t    cq_cond;
-       int               cq_busy;
+       fastlock_t        slock;
+       fastlock_t        rlock;
+       fastlock_t        cq_lock;
+       fastlock_t        cq_wait_lock;
 
        int               opts;
        long              fd_flags;
@@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        rs->index = -1;
        rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
        rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-       pthread_mutex_init(&rs->slock, NULL);
-       pthread_mutex_init(&rs->rlock, NULL);
-       pthread_mutex_init(&rs->cq_lock, NULL);
-       pthread_cond_init(&rs->cq_cond, NULL);
+       fastlock_init(&rs->slock);
+       fastlock_init(&rs->rlock);
+       fastlock_init(&rs->cq_lock);
+       fastlock_init(&rs->cq_wait_lock);
        return rs;
 }
 
@@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
 
-       pthread_cond_destroy(&rs->cq_cond);
-       pthread_mutex_destroy(&rs->cq_lock);
-       pthread_mutex_destroy(&rs->rlock);
-       pthread_mutex_destroy(&rs->slock);
+       fastlock_destroy(&rs->cq_wait_lock);
+       fastlock_destroy(&rs->cq_lock);
+       fastlock_destroy(&rs->rlock);
+       fastlock_destroy(&rs->slock);
        free(rs);
 }
 
@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
        return ret;
 }
 
-static void rs_signal_cq_waiters(struct rsocket *rs)
-{
-       pthread_mutex_lock(&rs->cq_lock);
-       pthread_cond_signal(&rs->cq_cond);
-       pthread_mutex_unlock(&rs->cq_lock);
-}
-
 /*
  * Although we serialize rsend and rrecv calls with respect to themselves,
  * both calls may run simultaneously and need to poll the CQ for completions.
@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
  * which could be stalled until the remote process calls rrecv.  This should
  * not block rrecv from receiving data from the remote side however.
  *
- * Perform a quick test before trying to acquire any locks.  Also note that
- * busy may be set to 1 by another thread once it's been reset to 0.
+ * We handle this by using two locks.  The cq_lock protects against polling
+ * the CQ and processing completions.  The cq_wait_lock serializes access to
+ * waiting on the CQ.
  */
 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
 {
        int ret;
 
-       pthread_mutex_lock(&rs->cq_lock);
-       if (rs->cq_busy && nonblock) {
-               pthread_mutex_unlock(&rs->cq_lock);
-               return ERR(EWOULDBLOCK);
-       }
-
-       while (rs->cq_busy && !test(rs)) {
-               pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
-
-               if (test(rs)) {
-                       pthread_mutex_unlock(&rs->cq_lock);
-                       return 0;
-               }
-       }
-
-       rs->cq_busy = 1;
-       pthread_mutex_unlock(&rs->cq_lock);
-
+       fastlock_acquire(&rs->cq_lock);
        do {
                rs_update_credits(rs);
                ret = rs_poll_cq(rs);
@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
                        rs->cq_armed = 1;
                } else {
                        rs_update_credits(rs);
-                       rs_signal_cq_waiters(rs);
+                       fastlock_acquire(&rs->cq_wait_lock);
+                       fastlock_release(&rs->cq_lock);
+
                        ret = rs_get_cq_event(rs);
+                       fastlock_release(&cq_wait_lock);
+                       fastlock_acquire(&rs->cq_lock);
                }
        } while (!ret);
 
        rs_update_credits(rs);
-       rs->cq_busy = 0;
-       rs_signal_cq_waiters(rs);
+       fastlock_release(&rs->cq_lock);
        return ret;
 }
 
@@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        return ret;
                }
        }
-       pthread_mutex_lock(&rs->rlock);
+       fastlock_acquire(&rs->rlock);
        if (!rs_have_rdata(rs)) {
                ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
                if (ret && errno != ECONNRESET)
@@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        }
        rs->rbuf_bytes_avail += len - left;
 out:
-       pthread_mutex_unlock(&rs->rlock);
+       fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
 }
 
@@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                }
        }
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }