IN DAPL_EVD *evd_ptr,
IN DAT_COUNT qlen)
{
- DAT_EVENT *event_ptr;
- DAT_COUNT i;
+ DAT_EVENT *events;
+ DAT_COUNT old_qlen;
+ DAT_COUNT i;
+ intptr_t diff;
DAT_RETURN dat_status;
/* Allocate EVENTs */
- event_ptr = (DAT_EVENT *) dapl_os_realloc (evd_ptr->events,
+ events = (DAT_EVENT *) dapl_os_realloc (evd_ptr->events,
qlen * sizeof (DAT_EVENT));
- if (event_ptr == NULL)
+ if ( NULL == events )
{
dat_status = DAT_ERROR (DAT_INSUFFICIENT_RESOURCES, DAT_RESOURCE_MEMORY);
goto bail;
}
- evd_ptr->events = event_ptr;
- /* allocate free event queue */
+ diff = events - evd_ptr->events;
+ evd_ptr->events = events;
+
+ old_qlen = evd_ptr->qlen;
+ evd_ptr->qlen = qlen;
+
+ /* reallocate free event queue */
dat_status = dapls_rbuf_realloc (&evd_ptr->free_event_queue, qlen);
if (dat_status != DAT_SUCCESS)
{
goto bail;
}
+ dapls_rbuf_adjust (&evd_ptr->free_event_queue, diff);
- /* allocate pending event queue */
+ /* reallocate pending event queue */
dat_status = dapls_rbuf_realloc (&evd_ptr->pending_event_queue, qlen);
if (dat_status != DAT_SUCCESS)
{
goto bail;
}
-
- evd_ptr->qlen = qlen;
+ dapls_rbuf_adjust (&evd_ptr->pending_event_queue, diff);
/*
- * add events to free event queue. Need to verify that an entry is
- * not on the current queues before putting it on the free queue
+ * add new events to free event queue.
*/
- for (i = 0; i < qlen; i++, event_ptr++)
+ for (i = old_qlen; i < qlen; i++)
{
- if (dapls_rbuf_contains (&evd_ptr->free_event_queue, event_ptr)
- || dapls_rbuf_contains (&evd_ptr->pending_event_queue, event_ptr))
- {
- continue;
- }
- dapls_rbuf_add (&evd_ptr->free_event_queue, (void *)event_ptr);
+ dapls_rbuf_add (&evd_ptr->free_event_queue, (void *) &events[i]);
}
bail:
IN ib_work_completion_t *cqe_ptr)
{
#ifdef DAPL_DBG
+ static char *optable[] =
+ {
+ "OP_SEND",
+ "OP_RDMA_READ",
+ "OP_RDMA_WRITE",
+ "OP_COMP_AND_SWAP",
+ "OP_FETCH_AND_ADD",
+ "OP_RECEIVE",
+ "OP_BIND_MW",
+ 0
+ };
+
dapl_dbg_log (DAPL_DBG_TYPE_CALLBACK,
"\t >>>>>>>>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<<<<\n");
dapl_dbg_log (DAPL_DBG_TYPE_CALLBACK,
"\t dapl_evd_dto_callback : CQE \n");
dapl_dbg_log (DAPL_DBG_TYPE_CALLBACK,
- "\t\t work_req_id %llx\n",
+ "\t\t work_req_id %lli\n",
DAPL_GET_CQE_WRID (cqe_ptr));
if (DAPL_GET_CQE_STATUS (cqe_ptr) == 0)
{
dapl_dbg_log (DAPL_DBG_TYPE_CALLBACK,
- "\t\t op_type: %x\n",
- DAPL_GET_CQE_OPTYPE (cqe_ptr));
+ "\t\t op_type: %s\n",
+ optable[DAPL_GET_CQE_OPTYPE (cqe_ptr)]);
dapl_dbg_log (DAPL_DBG_TYPE_CALLBACK,
"\t\t bytes_num %d\n",
DAPL_GET_CQE_BYTESNUM (cqe_ptr));
DAPL_GET_CQE_OPTYPE (cqe_ptr), ep_ptr);
event_ptr->event_data.rmr_completion_event_data.status =
DAT_RMR_OPERATION_FAILED;
+ dapl_os_atomic_dec(&cookie->val.rmr.rmr->lmr->lmr_ref_count);
}
dapls_cookie_dealloc (&ep_ptr->req_buffer, cookie);
unsigned int rsize; /* real size */
/* The circular buffer must be allocated one too large.
- * This eliminates any need for a distinct counter, as that
+ * This eliminates any need for a distinct counter, as
* having the two pointers equal always means "empty" -- never "full"
*/
size++;
/*
* dapls_rbuf_realloc
*
- * Resizes an empty DAPL_RING_BUFFER. This function is not thread safe;
+ * Resizes a DAPL_RING_BUFFER. This function is not thread safe;
* adding or removing elements from a ring buffer while resizing
* will have indeterminate results.
*
INOUT DAPL_RING_BUFFER *rbuf,
IN DAT_COUNT size )
{
- DAPL_RING_BUFFER newring;
- void *entry;
- void **base;
- int val;
- int lim;
- int elem;
- int newelem;
+ DAPL_RING_BUFFER new_rbuf;
+ void *entry;
DAT_RETURN dat_status;
-
dat_status = DAT_SUCCESS;
+ /* decreasing the size or retaining the old size is not allowed */
if (size <= rbuf->lim + 1)
{
dat_status = DAT_ERROR (DAT_INVALID_PARAMETER, DAT_INVALID_ARG2);
/*
* !This is NOT ATOMIC!
* Simple algorithm: Allocate a new ring buffer, take everything
- * out of the old one and put it in the new one, swizzle pointers
- * and release the old base buffer.
+ * out of the old one and put it in the new one, and release the
+ * old base buffer.
*/
- dat_status = dapls_rbuf_alloc (&newring, size);
+ dat_status = dapls_rbuf_alloc (&new_rbuf, size);
if (dat_status != DAT_SUCCESS)
{
goto bail;
while ( (entry = dapls_rbuf_remove(rbuf)) != NULL)
{
/* We know entries will fit so ignore the return code */
- (void)dapls_rbuf_add (&newring, entry);
+ (void)dapls_rbuf_add (&new_rbuf, entry);
}
- /* swizzle pointers & values */
- elem = dapl_os_atomic_read (&rbuf->head);
- newelem = dapl_os_atomic_read (&newring.head);
- val = dapl_os_atomic_assign (&rbuf->head, elem, newelem);
- elem = dapl_os_atomic_read (&rbuf->tail);
- newelem = dapl_os_atomic_read (&newring.tail);
- val = dapl_os_atomic_assign (&rbuf->tail, elem, newelem);
-
- lim = rbuf->lim;
- base = rbuf->base;
- rbuf->lim = newring.lim;
- rbuf->base = newring.base;
/* release the old base buffer */
- dapl_os_free (base, (lim + 1) * sizeof (void *));
+ dapl_os_free (rbuf->base, (rbuf->lim + 1) * sizeof (void *));
+
+ *rbuf = new_rbuf;
bail:
-
return dat_status;
}
/*
- * dapls_rbuf_contains
+ * dapls_rbuf_adjust
*
- * Return TRUE or FALSE if an element exists in a ring buffer
+ * Adjusts the addresses of all elements stored in the
+ * ring buffer by a constant. This is useful for situations
+ * in which the memory area for the elements being stored
+ * has been reallocated (see dapl_evd_realloc() and helper
+ * functions).
*
* Input:
* rbuf pointer to DAPL_RING_BUFFER
- * entry entry to match
- *
+ * offset offset to adjust elemnt addresss by,
+ * used for addresses of type void *
* Output:
* none
*
* Returns:
- * DAT_TRUE
- * DAT_FALSE
-
+ * none
*/
-DAT_BOOLEAN
-dapls_rbuf_contains (
+void
+dapls_rbuf_adjust (
IN DAPL_RING_BUFFER *rbuf,
- IN void *entry)
+ IN intptr_t offset)
{
int pos;
pos = dapl_os_atomic_read (&rbuf->head);
while ( pos != dapl_os_atomic_read (&rbuf->tail) )
{
- if (rbuf->base[pos] == entry)
- return DAT_TRUE;
+ rbuf->base[pos] = rbuf->base[pos] + offset;
pos = (pos + 1) & rbuf->lim; /* verify in range */
}
-
- return DAT_FALSE;
-
}
/*