From: Sean Hefty
Date: Tue, 8 May 2012 00:16:47 +0000 (-0700)
Subject: rsockets: Optimize synchronization to improve performance
X-Git-Url: https://openfabrics.org/gitweb/?a=commitdiff_plain;h=e20c2f4cc080f49661e1d5f745638105428f52c6;p=~shefty%2Flibrdmacm.git

rsockets: Optimize synchronization to improve performance

Performance analysis using VTune showed that pthread_mutex_unlock() was
the single largest contributor to latency for 64-byte transfers.  It was
followed by get_sw_cqe(), then __pthread_mutex_lock().

Replace the mutexes with an atomic counter paired with a semaphore.
When the lock is uncontended (the usual case with nonblocking sockets),
acquiring and releasing it is a single atomic increment or decrement of
the counter.  The semaphore is used only when contention occurs.

Signed-off-by: Sean Hefty
---

diff --git a/src/cma.h b/src/cma.h
index 91528c05..f28020ed 100644
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include
 #include
 #include
+#include <semaphore.h>
 #include
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+	sem_t sem;
+	volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+	sem_init(&lock->sem, 0, 0);
+	lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+	sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+		sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+		sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 
 static inline int ERR(int err)
 {
diff --git a/src/rsocket.c b/src/rsocket.c
index 775e9b07..0badf239 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -141,8 +141,8 @@ enum rs_state {
 
 struct rsocket {
 	struct rdma_cm_id *cm_id;
-	pthread_mutex_t	  slock;
-	pthread_mutex_t	  rlock;
+	fastlock_t	  slock;
+	fastlock_t	  rlock;
 	pthread_mutex_t	  cq_lock;
 	pthread_cond_t	  cq_cond;
 	int		  cq_busy;
@@ -225,8 +225,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
 	rs->index = -1;
 	rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
 	rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-	pthread_mutex_init(&rs->slock, NULL);
-	pthread_mutex_init(&rs->rlock, NULL);
+	fastlock_init(&rs->slock);
+	fastlock_init(&rs->rlock);
 	pthread_mutex_init(&rs->cq_lock, NULL);
 	pthread_cond_init(&rs->cq_cond, NULL);
 	return rs;
@@ -377,8 +377,8 @@ static void rs_free(struct rsocket *rs)
 
 	pthread_cond_destroy(&rs->cq_cond);
 	pthread_mutex_destroy(&rs->cq_lock);
-	pthread_mutex_destroy(&rs->rlock);
-	pthread_mutex_destroy(&rs->slock);
+	fastlock_destroy(&rs->rlock);
+	fastlock_destroy(&rs->slock);
 	free(rs);
 }
@@ -1002,7 +1002,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
 			return ret;
 		}
 	}
-	pthread_mutex_lock(&rs->rlock);
+	fastlock_acquire(&rs->rlock);
 	if (!rs_have_rdata(rs)) {
 		ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
 		if (ret && errno != ECONNRESET)
@@ -1040,7 +1040,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
 	}
 	rs->rbuf_bytes_avail += len - left;
 out:
-	pthread_mutex_unlock(&rs->rlock);
+	fastlock_release(&rs->rlock);
 	return ret ? ret : len - left;
 }
@@ -1105,7 +1105,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		}
 	}
 
-	pthread_mutex_lock(&rs->slock);
+	fastlock_acquire(&rs->slock);
 	for (left = len; left; left -= xfer_size, buf += xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1157,7 +1157,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
 		if (ret)
 			break;
 	}
-	pthread_mutex_unlock(&rs->slock);
+	fastlock_release(&rs->slock);
 	return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1214,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
 	for (i = 1; i < iovcnt; i++)
 		len += iov[i].iov_len;
 
-	pthread_mutex_lock(&rs->slock);
+	fastlock_acquire(&rs->slock);
 	for (left = len; left; left -= xfer_size) {
 		if (!rs_can_send(rs)) {
 			ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
@@ -1262,7 +1262,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags)
 		if (ret)
 			break;
 	}
-	pthread_mutex_unlock(&rs->slock);
+	fastlock_release(&rs->slock);
 	return (ret && left == len) ? ret : len - left;
 }
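
The fastlock added above is essentially a benaphore: the atomic counter
arbitrates ownership, and the semaphore is touched only on the contended
path.  What follows is a minimal standalone sketch of that behavior, not
part of the patch.  The lock routines are copied from the cma.h hunk;
the harness (the file name fastlock_demo.c, NTHREADS, ITERS, and the
shared counter) is illustrative only, and it assumes GCC __sync builtins
and POSIX semaphores.

/*
 * fastlock_demo.c -- standalone sketch, not part of the patch above.
 * Build: gcc -O2 -pthread fastlock_demo.c -o fastlock_demo
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

typedef struct {
	sem_t sem;
	volatile int cnt;
} fastlock_t;

static inline void fastlock_init(fastlock_t *lock)
{
	/* Semaphore starts at 0: a contended acquirer blocks until
	 * the current owner releases and posts. */
	sem_init(&lock->sem, 0, 0);
	lock->cnt = 0;
}

static inline void fastlock_destroy(fastlock_t *lock)
{
	sem_destroy(&lock->sem);
}

static inline void fastlock_acquire(fastlock_t *lock)
{
	/* Fast path: the first arrival moves cnt 0 -> 1 and owns the
	 * lock without touching the semaphore.  Later arrivals see
	 * cnt > 1 and sleep in sem_wait(). */
	if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
		sem_wait(&lock->sem);
}

static inline void fastlock_release(fastlock_t *lock)
{
	/* If cnt is still positive after the decrement, at least one
	 * waiter is queued; hand the lock off by posting once. */
	if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
		sem_post(&lock->sem);
}

#define NTHREADS 4
#define ITERS 100000

static fastlock_t lock;
static long counter;

static void *worker(void *arg)
{
	int i;

	(void) arg;
	for (i = 0; i < ITERS; i++) {
		fastlock_acquire(&lock);
		counter++;	/* protected region */
		fastlock_release(&lock);
	}
	return NULL;
}

int main(void)
{
	pthread_t threads[NTHREADS];
	int i;

	fastlock_init(&lock);
	for (i = 0; i < NTHREADS; i++)
		pthread_create(&threads[i], NULL, worker, NULL);
	for (i = 0; i < NTHREADS; i++)
		pthread_join(threads[i], NULL);
	fastlock_destroy(&lock);

	/* Mutual exclusion holds if this prints NTHREADS * ITERS. */
	printf("counter = %ld (expected %d)\n", counter, NTHREADS * ITERS);
	return 0;
}

The tradeoff matches the commit message: an uncontended acquire or
release is a single atomic op with no system call, and only contended
acquires pay for the sem_wait()/sem_post() pair.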