]> git.openfabrics.org - ~shefty/rdma-win.git/commitdiff
dapl: fix ring buffer synchronization
authorSean Hefty <sean.hefty@intel.com>
Thu, 4 Mar 2010 22:54:46 +0000 (14:54 -0800)
committerSean Hefty <sean.hefty@intel.com>
Tue, 9 Mar 2010 01:13:44 +0000 (17:13 -0800)
The dapl ring buffer implementation is not thread safe and requires
the user to provide all synchronization.  Since the user must provide
synchronization, replace atomic definitions with simple ints to
improve performance and make the code more self-documenting that
no synchronization is provided.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c
trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h
trunk/ulp/dapl2/dapl/include/dapl.h

index 54517a936c874a935ba7b1da33fc0a79cbe3d9eb..710be9393072ba835f3bb3be3e39ce6a1e66e835 100644 (file)
@@ -31,6 +31,7 @@
  *
  * PURPOSE: Ring buffer management
  * Description: Support and management functions for ring buffers
+ * All synchronization must be provided by the user.
  *
  * $Id:$
  **********************************************************************/
@@ -41,8 +42,7 @@
  * dapls_rbuf_alloc
  *
  * Given a DAPL_RING_BUFFER, initialize it and provide memory for
- * the ringbuf itself. A passed in size will be adjusted to the next
- * largest power of two number to simplify management.
+ * the ringbuf itself.
  *
  * Input:
  *     rbuf            pointer to DAPL_RING_BUFFER
  */
 DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-       unsigned int rsize;     /* real size */
-
        /* The circular buffer must be allocated one too large.
         * This eliminates any need for a distinct counter, as
         * having the two pointers equal always means "empty" -- never "full"
         */
        size++;
 
-       /* Put size on a power of 2 boundary */
-       rsize = 1;
-       while ((DAT_COUNT) rsize < size) {
-               rsize <<= 1;
-       }
-
-       rbuf->base = (void *)dapl_os_alloc(rsize * sizeof(void *));
-       if (rbuf->base != NULL) {
-               rbuf->lim = rsize - 1;
-               dapl_os_atomic_set(&rbuf->head, 0);
-               dapl_os_atomic_set(&rbuf->tail, 0);
-       } else {
+       rbuf->base = (void *)dapl_os_alloc(size * sizeof(void *));
+       if (rbuf->base == NULL)
                return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
-       }
 
+       rbuf->size = size;
+       rbuf->head = 0;
+       rbuf->tail = 0;
        return DAT_SUCCESS;
 }
 
 /*
  * dapls_rbuf_realloc
  *
- * Resizes a DAPL_RING_BUFFER. This function is not thread safe;
- * adding or removing elements from a ring buffer while resizing 
- * will have indeterminate results.
+ * Resizes a DAPL_RING_BUFFER.
  *
  * Input:
  *     rbuf            pointer to DAPL_RING_BUFFER
@@ -106,41 +94,33 @@ DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-       DAPL_RING_BUFFER new_rbuf;
-       void *entry;
-       DAT_RETURN dat_status;
-
-       dat_status = DAT_SUCCESS;
+       void **base;
 
        /* decreasing the size or retaining the old size is not allowed */
-       if (size <= rbuf->lim + 1) {
-               dat_status = DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
-               goto bail;
-       }
+       if (size <= rbuf->size + 1)
+               return DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
 
-       /*
-        * !This is NOT ATOMIC!
-        * Simple algorithm: Allocate a new ring buffer, take everything
-        * out of the old one and put it in the new one, and release the 
-        * old base buffer.
-        */
-       dat_status = dapls_rbuf_alloc(&new_rbuf, size);
-       if (dat_status != DAT_SUCCESS) {
-               goto bail;
-       }
+       base = (void *) dapl_os_alloc(size * sizeof(void *));
+       if (base == NULL)
+               return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
 
-       while ((entry = dapls_rbuf_remove(rbuf)) != NULL) {
-               /* We know entries will fit so ignore the return code */
-               (void)dapls_rbuf_add(&new_rbuf, entry);
+       if (rbuf->head > rbuf->tail) {
+               memcpy(&base[rbuf->tail], &rbuf->base[rbuf->tail],
+                       (rbuf->head - rbuf->tail) * sizeof(void *));
+       } else if (rbuf->head < rbuf->tail) {
+               memcpy(&base[0], &rbuf->base[rbuf->tail],
+                       (rbuf->size - rbuf->tail) * sizeof(void *));
+               memcpy(&base[rbuf->size - rbuf->tail], &rbuf->base[0],
+                       rbuf->head * sizeof(void *));
+               rbuf->head = rbuf->size - rbuf->tail + rbuf->head;
+               rbuf->tail = 0;
        }
 
-       /* release the old base buffer */
-       dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
-
-       *rbuf = new_rbuf;
+       dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
+       rbuf->base = base;
+       rbuf->size = size;
 
-      bail:
-       return dat_status;
+       return DAT_SUCCESS;
 }
 
 /*
@@ -160,15 +140,7 @@ DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
 {
-       if ((NULL == rbuf) || (NULL == rbuf->base)) {
-               return;
-       }
-
-       dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
-       rbuf->base = NULL;
-       rbuf->lim = 0;
-
-       return;
+       dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
 }
 
 /*
@@ -190,22 +162,18 @@ void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
  */
 DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
 {
-       int pos;
-       int val;
+       DAT_RETURN ret;
 
-       while (((dapl_os_atomic_read(&rbuf->head) + 1) & rbuf->lim) !=
-              (dapl_os_atomic_read(&rbuf->tail) & rbuf->lim)) {
-               pos = dapl_os_atomic_read(&rbuf->head);
-               val = dapl_os_atomic_assign(&rbuf->head, pos, pos + 1);
-               if (val == pos) {
-                       pos = (pos + 1) & rbuf->lim;    /* verify in range */
-                       rbuf->base[pos] = entry;
-                       return DAT_SUCCESS;
-               }
+       if (dapls_rbuf_count(rbuf) < rbuf->size - 1) {
+               rbuf->base[rbuf->head++] = entry;
+               if (rbuf->head == rbuf->size)
+                       rbuf->head = 0;
+               ret = DAT_SUCCESS;
+       } else {
+               ret = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
        }
 
-       return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
-
+       return ret;
 }
 
 /*
@@ -226,26 +194,22 @@ DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
  */
 void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
 {
-       int pos;
-       int val;
-
-       while (dapl_os_atomic_read(&rbuf->head) !=
-              dapl_os_atomic_read(&rbuf->tail)) {
-               pos = dapl_os_atomic_read(&rbuf->tail);
-               val = dapl_os_atomic_assign(&rbuf->tail, pos, pos + 1);
-               if (val == pos) {
-                       pos = (pos + 1) & rbuf->lim;    /* verify in range */
+       void *entry;
 
-                       return (rbuf->base[pos]);
-               }
+       if (!dapls_rbuf_empty(rbuf)) {
+               entry = rbuf->base[rbuf->tail++];
+               if (rbuf->tail == rbuf->size)
+                       rbuf->tail = 0;
+       } else {
+               entry = NULL;
        }
 
-       return NULL;
+       return entry;
 
 }
 
 /*
- * dapli_rbuf_count
+ * dapls_rbuf_count
  *
  * Return the number of entries in use in the ring buffer
  *
@@ -262,20 +226,10 @@ void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
  */
 DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
 {
-       DAT_COUNT count;
-       int head;
-       int tail;
-
-       head = dapl_os_atomic_read(&rbuf->head) & rbuf->lim;
-       tail = dapl_os_atomic_read(&rbuf->tail) & rbuf->lim;
-       if (head > tail) {
-               count = head - tail;
-       } else {
-               /* add 1 to lim as it is a mask, number of entries - 1 */
-               count = (rbuf->lim + 1 - tail + head) & rbuf->lim;
-       }
-
-       return count;
+       if (rbuf->head >= rbuf->tail)
+               return rbuf->head - rbuf->tail;
+       else
+               return rbuf->size - rbuf->tail + rbuf->head;
 }
 
 /*
@@ -299,19 +253,13 @@ DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
  */
 void dapls_rbuf_adjust(IN DAPL_RING_BUFFER * rbuf, IN intptr_t offset)
 {
-       int pos;
+       int i;
 
-       pos = dapl_os_atomic_read(&rbuf->head);
-       while (pos != dapl_os_atomic_read(&rbuf->tail)) {
-               rbuf->base[pos] = (void *)((char *)rbuf->base[pos] + offset);
-               pos = (pos + 1) & rbuf->lim;    /* verify in range */
-       }
+       for (i = 0; i < rbuf->size; i++)
+               rbuf->base[i] = (void *) ((char *)rbuf->base[i] + offset);
 }
 
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- *  tab-width: 8
- * End:
- */
+int dapls_rbuf_empty(IN DAPL_RING_BUFFER * rbuf)
+{
+       return rbuf->head == rbuf->tail;
+}
index 46c82c95e717119854b6fcc33cacb8a55a0dac4b..1eb782d63e746a43266aa64cefc1c3543e83e2db 100644 (file)
@@ -68,11 +68,8 @@ void dapls_rbuf_adjust (
        IN  DAPL_RING_BUFFER            *rbuf,
        IN  intptr_t                    offset);
 
-
-/*
- * Simple functions
- */
-#define dapls_rbuf_empty(rbuf) (rbuf->head == rbuf->tail)
+int dapls_rbuf_empty(
+       IN   DAPL_RING_BUFFER           *rbuf);
 
 
 #endif /* _DAPL_RING_BUFFER_H_ */
index 4439ec58fab456a4879682d48e1049a6f3679ea3..57aaecf3605ff4150321b97a64733488082759e6 100644 (file)
@@ -237,9 +237,9 @@ struct dapl_llist_entry
 struct dapl_ring_buffer\r
 {\r
     void               **base;         /* base of element array */\r
-    DAT_COUNT          lim;            /* mask, number of entries - 1 */\r
-    DAPL_ATOMIC                head;           /* head pointer index */\r
-    DAPL_ATOMIC                tail;           /* tail pointer index */\r
+    DAT_COUNT          size;\r
+    DAT_COUNT          head;\r
+    DAT_COUNT          tail;\r
 };\r
 \r
 struct dapl_cookie_buffer\r