+++ /dev/null
-Bottom: 0a763ea65957b893b6af0eba1e31e6cc858869bd
-Top: d4bd62f68b19a3992515a80d129cc327a1c971d8
-Author: Sean Hefty <sean.hefty@intel.com>
-Date: 2012-05-09 10:40:13 -0700
-
-Refresh of rs-locking
-
----
-
-diff --git a/src/rsocket.c b/src/rsocket.c
-index 0badf23..6d43373 100644
---- a/src/rsocket.c
-+++ b/src/rsocket.c
-@@ -143,9 +143,8 @@ struct rsocket {
- struct rdma_cm_id *cm_id;
- fastlock_t slock;
- fastlock_t rlock;
-- pthread_mutex_t cq_lock;
-- pthread_cond_t cq_cond;
-- int cq_busy;
-+ fastlock_t cq_lock;
-+ fastlock_t cq_wait_lock;
-
- int opts;
- long fd_flags;
-@@ -227,8 +226,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
- rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
- fastlock_init(&rs->slock);
- fastlock_init(&rs->rlock);
-- pthread_mutex_init(&rs->cq_lock, NULL);
-- pthread_cond_init(&rs->cq_cond, NULL);
-+ fastlock_init(&rs->cq_lock);
-+ fastlock_init(&rs->cq_wait_lock);
- return rs;
- }
-
-@@ -375,8 +374,8 @@ static void rs_free(struct rsocket *rs)
- rdma_destroy_id(rs->cm_id);
- }
-
-- pthread_cond_destroy(&rs->cq_cond);
-- pthread_mutex_destroy(&rs->cq_lock);
-+ fastlock_destroy(&rs->cq_wait_lock);
-+ fastlock_destroy(&rs->cq_lock);
- fastlock_destroy(&rs->rlock);
- fastlock_destroy(&rs->slock);
- free(rs);
-@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
- return ret;
- }
-
--static void rs_signal_cq_waiters(struct rsocket *rs)
--{
-- pthread_mutex_lock(&rs->cq_lock);
-- pthread_cond_signal(&rs->cq_cond);
-- pthread_mutex_unlock(&rs->cq_lock);
--}
--
- /*
- * Although we serialize rsend and rrecv calls with respect to themselves,
- * both calls may run simultaneously and need to poll the CQ for completions.
-@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
- * which could be stalled until the remote process calls rrecv. This should
- * not block rrecv from receiving data from the remote side however.
- *
-- * Perform a quick test before trying to acquire any locks. Also note that
-- * busy may be set to 1 by another thread once it's been reset to 0.
-+ * We handle this by using two locks. The cq_lock protects against polling
-+ * the CQ and processing completions. The cq_wait_lock serializes access to
-+ * waiting on the CQ.
- */
- static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
- {
- int ret;
-
-- pthread_mutex_lock(&rs->cq_lock);
-- if (rs->cq_busy && nonblock) {
-- pthread_mutex_unlock(&rs->cq_lock);
-- return ERR(EWOULDBLOCK);
-- }
--
-- while (rs->cq_busy && !test(rs)) {
-- pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
--
-- if (test(rs)) {
-- pthread_mutex_unlock(&rs->cq_lock);
-- return 0;
-- }
-- }
--
-- rs->cq_busy = 1;
-- pthread_mutex_unlock(&rs->cq_lock);
--
-+ fastlock_acquire(&rs->cq_lock);
- do {
- rs_update_credits(rs);
- ret = rs_poll_cq(rs);
-@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
- rs->cq_armed = 1;
- } else {
- rs_update_credits(rs);
-- rs_signal_cq_waiters(rs);
-+ fastlock_acquire(&rs->cq_wait_lock);
-+ fastlock_release(&rs->cq_lock);
-+
- ret = rs_get_cq_event(rs);
-+ fastlock_release(&cq_wait_lock);
-+ fastlock_acquire(&rs->cq_lock);
- }
- } while (!ret);
-
- rs_update_credits(rs);
-- rs->cq_busy = 0;
-- rs_signal_cq_waiters(rs);
-+ fastlock_release(&rs->cq_lock);
- return ret;
- }
Bottom: de666c51520c9988ea3a07e332fa0402fdef6010
-Top: 0a763ea65957b893b6af0eba1e31e6cc858869bd
+Top: d4bd62f68b19a3992515a80d129cc327a1c971d8
Author: Sean Hefty <sean.hefty@intel.com>
Date: 2012-05-07 17:16:47 -0700
static inline int ERR(int err)
{
diff --git a/src/rsocket.c b/src/rsocket.c
-index 775e9b0..0badf23 100644
+index 775e9b0..6d43373 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
-@@ -141,8 +141,8 @@ enum rs_state {
+@@ -141,11 +141,10 @@ enum rs_state {
struct rsocket {
struct rdma_cm_id *cm_id;
- pthread_mutex_t slock;
- pthread_mutex_t rlock;
+- pthread_mutex_t cq_lock;
+- pthread_cond_t cq_cond;
+- int cq_busy;
+ fastlock_t slock;
+ fastlock_t rlock;
- pthread_mutex_t cq_lock;
- pthread_cond_t cq_cond;
- int cq_busy;
-@@ -225,8 +225,8 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
++ fastlock_t cq_lock;
++ fastlock_t cq_wait_lock;
+
+ int opts;
+ long fd_flags;
+@@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
rs->index = -1;
rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
- pthread_mutex_init(&rs->slock, NULL);
- pthread_mutex_init(&rs->rlock, NULL);
+- pthread_mutex_init(&rs->cq_lock, NULL);
+- pthread_cond_init(&rs->cq_cond, NULL);
+ fastlock_init(&rs->slock);
+ fastlock_init(&rs->rlock);
- pthread_mutex_init(&rs->cq_lock, NULL);
- pthread_cond_init(&rs->cq_cond, NULL);
++ fastlock_init(&rs->cq_lock);
++ fastlock_init(&rs->cq_wait_lock);
return rs;
-@@ -377,8 +377,8 @@ static void rs_free(struct rsocket *rs)
+ }
+
+@@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs)
+ rdma_destroy_id(rs->cm_id);
+ }
- pthread_cond_destroy(&rs->cq_cond);
- pthread_mutex_destroy(&rs->cq_lock);
+- pthread_cond_destroy(&rs->cq_cond);
+- pthread_mutex_destroy(&rs->cq_lock);
- pthread_mutex_destroy(&rs->rlock);
- pthread_mutex_destroy(&rs->slock);
++ fastlock_destroy(&rs->cq_wait_lock);
++ fastlock_destroy(&rs->cq_lock);
+ fastlock_destroy(&rs->rlock);
+ fastlock_destroy(&rs->slock);
free(rs);
}
-@@ -1002,7 +1002,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
+ return ret;
+ }
+
+-static void rs_signal_cq_waiters(struct rsocket *rs)
+-{
+- pthread_mutex_lock(&rs->cq_lock);
+- pthread_cond_signal(&rs->cq_cond);
+- pthread_mutex_unlock(&rs->cq_lock);
+-}
+-
+ /*
+ * Although we serialize rsend and rrecv calls with respect to themselves,
+ * both calls may run simultaneously and need to poll the CQ for completions.
+@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
+ * which could be stalled until the remote process calls rrecv. This should
+ * not block rrecv from receiving data from the remote side however.
+ *
+- * Perform a quick test before trying to acquire any locks. Also note that
+- * busy may be set to 1 by another thread once it's been reset to 0.
++ * We handle this by using two locks. The cq_lock protects against polling
++ * the CQ and processing completions. The cq_wait_lock serializes access to
++ * waiting on the CQ.
+ */
+ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
+ {
+ int ret;
+
+- pthread_mutex_lock(&rs->cq_lock);
+- if (rs->cq_busy && nonblock) {
+- pthread_mutex_unlock(&rs->cq_lock);
+- return ERR(EWOULDBLOCK);
+- }
+-
+- while (rs->cq_busy && !test(rs)) {
+- pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
+-
+- if (test(rs)) {
+- pthread_mutex_unlock(&rs->cq_lock);
+- return 0;
+- }
+- }
+-
+- rs->cq_busy = 1;
+- pthread_mutex_unlock(&rs->cq_lock);
+-
++ fastlock_acquire(&rs->cq_lock);
+ do {
+ rs_update_credits(rs);
+ ret = rs_poll_cq(rs);
+@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs
+ rs->cq_armed = 1;
+ } else {
+ rs_update_credits(rs);
+- rs_signal_cq_waiters(rs);
++ fastlock_acquire(&rs->cq_wait_lock);
++ fastlock_release(&rs->cq_lock);
++
+ ret = rs_get_cq_event(rs);
++ fastlock_release(&cq_wait_lock);
++ fastlock_acquire(&rs->cq_lock);
+ }
+ } while (!ret);
+
+ rs_update_credits(rs);
+- rs->cq_busy = 0;
+- rs_signal_cq_waiters(rs);
++ fastlock_release(&rs->cq_lock);
+ return ret;
+ }
+
+@@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
return ret;
}
}
if (!rs_have_rdata(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
if (ret && errno != ECONNRESET)
-@@ -1040,7 +1040,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
+@@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
}
rs->rbuf_bytes_avail += len - left;
out:
return ret ? ret : len - left;
}
-@@ -1105,7 +1105,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
}
}
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
-@@ -1157,7 +1157,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
+@@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
if (ret)
break;
}
return (ret && left == len) ? ret : len - left;
}
-@@ -1214,7 +1214,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
for (i = 1; i < iovcnt; i++)
len += iov[i].iov_len;
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
-@@ -1262,7 +1262,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
+@@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
if (ret)
break;
}