]> git.openfabrics.org - ~shefty/rdma-win.git/commitdiff
Refresh of dapl-rbuf
authorSean Hefty <sean.hefty@intel.com>
Thu, 4 Mar 2010 22:01:21 +0000 (14:01 -0800)
committerSean Hefty <sean.hefty@intel.com>
Thu, 4 Mar 2010 22:01:21 +0000 (14:01 -0800)
trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c
trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h
trunk/ulp/dapl2/dapl/include/dapl.h

index 54517a936c874a935ba7b1da33fc0a79cbe3d9eb..d1ee2693e58b2c6f7f506e6a53c48985d5767117 100644 (file)
@@ -41,8 +41,7 @@
  * dapls_rbuf_alloc
  *
  * Given a DAPL_RING_BUFFER, initialize it and provide memory for
- * the ringbuf itself. A passed in size will be adjusted to the next
- * largest power of two number to simplify management.
+ * the ringbuf itself.
  *
  * Input:
  *     rbuf            pointer to DAPL_RING_BUFFER
  */
 DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-       unsigned int rsize;     /* real size */
-
        /* The circular buffer must be allocated one too large.
         * This eliminates any need for a distinct counter, as
         * having the two pointers equal always means "empty" -- never "full"
         */
        size++;
 
-       /* Put size on a power of 2 boundary */
-       rsize = 1;
-       while ((DAT_COUNT) rsize < size) {
-               rsize <<= 1;
-       }
-
-       rbuf->base = (void *)dapl_os_alloc(rsize * sizeof(void *));
-       if (rbuf->base != NULL) {
-               rbuf->lim = rsize - 1;
-               dapl_os_atomic_set(&rbuf->head, 0);
-               dapl_os_atomic_set(&rbuf->tail, 0);
-       } else {
+       rbuf->base = (void *)dapl_os_alloc(size * sizeof(void *));
+       if (rbuf->base == NULL)
                return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
-       }
 
+       dapl_os_lock_init(&rbuf->lock);
+       rbuf->size = size;
+       rbuf->head = 0;
+       rbuf->tail = 0;
        return DAT_SUCCESS;
 }
 
 /*
  * dapls_rbuf_realloc
  *
- * Resizes a DAPL_RING_BUFFER. This function is not thread safe;
- * adding or removing elements from a ring buffer while resizing 
- * will have indeterminate results.
+ * Resizes a DAPL_RING_BUFFER.
  *
  * Input:
  *     rbuf            pointer to DAPL_RING_BUFFER
@@ -106,41 +94,35 @@ DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
 {
-       DAPL_RING_BUFFER new_rbuf;
-       void *entry;
-       DAT_RETURN dat_status;
-
-       dat_status = DAT_SUCCESS;
+       void **base;
 
        /* decreasing the size or retaining the old size is not allowed */
-       if (size <= rbuf->lim + 1) {
-               dat_status = DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
-               goto bail;
-       }
+       if (size <= rbuf->size + 1)
+               return DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
 
-       /*
-        * !This is NOT ATOMIC!
-        * Simple algorithm: Allocate a new ring buffer, take everything
-        * out of the old one and put it in the new one, and release the 
-        * old base buffer.
-        */
-       dat_status = dapls_rbuf_alloc(&new_rbuf, size);
-       if (dat_status != DAT_SUCCESS) {
-               goto bail;
-       }
+       base = (void *) dapl_os_alloc(size * sizeof(void *));
+       if (base == NULL)
+               return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY;
 
-       while ((entry = dapls_rbuf_remove(rbuf)) != NULL) {
-               /* We know entries will fit so ignore the return code */
-               (void)dapls_rbuf_add(&new_rbuf, entry);
+       dapl_os_lock(&rbuf->lock);
+       if (rbuf->head > rbuf->tail) {
+               memcpy(&base[rbuf->tail], &rbuf->base[rbuf->tail],
+                       (rbuf->head - rbuf->tail) * sizeof(void *));
+       } else if (rbuf->head < rbuf->tail) {
+               memcpy(&base[0], &rbuf->base[rbuf->tail],
+                       (rbuf->size - rbuf->tail) * sizeof(void *));
+               memcpy(&base[rbuf->size - rbuf->tail], &rbuf->base[0],
+                       rbuf->head * sizeof(void *));
+               rbuf->head = rbuf->size - rbuf->tail + rbuf->head;
+               rbuf->tail = 0;
        }
 
-       /* release the old base buffer */
-       dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
+       dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
+       rbuf->base = base;
+       rbuf->size = size;
+       dapl_os_unlock(&rbuf->lock);
 
-       *rbuf = new_rbuf;
-
-      bail:
-       return dat_status;
+       return DAT_SUCCESS;
 }
 
 /*
@@ -160,15 +142,21 @@ DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size)
  */
 void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
 {
-       if ((NULL == rbuf) || (NULL == rbuf->base)) {
-               return;
-       }
+       dapl_os_lock_destroy(&rbuf->lock);
+       dapl_os_free(rbuf->base, rbuf->size * sizeof(void *));
+}
 
-       dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *));
-       rbuf->base = NULL;
-       rbuf->lim = 0;
+static DAT_COUNT dapli_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
+{
+       if (rbuf->head >= rbuf->tail)
+               return rbuf->head - rbuf->tail;
+       else
+               return rbuf->size - rbuf->tail + rbuf->head;
+}
 
-       return;
+static int dapli_rbuf_empty(IN DAPL_RING_BUFFER *rbuf)
+{
+       return rbuf->head == rbuf->tail;
 }
 
 /*
@@ -190,22 +178,20 @@ void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf)
  */
 DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
 {
-       int pos;
-       int val;
-
-       while (((dapl_os_atomic_read(&rbuf->head) + 1) & rbuf->lim) !=
-              (dapl_os_atomic_read(&rbuf->tail) & rbuf->lim)) {
-               pos = dapl_os_atomic_read(&rbuf->head);
-               val = dapl_os_atomic_assign(&rbuf->head, pos, pos + 1);
-               if (val == pos) {
-                       pos = (pos + 1) & rbuf->lim;    /* verify in range */
-                       rbuf->base[pos] = entry;
-                       return DAT_SUCCESS;
-               }
+       DAT_RETURN ret;
+
+       dapl_os_lock(&rbuf->lock);
+       if (dapli_rbuf_count(rbuf) < rbuf->size - 1) {
+               rbuf->base[rbuf->head++] = entry;
+               if (rbuf->head == rbuf->size)
+                       rbuf->head = 0;
+               ret = DAT_SUCCESS;
+       } else {
+               ret = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
        }
+       dapl_os_unlock(&rbuf->lock);
 
-       return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
-
+       return ret;
 }
 
 /*
@@ -226,21 +212,19 @@ DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry)
  */
 void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
 {
-       int pos;
-       int val;
-
-       while (dapl_os_atomic_read(&rbuf->head) !=
-              dapl_os_atomic_read(&rbuf->tail)) {
-               pos = dapl_os_atomic_read(&rbuf->tail);
-               val = dapl_os_atomic_assign(&rbuf->tail, pos, pos + 1);
-               if (val == pos) {
-                       pos = (pos + 1) & rbuf->lim;    /* verify in range */
+       void *entry;
 
-                       return (rbuf->base[pos]);
-               }
+       dapl_os_lock(&rbuf->lock);
+       if (!dapli_rbuf_empty(rbuf)) {
+               entry = rbuf->base[rbuf->tail++];
+               if (rbuf->tail == rbuf->size)
+                       rbuf->tail = 0;
+       } else {
+               entry = NULL;
        }
+       dapl_os_unlock(&rbuf->lock);
 
-       return NULL;
+       return entry;
 
 }
 
@@ -263,18 +247,10 @@ void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf)
 DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
 {
        DAT_COUNT count;
-       int head;
-       int tail;
-
-       head = dapl_os_atomic_read(&rbuf->head) & rbuf->lim;
-       tail = dapl_os_atomic_read(&rbuf->tail) & rbuf->lim;
-       if (head > tail) {
-               count = head - tail;
-       } else {
-               /* add 1 to lim as it is a mask, number of entries - 1 */
-               count = (rbuf->lim + 1 - tail + head) & rbuf->lim;
-       }
 
+       dapl_os_lock(&rbuf->lock);
+       count = dapli_rbuf_count(rbuf);
+       dapl_os_unlock(&rbuf->lock);
        return count;
 }
 
@@ -299,19 +275,20 @@ DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf)
  */
 void dapls_rbuf_adjust(IN DAPL_RING_BUFFER * rbuf, IN intptr_t offset)
 {
-       int pos;
+       int i;
 
-       pos = dapl_os_atomic_read(&rbuf->head);
-       while (pos != dapl_os_atomic_read(&rbuf->tail)) {
-               rbuf->base[pos] = (void *)((char *)rbuf->base[pos] + offset);
-               pos = (pos + 1) & rbuf->lim;    /* verify in range */
-       }
+       dapl_os_lock(&rbuf->lock);
+       for (i = 0; i < rbuf->size; i++)
+               rbuf->base[i] = (void *) ((char *)rbuf->base[i] + offset);
+       dapl_os_unlock(&rbuf->lock);
 }
 
-/*
- * Local variables:
- *  c-indent-level: 4
- *  c-basic-offset: 4
- *  tab-width: 8
- * End:
- */
+int dapls_rbuf_empty(IN DAPL_RING_BUFFER * rbuf)
+{
+       int empty;
+
+       dapl_os_lock(&rbuf->lock);
+       empty = dapli_rbuf_empty(rbuf);
+       dapl_os_unlock(&rbuf->lock);
+       return empty;
+}
index 46c82c95e717119854b6fcc33cacb8a55a0dac4b..1eb782d63e746a43266aa64cefc1c3543e83e2db 100644 (file)
@@ -68,11 +68,8 @@ void dapls_rbuf_adjust (
        IN  DAPL_RING_BUFFER            *rbuf,
        IN  intptr_t                    offset);
 
-
-/*
- * Simple functions
- */
-#define dapls_rbuf_empty(rbuf) (rbuf->head == rbuf->tail)
+int dapls_rbuf_empty(
+       IN   DAPL_RING_BUFFER           *rbuf);
 
 
 #endif /* _DAPL_RING_BUFFER_H_ */
index a36b110733d561d97fb0d5255ed3e598e10b46bf..64f628103cff8fff0feda733cce2022316a62f75 100644 (file)
@@ -237,9 +237,10 @@ struct dapl_llist_entry
 struct dapl_ring_buffer
 {
     void               **base;         /* base of element array */
-    DAT_COUNT          lim;            /* mask, number of entries - 1 */
-    DAPL_ATOMIC                head;           /* head pointer index */
-    DAPL_ATOMIC                tail;           /* tail pointer index */
+    DAT_COUNT          size;
+    DAT_COUNT          head;
+    DAT_COUNT          tail;
+    DAPL_OS_LOCK       lock;
 };
 
 struct dapl_cookie_buffer
@@ -438,7 +439,10 @@ struct dapl_ep
     ib_qp_state_t              qp_state;
 
     /* communications manager handle (IBM OS API) */
-    dp_ib_cm_handle_t          cm_handle;
+   // dp_ib_cm_handle_t                cm_handle;
+
+    /* Add support for multiple CM object ownership */
+    DAPL_LLIST_HEAD            cm_list_head;   
 
     /* store the remote IA address here, reference from the param
      * struct which only has a pointer, no storage