struct rsocket {
struct rdma_cm_id *cm_id;
- pthread_mutex_t slock;
- pthread_mutex_t rlock;
- pthread_mutex_t cq_lock;
- pthread_cond_t cq_cond;
- int cq_busy;
+ fastlock_t slock;
+ fastlock_t rlock;
+ fastlock_t cq_lock;
+ fastlock_t cq_wait_lock;
int opts;
long fd_flags;
rs->index = -1;
rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
- pthread_mutex_init(&rs->slock, NULL);
- pthread_mutex_init(&rs->rlock, NULL);
- pthread_mutex_init(&rs->cq_lock, NULL);
- pthread_cond_init(&rs->cq_cond, NULL);
+ fastlock_init(&rs->slock);
+ fastlock_init(&rs->rlock);
+ fastlock_init(&rs->cq_lock);
+ fastlock_init(&rs->cq_wait_lock);
return rs;
}
rdma_destroy_id(rs->cm_id);
}
- pthread_cond_destroy(&rs->cq_cond);
- pthread_mutex_destroy(&rs->cq_lock);
- pthread_mutex_destroy(&rs->rlock);
- pthread_mutex_destroy(&rs->slock);
+ fastlock_destroy(&rs->cq_wait_lock);
+ fastlock_destroy(&rs->cq_lock);
+ fastlock_destroy(&rs->rlock);
+ fastlock_destroy(&rs->slock);
free(rs);
}
return ret;
}
-static void rs_signal_cq_waiters(struct rsocket *rs)
-{
- pthread_mutex_lock(&rs->cq_lock);
- pthread_cond_signal(&rs->cq_cond);
- pthread_mutex_unlock(&rs->cq_lock);
-}
-
/*
* Although we serialize rsend and rrecv calls with respect to themselves,
* both calls may run simultaneously and need to poll the CQ for completions.
* which could be stalled until the remote process calls rrecv. This should
* not block rrecv from receiving data from the remote side however.
*
- * Perform a quick test before trying to acquire any locks. Also note that
- * busy may be set to 1 by another thread once it's been reset to 0.
+ * We handle this by using two locks. The cq_lock protects against polling
+ * the CQ and processing completions. The cq_wait_lock serializes access to
+ * waiting on the CQ.
*/
static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs))
{
int ret;
- pthread_mutex_lock(&rs->cq_lock);
- if (rs->cq_busy && nonblock) {
- pthread_mutex_unlock(&rs->cq_lock);
- return ERR(EWOULDBLOCK);
- }
-
- while (rs->cq_busy && !test(rs)) {
- pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
-
- if (test(rs)) {
- pthread_mutex_unlock(&rs->cq_lock);
- return 0;
- }
- }
-
- rs->cq_busy = 1;
- pthread_mutex_unlock(&rs->cq_lock);
-
+ fastlock_acquire(&rs->cq_lock);
do {
rs_update_credits(rs);
ret = rs_poll_cq(rs);
rs->cq_armed = 1;
} else {
rs_update_credits(rs);
- rs_signal_cq_waiters(rs);
+ fastlock_acquire(&rs->cq_wait_lock);
+ fastlock_release(&rs->cq_lock);
+
ret = rs_get_cq_event(rs);
+ fastlock_release(&cq_wait_lock);
+ fastlock_acquire(&rs->cq_lock);
}
} while (!ret);
rs_update_credits(rs);
- rs->cq_busy = 0;
- rs_signal_cq_waiters(rs);
+ fastlock_release(&rs->cq_lock);
return ret;
}
return ret;
}
}
- pthread_mutex_lock(&rs->rlock);
+ fastlock_acquire(&rs->rlock);
if (!rs_have_rdata(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
if (ret && errno != ECONNRESET)
}
rs->rbuf_bytes_avail += len - left;
out:
- pthread_mutex_unlock(&rs->rlock);
+ fastlock_release(&rs->rlock);
return ret ? ret : len - left;
}
}
}
- pthread_mutex_lock(&rs->slock);
+ fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
if (ret)
break;
}
- pthread_mutex_unlock(&rs->slock);
+ fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
}
for (i = 1; i < iovcnt; i++)
len += iov[i].iov_len;
- pthread_mutex_lock(&rs->slock);
+ fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
if (ret)
break;
}
- pthread_mutex_unlock(&rs->slock);
+ fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
}