From 54b068dd9e9431e56e9c45bb9c65c4d80433822d Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Wed, 9 May 2012 10:40:13 -0700 Subject: [PATCH] Refresh of rs-locking --- src/rsocket.c | 53 ++++++++++++++++----------------------------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/src/rsocket.c b/src/rsocket.c index 0badf239..6d433733 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -143,9 +143,8 @@ struct rsocket { struct rdma_cm_id *cm_id; fastlock_t slock; fastlock_t rlock; - pthread_mutex_t cq_lock; - pthread_cond_t cq_cond; - int cq_busy; + fastlock_t cq_lock; + fastlock_t cq_wait_lock; int opts; long fd_flags; @@ -227,8 +226,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE; fastlock_init(&rs->slock); fastlock_init(&rs->rlock); - pthread_mutex_init(&rs->cq_lock, NULL); - pthread_cond_init(&rs->cq_cond, NULL); + fastlock_init(&rs->cq_lock); + fastlock_init(&rs->cq_wait_lock); return rs; } @@ -375,8 +374,8 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - pthread_cond_destroy(&rs->cq_cond); - pthread_mutex_destroy(&rs->cq_lock); + fastlock_destroy(&rs->cq_wait_lock); + fastlock_destroy(&rs->cq_lock); fastlock_destroy(&rs->rlock); fastlock_destroy(&rs->slock); free(rs); @@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs) return ret; } -static void rs_signal_cq_waiters(struct rsocket *rs) -{ - pthread_mutex_lock(&rs->cq_lock); - pthread_cond_signal(&rs->cq_cond); - pthread_mutex_unlock(&rs->cq_lock); -} - /* * Although we serialize rsend and rrecv calls with respect to themselves, * both calls may run simultaneously and need to poll the CQ for completions. @@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs) * which could be stalled until the remote process calls rrecv. This should * not block rrecv from receiving data from the remote side however. 
 * - * Perform a quick test before trying to acquire any locks. Also note that - * busy may be set to 1 by another thread once it's been reset to 0. + * We handle this by using two locks. The cq_lock protects against polling + * the CQ and processing completions. The cq_wait_lock serializes access to + * waiting on the CQ. */ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) { int ret; - pthread_mutex_lock(&rs->cq_lock); - if (rs->cq_busy && nonblock) { - pthread_mutex_unlock(&rs->cq_lock); - return ERR(EWOULDBLOCK); - } - - while (rs->cq_busy && !test(rs)) { - pthread_cond_wait(&rs->cq_cond, &rs->cq_lock); - - if (test(rs)) { - pthread_mutex_unlock(&rs->cq_lock); - return 0; - } - } - - rs->cq_busy = 1; - pthread_mutex_unlock(&rs->cq_lock); - + fastlock_acquire(&rs->cq_lock); do { rs_update_credits(rs); ret = rs_poll_cq(rs); @@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs rs->cq_armed = 1; } else { rs_update_credits(rs); - rs_signal_cq_waiters(rs); + fastlock_acquire(&rs->cq_wait_lock); + fastlock_release(&rs->cq_lock); + ret = rs_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); + fastlock_acquire(&rs->cq_lock); } } while (!ret); rs_update_credits(rs); - rs->cq_busy = 0; - rs_signal_cq_waiters(rs); + fastlock_release(&rs->cq_lock); return ret; } -- 2.46.0