git.openfabrics.org - ~shefty/librdmacm.git/commitdiff
rsockets: Optimize synchronization to improve performance
author     Sean Hefty <sean.hefty@intel.com>   Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
committer  Sean Hefty <sean.hefty@intel.com>   Tue, 8 May 2012 00:16:47 +0000 (17:16 -0700)
Performance analysis using VTune showed that pthread_mutex_unlock()
is the single biggest contributor to increased latency for 64-byte
transfers.  It was followed by get_sw_cqe(), then
__pthread_mutex_lock().  Replace the use of mutexes with an atomic
counter plus a semaphore.  When there is no contention for the lock
(which is usually the case when using nonblocking sockets), the
code simply increments and decrements an atomic variable.  The
semaphore is used only when contention occurs.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
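
For context, the uncontended fast path described above can be measured in
isolation.  The following standalone sketch is not part of the patch: the
loop count, timing method, and build line are illustrative.  It copies the
fastlock_* routines verbatim from the src/cma.h hunk below and times an
uncontended lock/unlock pair against a pthread mutex.

/*
 * Hedged micro-benchmark sketch: times an uncontended pthread mutex
 * lock/unlock pair against the fastlock's atomic increment/decrement.
 * Build (illustrative): gcc -O2 fastlock_bench.c -o fastlock_bench -lpthread
 */
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>

/* fastlock_* copied from the src/cma.h hunk below */
typedef struct {
	sem_t sem;
	volatile int cnt;
} fastlock_t;

static inline void fastlock_init(fastlock_t *lock)
{
	sem_init(&lock->sem, 0, 0);
	lock->cnt = 0;
}
static inline void fastlock_acquire(fastlock_t *lock)
{
	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
		sem_wait(&lock->sem);
}
static inline void fastlock_release(fastlock_t *lock)
{
	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
		sem_post(&lock->sem);
}

static uint64_t ns_now(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (uint64_t) ts.tv_sec * 1000000000ull + ts.tv_nsec;
}

int main(void)
{
	enum { ITERS = 10000000 };
	pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
	fastlock_t flock;
	uint64_t t;
	int i;

	fastlock_init(&flock);

	/* Single thread, so neither loop ever blocks. */
	t = ns_now();
	for (i = 0; i < ITERS; i++) {
		pthread_mutex_lock(&mutex);
		pthread_mutex_unlock(&mutex);
	}
	printf("mutex:    %.1f ns per lock/unlock\n",
	       (double) (ns_now() - t) / ITERS);

	t = ns_now();
	for (i = 0; i < ITERS; i++) {
		fastlock_acquire(&flock);
		fastlock_release(&flock);
	}
	printf("fastlock: %.1f ns per acquire/release\n",
	       (double) (ns_now() - t) / ITERS);
	return 0;
}
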
src/cma.h
src/rsocket.c

index 91528c0569b56f9fb8979f1b049c5e5963a841ab..f28020ed0e083b1abfe3e918bbf7652b40578eba 100644 (file)
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include <errno.h>
 #include <endian.h>
 #include <byteswap.h>
+#include <semaphore.h>
 
 #include <rdma/rdma_cma.h>
 
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+       sem_t sem;
+       volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+       sem_init(&lock->sem, 0, 0);
+       lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+       sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+       if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+               sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+       if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+               sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 static inline int ERR(int err)
 {
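
The design above relies on the semaphore starting at zero: cnt counts the
lock holder plus any waiters, so a releaser posts the semaphore exactly when
some thread is (or is about to be) blocked in sem_wait(), handing the lock
directly to one waiter.  A rough sanity check of that contended path follows;
it is an illustrative harness only, assuming the fastlock definitions above
are visible (e.g. by including src/cma.h from this tree), with arbitrary
thread and iteration counts.

#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include "cma.h"		/* fastlock_t from the hunk above */

#define NTHREADS 4
#define NITERS   100000

static fastlock_t lock;
static int counter;		/* protected only by the fastlock */

static void *worker(void *arg)
{
	int i;

	for (i = 0; i < NITERS; i++) {
		fastlock_acquire(&lock);
		counter++;	/* safe only if acquire/release exclude */
		fastlock_release(&lock);
	}
	return NULL;
}

int main(void)
{
	pthread_t threads[NTHREADS];
	int i;

	fastlock_init(&lock);
	for (i = 0; i < NTHREADS; i++)
		pthread_create(&threads[i], NULL, worker, NULL);
	for (i = 0; i < NTHREADS; i++)
		pthread_join(threads[i], NULL);

	/* Exactly NTHREADS * NITERS iff the lock is correct. */
	assert(counter == NTHREADS * NITERS);
	printf("ok: counter = %d\n", counter);
	fastlock_destroy(&lock);
	return 0;
}
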
index 775e9b07c9b864528a61246551c5f52ea24d195e..0badf2394de62137356cd93e639cfdc8e5975ba1 100644 (file)
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -141,8 +141,8 @@ enum rs_state {
 
 struct rsocket {
        struct rdma_cm_id *cm_id;
-       pthread_mutex_t   slock;
-       pthread_mutex_t   rlock;
+       fastlock_t        slock;
+       fastlock_t        rlock;
        pthread_mutex_t   cq_lock;
        pthread_cond_t    cq_cond;
        int               cq_busy;
@@ -225,8 +225,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
        rs->index = -1;
        rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
        rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-       pthread_mutex_init(&rs->slock, NULL);
-       pthread_mutex_init(&rs->rlock, NULL);
+       fastlock_init(&rs->slock);
+       fastlock_init(&rs->rlock);
        pthread_mutex_init(&rs->cq_lock, NULL);
        pthread_cond_init(&rs->cq_cond, NULL);
        return rs;
@@ -377,8 +377,8 @@ static void rs_free(struct rsocket *rs)
 
        pthread_cond_destroy(&rs->cq_cond);
        pthread_mutex_destroy(&rs->cq_lock);
-       pthread_mutex_destroy(&rs->rlock);
-       pthread_mutex_destroy(&rs->slock);
+       fastlock_destroy(&rs->rlock);
+       fastlock_destroy(&rs->slock);
        free(rs);
 }
 
@@ -1002,7 +1002,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        return ret;
                }
        }
-       pthread_mutex_lock(&rs->rlock);
+       fastlock_acquire(&rs->rlock);
        if (!rs_have_rdata(rs)) {
                ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
                if (ret && errno != ECONNRESET)
@@ -1040,7 +1040,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
        }
        rs->rbuf_bytes_avail += len - left;
 out:
-       pthread_mutex_unlock(&rs->rlock);
+       fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
 }
 
@@ -1105,7 +1105,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                }
        }
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1157,7 +1157,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1214,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1262,7 +1262,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }
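
Note that the change is invisible at the API boundary: rsend() and rrecv()
take slock/rlock internally, so existing callers are untouched.  Below is a
hedged sketch of such a caller using the public rsocket calls from
<rdma/rsocket.h>; the server address and port are placeholders, and the
transfer size matches the 64-byte case profiled above.

#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <rdma/rsocket.h>

int main(void)
{
	struct sockaddr_in addr;
	char buf[64] = "ping";	/* 64-byte transfer, as in the analysis */
	int fd;

	fd = rsocket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0)
		return 1;

	memset(&addr, 0, sizeof addr);
	addr.sin_family = AF_INET;
	addr.sin_port = htons(7471);			 /* placeholder port */
	addr.sin_addr.s_addr = inet_addr("192.168.1.1"); /* placeholder host */

	if (rconnect(fd, (struct sockaddr *) &addr, sizeof addr))
		goto out;

	/* rsend()/rrecv() take the per-socket locks internally; with a
	 * single uncontended caller, only the atomic fast path runs. */
	if (rsend(fd, buf, sizeof buf, 0) == sizeof buf)
		rrecv(fd, buf, sizeof buf, 0);
out:
	rclose(fd);
	return 0;
}
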