#include <errno.h>
#include <endian.h>
#include <byteswap.h>
+#include <semaphore.h>
#include <rdma/rdma_cma.h>
/*
 * min() - return the smaller of two values.
 * Arguments and the full expansion are parenthesized so operator
 * precedence cannot misgroup compound arguments (e.g. min(a & b, c)).
 * NOTE: each argument may be evaluated more than once; do not pass
 * expressions with side effects.
 */
#define min(a, b) ((a) < (b) ? (a) : (b))
+/*
+ * Fast synchronization for low contention locking.
+ * cnt counts the current holder plus any waiters; the semaphore
+ * (initialized to 0) is used only to park and wake contending threads,
+ * so the uncontended acquire/release path is a single atomic op and
+ * never enters the kernel.
+ */
+typedef struct {
+ sem_t sem;
+ volatile int cnt;
+} fastlock_t;
+/* Initialize an unlocked fastlock: the semaphore starts at 0 so the
+ * first contending waiter blocks until the holder posts it, and cnt
+ * starts at 0 (no holder, no waiters).
+ * NOTE(review): sem_init's return value is unchecked — it can fail
+ * (e.g. ENOSYS where unnamed semaphores are unsupported); confirm
+ * this is acceptable for the targeted platforms. */
+static inline void fastlock_init(fastlock_t *lock)
+{
+ sem_init(&lock->sem, 0, 0);
+ lock->cnt = 0;
+}
+/* Tear down a fastlock.  Must not be called while any thread holds the
+ * lock or is blocked in fastlock_acquire() on it. */
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+ sem_destroy(&lock->sem);
+}
+/* Acquire the lock.  Atomically increment cnt; if the result is > 1,
+ * another thread already holds the lock, so sleep on the semaphore
+ * until the current holder posts it in fastlock_release().  The
+ * uncontended path is just the atomic add — no system call.
+ * __sync_add_and_fetch is a full memory barrier, which also fences
+ * the start of the critical section. */
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+ if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+ sem_wait(&lock->sem);
+}
+/* Release the lock.  Atomically decrement cnt; if the result is > 0,
+ * at least one thread is waiting, so post the semaphore to hand the
+ * lock to exactly one waiter.  Pairs with the sem_wait() in
+ * fastlock_acquire(); __sync_sub_and_fetch provides the full barrier
+ * that closes the critical section. */
+static inline void fastlock_release(fastlock_t *lock)
+{
+ if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+ sem_post(&lock->sem);
+}
+
int ucma_complete(struct rdma_cm_id *id);
static inline int ERR(int err)
{
struct rsocket {
struct rdma_cm_id *cm_id;
- pthread_mutex_t slock;
- pthread_mutex_t rlock;
+ fastlock_t slock;
+ fastlock_t rlock;
pthread_mutex_t cq_lock;
pthread_cond_t cq_cond;
int cq_busy;
rs->index = -1;
rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
- pthread_mutex_init(&rs->slock, NULL);
- pthread_mutex_init(&rs->rlock, NULL);
+ fastlock_init(&rs->slock);
+ fastlock_init(&rs->rlock);
pthread_mutex_init(&rs->cq_lock, NULL);
pthread_cond_init(&rs->cq_cond, NULL);
return rs;
pthread_cond_destroy(&rs->cq_cond);
pthread_mutex_destroy(&rs->cq_lock);
- pthread_mutex_destroy(&rs->rlock);
- pthread_mutex_destroy(&rs->slock);
+ fastlock_destroy(&rs->rlock);
+ fastlock_destroy(&rs->slock);
free(rs);
}
return ret;
}
}
- pthread_mutex_lock(&rs->rlock);
+ fastlock_acquire(&rs->rlock);
if (!rs_have_rdata(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata);
if (ret && errno != ECONNRESET)
}
rs->rbuf_bytes_avail += len - left;
out:
- pthread_mutex_unlock(&rs->rlock);
+ fastlock_release(&rs->rlock);
return ret ? ret : len - left;
}
}
}
- pthread_mutex_lock(&rs->slock);
+ fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
if (ret)
break;
}
- pthread_mutex_unlock(&rs->slock);
+ fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
}
for (i = 1; i < iovcnt; i++)
len += iov[i].iov_len;
- pthread_mutex_lock(&rs->slock);
+ fastlock_acquire(&rs->slock);
for (left = len; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send);
if (ret)
break;
}
- pthread_mutex_unlock(&rs->slock);
+ fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
}