From 1b32ba003ddd6d87afa33599a75331eaacd0c37f Mon Sep 17 00:00:00 2001 From: Sean Hefty Date: Thu, 4 Mar 2010 14:00:34 -0800 Subject: [PATCH] dapl: fix ring buffer synchronization The dapl ring buffer implementation is not thread safe. Replace the use of atomic variables with actual locking to ensure that there are not races inserting and/or removing items at the same time. Without proper synchronization, the EVD can report invalid events or the same event multiple times. Signed-off-by: Sean Hefty --- .../dapl2/dapl/common/dapl_ring_buffer_util.c | 187 ++++++++---------- .../dapl2/dapl/common/dapl_ring_buffer_util.h | 7 +- trunk/ulp/dapl2/dapl/include/dapl.h | 12 +- 3 files changed, 92 insertions(+), 114 deletions(-) diff --git a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c index 54517a93..d1ee2693 100644 --- a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c +++ b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.c @@ -41,8 +41,7 @@ * dapls_rbuf_alloc * * Given a DAPL_RING_BUFFER, initialize it and provide memory for - * the ringbuf itself. A passed in size will be adjusted to the next - * largest power of two number to simplify management. + * the ringbuf itself. * * Input: * rbuf pointer to DAPL_RING_BUFFER @@ -58,38 +57,27 @@ */ DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size) { - unsigned int rsize; /* real size */ - /* The circular buffer must be allocated one too large. * This eliminates any need for a distinct counter, as * having the two pointers equal always means "empty" -- never "full" */ size++; - /* Put size on a power of 2 boundary */ - rsize = 1; - while ((DAT_COUNT) rsize < size) { - rsize <<= 1; - } - - rbuf->base = (void *)dapl_os_alloc(rsize * sizeof(void *)); - if (rbuf->base != NULL) { - rbuf->lim = rsize - 1; - dapl_os_atomic_set(&rbuf->head, 0); - dapl_os_atomic_set(&rbuf->tail, 0); - } else { + rbuf->base = (void *)dapl_os_alloc(size * sizeof(void *)); + if (rbuf->base == NULL) return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY; - } + dapl_os_lock_init(&rbuf->lock); + rbuf->size = size; + rbuf->head = 0; + rbuf->tail = 0; return DAT_SUCCESS; } /* * dapls_rbuf_realloc * - * Resizes a DAPL_RING_BUFFER. This function is not thread safe; - * adding or removing elements from a ring buffer while resizing - * will have indeterminate results. + * Resizes a DAPL_RING_BUFFER. * * Input: * rbuf pointer to DAPL_RING_BUFFER @@ -106,41 +94,35 @@ DAT_RETURN dapls_rbuf_alloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size) */ DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size) { - DAPL_RING_BUFFER new_rbuf; - void *entry; - DAT_RETURN dat_status; - - dat_status = DAT_SUCCESS; + void **base; /* decreasing the size or retaining the old size is not allowed */ - if (size <= rbuf->lim + 1) { - dat_status = DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2); - goto bail; - } + if (size <= rbuf->size + 1) + return DAT_ERROR(DAT_INVALID_PARAMETER, DAT_INVALID_ARG2); - /* - * !This is NOT ATOMIC! - * Simple algorithm: Allocate a new ring buffer, take everything - * out of the old one and put it in the new one, and release the - * old base buffer. - */ - dat_status = dapls_rbuf_alloc(&new_rbuf, size); - if (dat_status != DAT_SUCCESS) { - goto bail; - } + base = (void *) dapl_os_alloc(size * sizeof(void *)); + if (base == NULL) + return DAT_INSUFFICIENT_RESOURCES | DAT_RESOURCE_MEMORY; - while ((entry = dapls_rbuf_remove(rbuf)) != NULL) { - /* We know entries will fit so ignore the return code */ - (void)dapls_rbuf_add(&new_rbuf, entry); + dapl_os_lock(&rbuf->lock); + if (rbuf->head > rbuf->tail) { + memcpy(&base[rbuf->tail], &rbuf->base[rbuf->tail], + (rbuf->head - rbuf->tail) * sizeof(void *)); + } else if (rbuf->head < rbuf->tail) { + memcpy(&base[0], &rbuf->base[rbuf->tail], + (rbuf->size - rbuf->tail) * sizeof(void *)); + memcpy(&base[rbuf->size - rbuf->tail], &rbuf->base[0], + rbuf->head * sizeof(void *)); + rbuf->head = rbuf->size - rbuf->tail + rbuf->head; + rbuf->tail = 0; } - /* release the old base buffer */ - dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *)); + dapl_os_free(rbuf->base, rbuf->size * sizeof(void *)); + rbuf->base = base; + rbuf->size = size; + dapl_os_unlock(&rbuf->lock); - *rbuf = new_rbuf; - - bail: - return dat_status; + return DAT_SUCCESS; } /* @@ -160,15 +142,21 @@ DAT_RETURN dapls_rbuf_realloc(INOUT DAPL_RING_BUFFER * rbuf, IN DAT_COUNT size) */ void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf) { - if ((NULL == rbuf) || (NULL == rbuf->base)) { - return; - } + dapl_os_lock_destroy(&rbuf->lock); + dapl_os_free(rbuf->base, rbuf->size * sizeof(void *)); +} - dapl_os_free(rbuf->base, (rbuf->lim + 1) * sizeof(void *)); - rbuf->base = NULL; - rbuf->lim = 0; +static DAT_COUNT dapli_rbuf_count(IN DAPL_RING_BUFFER * rbuf) +{ + if (rbuf->head >= rbuf->tail) + return rbuf->head - rbuf->tail; + else + return rbuf->size - rbuf->tail + rbuf->head; +} - return; +static int dapli_rbuf_empty(IN DAPL_RING_BUFFER *rbuf) +{ + return rbuf->head == rbuf->tail; } /* @@ -190,22 +178,20 @@ void dapls_rbuf_destroy(IN DAPL_RING_BUFFER * rbuf) */ DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry) { - int pos; - int val; - - while (((dapl_os_atomic_read(&rbuf->head) + 1) & rbuf->lim) != - (dapl_os_atomic_read(&rbuf->tail) & rbuf->lim)) { - pos = dapl_os_atomic_read(&rbuf->head); - val = dapl_os_atomic_assign(&rbuf->head, pos, pos + 1); - if (val == pos) { - pos = (pos + 1) & rbuf->lim; /* verify in range */ - rbuf->base[pos] = entry; - return DAT_SUCCESS; - } + DAT_RETURN ret; + + dapl_os_lock(&rbuf->lock); + if (dapli_rbuf_count(rbuf) < rbuf->size - 1) { + rbuf->base[rbuf->head++] = entry; + if (rbuf->head == rbuf->size) + rbuf->head = 0; + ret = DAT_SUCCESS; + } else { + ret = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); } + dapl_os_unlock(&rbuf->lock); - return DAT_ERROR(DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY); - + return ret; } /* @@ -226,21 +212,19 @@ DAT_RETURN dapls_rbuf_add(IN DAPL_RING_BUFFER * rbuf, IN void *entry) */ void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf) { - int pos; - int val; - - while (dapl_os_atomic_read(&rbuf->head) != - dapl_os_atomic_read(&rbuf->tail)) { - pos = dapl_os_atomic_read(&rbuf->tail); - val = dapl_os_atomic_assign(&rbuf->tail, pos, pos + 1); - if (val == pos) { - pos = (pos + 1) & rbuf->lim; /* verify in range */ + void *entry; - return (rbuf->base[pos]); - } + dapl_os_lock(&rbuf->lock); + if (!dapli_rbuf_empty(rbuf)) { + entry = rbuf->base[rbuf->tail++]; + if (rbuf->tail == rbuf->size) + rbuf->tail = 0; + } else { + entry = NULL; } + dapl_os_unlock(&rbuf->lock); - return NULL; + return entry; } @@ -263,18 +247,10 @@ void *dapls_rbuf_remove(IN DAPL_RING_BUFFER * rbuf) DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf) { DAT_COUNT count; - int head; - int tail; - - head = dapl_os_atomic_read(&rbuf->head) & rbuf->lim; - tail = dapl_os_atomic_read(&rbuf->tail) & rbuf->lim; - if (head > tail) { - count = head - tail; - } else { - /* add 1 to lim as it is a mask, number of entries - 1 */ - count = (rbuf->lim + 1 - tail + head) & rbuf->lim; - } + dapl_os_lock(&rbuf->lock); + count = dapli_rbuf_count(rbuf); + dapl_os_unlock(&rbuf->lock); return count; } @@ -299,19 +275,20 @@ DAT_COUNT dapls_rbuf_count(IN DAPL_RING_BUFFER * rbuf) */ void dapls_rbuf_adjust(IN DAPL_RING_BUFFER * rbuf, IN intptr_t offset) { - int pos; + int i; - pos = dapl_os_atomic_read(&rbuf->head); - while (pos != dapl_os_atomic_read(&rbuf->tail)) { - rbuf->base[pos] = (void *)((char *)rbuf->base[pos] + offset); - pos = (pos + 1) & rbuf->lim; /* verify in range */ - } + dapl_os_lock(&rbuf->lock); + for (i = 0; i < rbuf->size; i++) + rbuf->base[i] = (void *) ((char *)rbuf->base[i] + offset); + dapl_os_unlock(&rbuf->lock); } -/* - * Local variables: - * c-indent-level: 4 - * c-basic-offset: 4 - * tab-width: 8 - * End: - */ +int dapls_rbuf_empty(IN DAPL_RING_BUFFER * rbuf) +{ + int empty; + + dapl_os_lock(&rbuf->lock); + empty = dapli_rbuf_empty(rbuf); + dapl_os_unlock(&rbuf->lock); + return empty; +} diff --git a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h index 46c82c95..1eb782d6 100644 --- a/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h +++ b/trunk/ulp/dapl2/dapl/common/dapl_ring_buffer_util.h @@ -68,11 +68,8 @@ void dapls_rbuf_adjust ( IN DAPL_RING_BUFFER *rbuf, IN intptr_t offset); - -/* - * Simple functions - */ -#define dapls_rbuf_empty(rbuf) (rbuf->head == rbuf->tail) +int dapls_rbuf_empty( + IN DAPL_RING_BUFFER *rbuf); #endif /* _DAPL_RING_BUFFER_H_ */ diff --git a/trunk/ulp/dapl2/dapl/include/dapl.h b/trunk/ulp/dapl2/dapl/include/dapl.h index a36b1107..64f62810 100644 --- a/trunk/ulp/dapl2/dapl/include/dapl.h +++ b/trunk/ulp/dapl2/dapl/include/dapl.h @@ -237,9 +237,10 @@ struct dapl_llist_entry struct dapl_ring_buffer { void **base; /* base of element array */ - DAT_COUNT lim; /* mask, number of entries - 1 */ - DAPL_ATOMIC head; /* head pointer index */ - DAPL_ATOMIC tail; /* tail pointer index */ + DAT_COUNT size; + DAT_COUNT head; + DAT_COUNT tail; + DAPL_OS_LOCK lock; }; struct dapl_cookie_buffer @@ -438,7 +439,10 @@ struct dapl_ep ib_qp_state_t qp_state; /* communications manager handle (IBM OS API) */ - dp_ib_cm_handle_t cm_handle; + // dp_ib_cm_handle_t cm_handle; + + /* Add support for multiple CM object ownership */ + DAPL_LLIST_HEAD cm_list_head; /* store the remote IA address here, reference from the param * struct which only has a pointer, no storage -- 2.46.0