From: Sean Hefty
Date: Tue, 8 May 2012 00:16:47 +0000 (-0700)
Subject: rsockets: Optimize synchronization to improve performance
X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=e69294e4ca7fb71f62ca32fc9f0ab385813cfe58;p=~shefty%2Flibrdmacm.git

rsockets: Optimize synchronization to improve performance

Hotspot performance analysis using VTune showed pthread_mutex_unlock()
as the most significant hotspot when transferring small messages using
rstream.  To reduce the impact of using pthread mutexes, replace them
with a custom lock built from an atomic variable and a semaphore.  When
there is no contention on the lock (the expected case for nonblocking
sockets), synchronization reduces to incrementing and decrementing an
atomic variable.

A test that acquired and released a lock 2 billion times showed the
custom lock to be roughly 20% faster than the mutex: 26.6 seconds
versus 33.0 seconds.  Unfortunately, further analysis showed that the
custom lock provides only a minimal performance gain for rstream itself
and simply moves the hotspot to the custom unlock call.  The hotspot is
therefore likely the result of some other interaction, rather than
slowness in releasing a lock.  However, we keep the custom lock based
on the results of the direct lock tests.

Signed-off-by: Sean Hefty
---
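For reference, below is a minimal sketch of the kind of acquire/release
micro-benchmark described above.  The fastlock routines are copied from the
patch that follows; the harness itself (file name, single-threaded uncontended
loop, iteration count, build command) is an assumption for illustration and is
not part of the commit, so the numbers it produces will not match the 26.6 s /
33.0 s figures reported above.

/* lock_bench.c - illustrative single-threaded lock micro-benchmark sketch.
 * fastlock_* is taken from src/cma.h in this patch; everything else is
 * made up for illustration.
 * Build (assumption): gcc -O2 lock_bench.c -o lock_bench -lpthread
 */
#include <stdio.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>

typedef struct {
	sem_t sem;
	volatile int cnt;
} fastlock_t;

static inline void fastlock_init(fastlock_t *lock)
{
	sem_init(&lock->sem, 0, 0);
	lock->cnt = 0;
}
static inline void fastlock_acquire(fastlock_t *lock)
{
	/* Uncontended case: counter goes 0 -> 1 and the semaphore is never touched. */
	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
		sem_wait(&lock->sem);
}
static inline void fastlock_release(fastlock_t *lock)
{
	/* Wake a waiter only if another thread bumped the counter while we held the lock. */
	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
		sem_post(&lock->sem);
}

static double elapsed(struct timespec *s, struct timespec *e)
{
	return (e->tv_sec - s->tv_sec) + (e->tv_nsec - s->tv_nsec) / 1e9;
}

int main(void)
{
	/* Iteration count is illustrative; the commit text reports 2 billion. */
	const long iters = 100000000L;
	pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
	fastlock_t lock;
	struct timespec start, end;
	long i;

	fastlock_init(&lock);

	clock_gettime(CLOCK_MONOTONIC, &start);
	for (i = 0; i < iters; i++) {
		fastlock_acquire(&lock);
		fastlock_release(&lock);
	}
	clock_gettime(CLOCK_MONOTONIC, &end);
	printf("fastlock: %.2f sec\n", elapsed(&start, &end));

	clock_gettime(CLOCK_MONOTONIC, &start);
	for (i = 0; i < iters; i++) {
		pthread_mutex_lock(&mutex);
		pthread_mutex_unlock(&mutex);
	}
	clock_gettime(CLOCK_MONOTONIC, &end);
	printf("mutex:    %.2f sec\n", elapsed(&start, &end));
	return 0;
}

The design point the benchmark exercises is that, absent contention, the
fastlock never enters the kernel: acquire and release are a single atomic
increment and decrement, and the semaphore is used only to park and wake
contending threads.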
diff --git a/src/cma.h b/src/cma.h
index 91528c05..f28020ed 100644
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include 
 #include 
 #include 
+#include <semaphore.h>
 
 #include 
 
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+	sem_t sem;
+	volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+	sem_init(&lock->sem, 0, 0);
+	lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+	sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+		sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+		sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 
 static inline int ERR(int err)
 {
diff --git a/src/rsocket.c b/src/rsocket.c
index 775e9b07..2ffde9bb 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -141,11 +141,10 @@ enum rs_state {
 
 struct rsocket {
 	struct rdma_cm_id *cm_id;
-	pthread_mutex_t	  slock;
-	pthread_mutex_t	  rlock;
-	pthread_mutex_t	  cq_lock;
-	pthread_cond_t	  cq_cond;
-	int		  cq_busy;
+	fastlock_t	  slock;
+	fastlock_t	  rlock;
+	fastlock_t	  cq_lock;
+	fastlock_t	  cq_wait_lock;
 	int		  opts;
 	long		  fd_flags;
@@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
 	rs->index = -1;
 	rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
 	rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-	pthread_mutex_init(&rs->slock, NULL);
-	pthread_mutex_init(&rs->rlock, NULL);
-	pthread_mutex_init(&rs->cq_lock, NULL);
-	pthread_cond_init(&rs->cq_cond, NULL);
+	fastlock_init(&rs->slock);
+	fastlock_init(&rs->rlock);
+	fastlock_init(&rs->cq_lock);
+	fastlock_init(&rs->cq_wait_lock);
 
 	return rs;
 }
@@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs)
 		rdma_destroy_id(rs->cm_id);
 	}
 
-	pthread_cond_destroy(&rs->cq_cond);
-	pthread_mutex_destroy(&rs->cq_lock);
-	pthread_mutex_destroy(&rs->rlock);
-	pthread_mutex_destroy(&rs->slock);
+	fastlock_destroy(&rs->cq_wait_lock);
+	fastlock_destroy(&rs->cq_lock);
+	fastlock_destroy(&rs->rlock);
+	fastlock_destroy(&rs->slock);
 
 	free(rs);
 }
@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
 	return ret;
 }
 
-static void rs_signal_cq_waiters(struct rsocket *rs)
-{
-	pthread_mutex_lock(&rs->cq_lock);
-	pthread_cond_signal(&rs->cq_cond);
-	pthread_mutex_unlock(&rs->cq_lock);
-}
-
 /*
  * Although we serialize rsend and rrecv calls with respect to themselves,
  * both calls may run simultaneously and need to poll the CQ for completions.
@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
  * which could be stalled until the remote process calls rrecv.  This should
  * not block rrecv from receiving data from the remote side however.
  *
- * Perform a quick test before trying to acquire any locks.  Also note that
- * busy may be set to 1 by another thread once it's been reset to 0.
+ * We handle this by using two locks.  The cq_lock protects against polling
+ * the CQ and processing completions.  The cq_wait_lock serializes access to
+ * waiting on the CQ.
  */
 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
 {
 	int ret;
 
-	pthread_mutex_lock(&rs->cq_lock);
-	if (rs->cq_busy && nonblock) {
-		pthread_mutex_unlock(&rs->cq_lock);
-		return ERR(EWOULDBLOCK);
-	}
-
-	while (rs->cq_busy && !test(rs)) {
-		pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
-
-		if (test(rs)) {
-			pthread_mutex_unlock(&rs->cq_lock);
-			return 0;
-		}
-	}
-
-	rs->cq_busy = 1;
-	pthread_mutex_unlock(&rs->cq_lock);
-
+	fastlock_acquire(&rs->cq_lock);
 	do {
 		rs_update_credits(rs);
 		ret = rs_poll_cq(rs);
@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
 			rs->cq_armed = 1;
 		} else {
 			rs_update_credits(rs);
-			rs_signal_cq_waiters(rs);
+			fastlock_acquire(&rs->cq_wait_lock);
+			fastlock_release(&rs->cq_lock);
+			ret = rs_get_cq_event(rs);
+			fastlock_release(&rs->cq_wait_lock);
+			fastlock_acquire(&rs->cq_lock);
 		}
 	} while (!ret);
 
 	rs_update_credits(rs);
-	rs->cq_busy = 0;
-	rs_signal_cq_waiters(rs);
+	fastlock_release(&rs->cq_lock);
 
 	return ret;
 }
@@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
 			return ret;
 		}
 	}
-	pthread_mutex_lock(&rs->rlock);
+	fastlock_acquire(&rs->rlock);
 	if (!rs_have_rdata(rs)) {
 		ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
 		if (ret && errno != ECONNRESET)
@@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
 	}
 	rs->rbuf_bytes_avail += len - left;
 out:
-	pthread_mutex_unlock(&rs->rlock);
+	fastlock_release(&rs->rlock);
 	return ret ? ret : len - left;
 }
@@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		}
 	}
 
-	pthread_mutex_lock(&rs->slock);
+	fastlock_acquire(&rs->slock);
 	for (left = len; left; left -= xfer_size, buf += xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		if (ret)
 			break;
 	}
-	pthread_mutex_unlock(&rs->slock);
+	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 	for (i = 1; i < iovcnt; i++)
 		len += iov[i].iov_len;
 
-	pthread_mutex_lock(&rs->slock);
+	fastlock_acquire(&rs->slock);
 	for (left = len; left; left -= xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
 		if (ret)
 			break;
 	}
-	pthread_mutex_unlock(&rs->slock);
+	fastlock_release(&rs->slock);
 
 	return (ret && left == len) ? ret : len - left;
 }
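The comment added to rs_process_cq() above describes the two-lock scheme that
replaces the old cq_busy flag and condition variable: cq_lock guards polling
and processing of the CQ, while cq_wait_lock serializes blocking waits on the
CQ event channel, so a thread that sleeps in rs_get_cq_event() no longer
prevents a concurrent rsend/rrecv caller from polling.  The following is a
heavily simplified, self-contained sketch of that hand-off, not the real
rs_process_cq(): the fastlock routines come from src/cma.h in this patch, but
the file name, the stub poll_completions()/wait_for_cq_event() helpers, and
the stripped-down control flow are all illustrative assumptions.

/* cq_handoff_sketch.c - illustrative sketch of the cq_lock / cq_wait_lock
 * hand-off pattern used by rs_process_cq() after this patch.
 * Build (assumption): gcc -O2 cq_handoff_sketch.c -o cq_handoff -lpthread
 */
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>

/* fastlock copied from src/cma.h in this patch (destroy omitted for brevity). */
typedef struct {
	sem_t sem;
	volatile int cnt;
} fastlock_t;

static inline void fastlock_init(fastlock_t *lock)
{
	sem_init(&lock->sem, 0, 0);
	lock->cnt = 0;
}
static inline void fastlock_acquire(fastlock_t *lock)
{
	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
		sem_wait(&lock->sem);
}
static inline void fastlock_release(fastlock_t *lock)
{
	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
		sem_post(&lock->sem);
}

static fastlock_t cq_lock;	/* guards polling the CQ and processing completions */
static fastlock_t cq_wait_lock;	/* serializes blocking waits on the CQ event channel */

/* Stand-ins for rs_poll_cq() / rs_get_cq_event(): simulate "nothing ready
 * twice, then one completion". */
static int polls_left = 3;
static int poll_completions(void)  { return --polls_left <= 0; }
static void wait_for_cq_event(void) { usleep(1000); }

static void process_cq_sketch(void)
{
	int done;

	fastlock_acquire(&cq_lock);		/* one thread polls/processes at a time */
	do {
		done = poll_completions();
		if (done)
			break;
		/* Nothing completed: hand off, so another caller can poll the
		 * CQ while this thread blocks on the event channel. */
		fastlock_acquire(&cq_wait_lock);
		fastlock_release(&cq_lock);
		wait_for_cq_event();
		fastlock_release(&cq_wait_lock);
		fastlock_acquire(&cq_lock);	/* resume polling/processing */
	} while (!done);
	fastlock_release(&cq_lock);
}

int main(void)
{
	fastlock_init(&cq_lock);
	fastlock_init(&cq_wait_lock);
	process_cq_sketch();
	printf("completed after hand-off\n");
	return 0;
}

Compared with the removed cq_busy/pthread_cond_wait logic, the hand-off keeps
the CQ available to other threads for the entire time a waiter is asleep,
which is why the separate cq_wait_lock is needed at all.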