From d73b77822c16b0d36682796870f33e0c206ccd0c Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Mon, 7 May 2012 17:48:50 -0700 Subject: [PATCH] Refresh of rs-locking --- src/cma.h | 28 ++++++++++++++++++++++++++++ src/rsocket.c | 24 ++++++++++++------------ 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/cma.h b/src/cma.h index 91528c05..f28020ed 100644 --- a/src/cma.h +++ b/src/cma.h @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; } #define min(a, b) (a < b ? a : b) +/* + * Fast synchronization for low contention locking. + */ +typedef struct { + sem_t sem; + volatile int cnt; +} fastlock_t; +static inline void fastlock_init(fastlock_t *lock) +{ + sem_init(&lock->sem, 0, 0); + lock->cnt = 0; +} +static inline void fastlock_destroy(fastlock_t *lock) +{ + sem_destroy(&lock->sem); +} +static inline void fastlock_acquire(fastlock_t *lock) +{ + if (__sync_add_and_fetch(&lock->cnt, 1) > 1) + sem_wait(&lock->sem); +} +static inline void fastlock_release(fastlock_t *lock) +{ + if (__sync_sub_and_fetch(&lock->cnt, 1) > 0) + sem_post(&lock->sem); +} + int ucma_complete(struct rdma_cm_id *id); static inline int ERR(int err) { diff --git a/src/rsocket.c b/src/rsocket.c index 775e9b07..0badf239 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -141,8 +141,8 @@ enum rs_state { struct rsocket { struct rdma_cm_id *cm_id; - pthread_mutex_t slock; - pthread_mutex_t rlock; + fastlock_t slock; + fastlock_t rlock; pthread_mutex_t cq_lock; pthread_cond_t cq_cond; int cq_busy; @@ -225,8 +225,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) rs->index = -1; rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE; rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE; - pthread_mutex_init(&rs->slock, NULL); - pthread_mutex_init(&rs->rlock, NULL); + fastlock_init(&rs->slock); + fastlock_init(&rs->rlock); pthread_mutex_init(&rs->cq_lock, NULL); pthread_cond_init(&rs->cq_cond, NULL); return rs; @@ -377,8 +377,8 @@ static void rs_free(struct rsocket *rs) pthread_cond_destroy(&rs->cq_cond); pthread_mutex_destroy(&rs->cq_lock); - pthread_mutex_destroy(&rs->rlock); - pthread_mutex_destroy(&rs->slock); + fastlock_destroy(&rs->rlock); + fastlock_destroy(&rs->slock); free(rs); } @@ -1002,7 +1002,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) return ret; } } - pthread_mutex_lock(&rs->rlock); + fastlock_acquire(&rs->rlock); if (!rs_have_rdata(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata); if (ret && errno != ECONNRESET) @@ -1040,7 +1040,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) } rs->rbuf_bytes_avail += len - left; out: - pthread_mutex_unlock(&rs->rlock); + fastlock_release(&rs->rlock); return ret ? ret : len - left; } @@ -1105,7 +1105,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) } } - pthread_mutex_lock(&rs->slock); + fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size, buf += xfer_size) { if (!rs_can_send(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); @@ -1157,7 +1157,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) if (ret) break; } - pthread_mutex_unlock(&rs->slock); + fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; } @@ -1214,7 +1214,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags for (i = 1; i < iovcnt; i++) len += iov[i].iov_len; - pthread_mutex_lock(&rs->slock); + fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size) { if (!rs_can_send(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); @@ -1262,7 +1262,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags if (ret) break; } - pthread_mutex_unlock(&rs->slock); + fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; } -- 2.46.0