From 9cf2fc769f8084615c19b720adc6ba9ca3162a51 Mon Sep 17 00:00:00 2001 From: tzachid Date: Mon, 21 Nov 2005 12:09:40 +0000 Subject: [PATCH] Basic implementation of send now works. (Rev 79) git-svn-id: svn://openib.tc.cornell.edu/gen1@164 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86 --- trunk/ulp/sdp/kernel/Precompile.h | 4 +- trunk/ulp/sdp/kernel/SOURCES | 1 + trunk/ulp/sdp/kernel/SdpBufferPool.cpp | 321 ++++++++++++++++++ trunk/ulp/sdp/kernel/SdpBufferPool.h | 135 ++++++++ trunk/ulp/sdp/kernel/SdpGenUtils.cpp | 23 ++ trunk/ulp/sdp/kernel/SdpGenUtils.h | 6 + trunk/ulp/sdp/kernel/SdpLock.h | 245 +++++++++++++ trunk/ulp/sdp/kernel/SdpSocket.cpp | 453 +++++++++++++++++++++++-- trunk/ulp/sdp/kernel/SdpSocket.h | 23 +- trunk/ulp/sdp/kernel/SdpTrace.cpp | 3 +- trunk/ulp/sdp/kernel/SdpTrace.h | 3 + 11 files changed, 1182 insertions(+), 35 deletions(-) create mode 100644 trunk/ulp/sdp/kernel/SdpBufferPool.cpp create mode 100644 trunk/ulp/sdp/kernel/SdpBufferPool.h create mode 100644 trunk/ulp/sdp/kernel/SdpLock.h diff --git a/trunk/ulp/sdp/kernel/Precompile.h b/trunk/ulp/sdp/kernel/Precompile.h index ca6a3bc5..d2079c20 100644 --- a/trunk/ulp/sdp/kernel/Precompile.h +++ b/trunk/ulp/sdp/kernel/Precompile.h @@ -16,12 +16,14 @@ class SdpArp; #include "ib_al.h" #include "sdpMsgs.h" +#include "SdpGenUtils.h" #include "SdpTrace.h" +#include "sdpLock.h" #include "RefCount.h" #include "sdpdriver.h" #include "SdpShared.h" #include "SdpUserFile.h" -#include "SdpGenUtils.h" +#include "SdpBufferPool.h" #include "SdpSocket.h" #include "SdpArp.h" diff --git a/trunk/ulp/sdp/kernel/SOURCES b/trunk/ulp/sdp/kernel/SOURCES index 1b775bcc..79448a6b 100644 --- a/trunk/ulp/sdp/kernel/SOURCES +++ b/trunk/ulp/sdp/kernel/SOURCES @@ -7,6 +7,7 @@ SOURCES= SdpDriver.cpp \ SdpGenUtils.cpp \ SdpSocket.cpp \ SdpArp.cpp \ + SdpBufferPool.cpp \ SdpTrace.cpp INCLUDES=..\include;\ diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp new file mode 100644 index 00000000..b71a6be2 --- /dev/null +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp @@ -0,0 +1,321 @@ +/* Copyright mellanox */ +#pragma warning(disable: 4244 ) + +#include "preCompile.h" + +NTSTATUS +BufferPool::Init( + int MaxBuffers, + int MaxConcurrentSends, + int MaxMessageSize, + ib_pd_handle_t pd, + ib_qp_handle_t qp + ) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + m_MaxBuffers = MaxBuffers; + m_MaxConcurrentSends = MaxConcurrentSends; + m_MaxMessageSize = MaxMessageSize; + m_ClientBeingServed = false; + m_CurrentlySentBuffers = 0; + m_CurrentlyAllocated = 0; + m_ClientWaiting = false; + KeInitializeEvent(&m_WaitingClients, NotificationEvent, FALSE); + ASSERT(pd != NULL); + m_pd = pd; + ASSERT(qp != NULL); + m_qp = qp; + + return STATUS_SUCCESS; +} + +/* + This function is being called by a thread that wants to do a send in order + to have a buffer that he can copy the data to. + FirstBuffer tells if this is the first buffer that he wants. + If it is true, this means that no other request will be handled before + this client will indicate that he has finished queing his data. + If an event is returned this means that the caller has to wait on the + event before the request will be staisfied. + + This function is being called under a lock + +*/ +NTSTATUS +BufferPool::GetBuffer( + BufferDescriptor **ppBufferDescriptor, + KEVENT **ppEvent, + bool FirstBuffer + ) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p FirstBuffer = %s\n",this, + FirstBuffer ? "TRUE" : "FALSE")); + NTSTATUS rc = STATUS_SUCCESS; + *ppBufferDescriptor = NULL; + + if (m_ClientBeingServed == true && (FirstBuffer != false)) { + // The request can not be staisfied right now. We need to hold it + // until our request is being freed + // BUGBUG: iMPLMENT: create event and put it in the queue + ASSERT(FALSE); + } + + if (FirstBuffer == true) { + m_ClientBeingServed = true; + } + + // Can we supply a buffer right now ? + if (m_CurrentlyAllocated < m_MaxBuffers) { + // yes, supply a buffer + if (m_FreePackets.Size() > 0) { + LIST_ENTRY *item = m_FreePackets.RemoveHeadList(); + *ppBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + goto Cleanup; + } else { + // we need to alocate a new buffer + rc = AllocateBuffer(ppBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("AllocateBuffer failed rc = 0x%x\n", rc )); + ASSERT(*ppBufferDescriptor == NULL); + goto Cleanup; + } + m_CurrentlyAllocated++; + goto Cleanup; + } + } else { + // No buffers available, we have to wait + ASSERT(m_ClientWaiting == false); + KeClearEvent(&m_WaitingClients); + m_ClientWaiting = true; + *ppEvent = &m_WaitingClients; + } + +Cleanup: + return rc; +} + +NTSTATUS +BufferPool::AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p pBufferDescriptor = 0x%x\n",this, + pBufferDescriptor)); + NTSTATUS rc = STATUS_SUCCESS; + + if ((m_CurrentlySentBuffers < m_MaxConcurrentSends) && + (m_QueuedPackets.Size() == 0 )){ + // we can send right away (no need to wait for anything) + rc = SendBuffer(pBufferDescriptor); + goto Cleanup; + } else { + // we put the buffer in the queued list + m_QueuedPackets.InsertTailList(&pBufferDescriptor->BuffersList); + } + +Cleanup: + return rc; + +} + +/* + This function is being called by a client that has asked for some buffers + when he has recieved all it's data +*/ +VOID +BufferPool::AllowOthersToGet() +{ + ASSERT(m_ClientBeingServed == true); + m_ClientBeingServed = false; + + // BUGBUG: this means that we should free the next waiter (Once we support more + // than one thread). +} + + +VOID +BufferPool::ReturnBuffer(BufferDescriptor *pBufferDescriptor) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p buffer=0x%p\n",this, pBufferDescriptor)); + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); + // Is there a client waiting ? + if ( m_ClientWaiting) { + KeSetEvent( &m_WaitingClients, IO_NO_INCREMENT, FALSE ); + m_ClientWaiting = false; + } + m_CurrentlySentBuffers--; + ASSERT(m_CurrentlySentBuffers >= 0); +} + +/* + This function goes over the list of packets that we can send, and sends + them. It is called under the lock, and might be called also from a DPC + context. + +*/ +NTSTATUS +BufferPool::SendBuffersIfCan() +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + NTSTATUS rc = STATUS_SUCCESS; + + while ((m_QueuedPackets.Size() > 0) && + (m_CurrentlySentBuffers < m_MaxConcurrentSends)) { + // we can now send the next buffer + LIST_ENTRY *item = m_QueuedPackets.RemoveHeadList(); + BufferDescriptor *pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + rc = SendBuffer(pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("SendBuffer failed rc = 0x%x\n", rc )); + goto Cleanup; + } + } + +Cleanup: + return rc; + +} + +/* + This function is being called from under the lock and is the last one to be called. + It frees all resources + +*/ +VOID +BufferPool::ShutDown() +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + BufferDescriptor *pBufferDescriptor = NULL; + LIST_ENTRY *item = NULL; + + while (m_FreePackets.Size() > 0 ) { + item = m_FreePackets.RemoveHeadList(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + DeAllocateBuffer(pBufferDescriptor); + } + + while (m_QueuedPackets.Size() > 0 ) { + item = m_QueuedPackets.RemoveHeadList(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + DeAllocateBuffer(pBufferDescriptor); + } + +} + +NTSTATUS +BufferPool::AllocateBuffer(BufferDescriptor ** ppBufferDescriptor) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + NTSTATUS rc = STATUS_SUCCESS; + BufferDescriptor *pBufferDescriptor = NULL; + ib_mr_create_t mr_create; + uint32_t rkey; + + // Allocate the buffer descriptor + pBufferDescriptor = + (BufferDescriptor *) + ExAllocatePoolWithTag( + NonPagedPool , + sizeof BufferDescriptor, + SEND_BUFFERS_ALLOCATION_TAG + ); + if (pBufferDescriptor == NULL) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); + rc = STATUS_NO_MEMORY; + goto Cleanup; + } + + // Allocate the buffer itself + pBufferDescriptor->pBuffer = + ExAllocatePoolWithTag( + NonPagedPool , + m_MaxMessageSize, + SEND_BUFFERS_ALLOCATION_TAG + ); + + if (pBufferDescriptor->pBuffer == NULL) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); + rc = STATUS_NO_MEMORY; + goto Cleanup; + } + + pBufferDescriptor->BufferSize = m_MaxMessageSize; + pBufferDescriptor->DataSize = 0; + pBufferDescriptor->mr_handle = NULL; + + // Now we need to register this memory with the hardware + mr_create.vaddr = pBufferDescriptor->pBuffer; + mr_create.length = pBufferDescriptor->BufferSize; + mr_create.access_ctrl = IB_AC_LOCAL_WRITE; + + ib_api_status_t ib_status = ib_reg_mem( m_pd, &mr_create, &pBufferDescriptor->ds_array.lkey, &rkey, &pBufferDescriptor->mr_handle ); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ib_reg_mem failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + +Cleanup: + if (!NT_SUCCESS(rc)) { + if (pBufferDescriptor != NULL) { + if (pBufferDescriptor->pBuffer != NULL) { + ExFreePoolWithTag(pBufferDescriptor->pBuffer, SEND_BUFFERS_ALLOCATION_TAG); + } + ExFreePoolWithTag(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); + } + } + *ppBufferDescriptor = pBufferDescriptor; + return rc; +} + +VOID +BufferPool::DeAllocateBuffer(BufferDescriptor *pBufferDescriptor) +{ + //????? clear the memory here. + // ????? + +} + +NTSTATUS +BufferPool::SendBuffer(BufferDescriptor *pBufferDescriptor) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + NTSTATUS rc = STATUS_SUCCESS; + + msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *) pBufferDescriptor->pBuffer; + + pHeader->recv_bufs = QP_ATTRIB_RQ_DEPTH; //?????recv_bufs = conn->l_advt_bf; + pHeader->size = pBufferDescriptor->DataSize + sizeof msg_hdr_bsdh; + pHeader->seq_num = 1;//?????++conn->send_seq; + pHeader->seq_ack = 0;//????conn->advt_seq; + pHeader->mid = SDP_MID_DATA; + pHeader->flags = SDP_MSG_FLAG_NON_FLAG; + /* + * endian swap + */ + sdp_msg_swap_bsdh(pHeader); + + ib_send_wr_t send_wr; + + send_wr.p_next = NULL; + send_wr.wr_id = (uintn_t)pBufferDescriptor;//??? buff->wrid;//?????(uint64_t) (uintptr_t) wr; + send_wr.wr_type = WR_SEND; + send_wr.send_opt = IB_SEND_OPT_SIGNALED;//socket_info->send_opt; + + pBufferDescriptor->ds_array.length = pBufferDescriptor->DataSize + sizeof msg_hdr_bsdh; + pBufferDescriptor->ds_array.vaddr = (uint64_t)(void* __ptr64) pBufferDescriptor->pBuffer; + + send_wr.num_ds = 1; + send_wr.ds_array = &pBufferDescriptor->ds_array; + + ib_api_status_t ib_status = ib_post_send(m_qp, &send_wr, NULL); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ib_post_send failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + m_CurrentlySentBuffers ++; + +Cleanup: + return rc; +} + + diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.h b/trunk/ulp/sdp/kernel/SdpBufferPool.h new file mode 100644 index 00000000..a88fd35d --- /dev/null +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.h @@ -0,0 +1,135 @@ +/* Copyright mellanox */ + +#ifndef H_SDP_BUFFER_POOL_H +#define H_SDP_BUFFER_POOL_H + + +// This is simply a wrapper to the LIST_ENTRY class that allows +// easier work with this list +class LinkedList { + +public: + LinkedList() { + size = 0; + InitializeListHead(&m_Data); + } + + int Size() {return size;} + + LIST_ENTRY *RemoveHeadList() { + LIST_ENTRY *pTemp; + ASSERT(size > 0); + ASSERT(!IsListEmpty(&m_Data)); + pTemp = ::RemoveHeadList(&m_Data); + size--; + return pTemp; + } + + VOID InsertTailList (LIST_ENTRY *Item) { + ::InsertTailList(&m_Data, Item); + size++; + } + +private: + int size; + LIST_ENTRY m_Data; +}; + + +// The defenition of the function that we use to report back errors +typedef void (* SendErrorCB )(NTSTATUS Error, VOID *Context); + + +// Each buffer starts with msg_hdr_bsdh and is followed by the actual data +class BufferDescriptor { +public: + NTSTATUS WriteData(char *pData, uint32_t size) { + NTSTATUS rc = STATUS_SUCCESS; + ASSERT(size <= BufferSize - sizeof msg_hdr_bsdh); + char *pStart = (char *) pBuffer + sizeof msg_hdr_bsdh; + rc = CopyFromUser(pStart, pData, size); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("CopyFromUser failed rc = 0x%x\n", rc )); + goto Cleanup; + } + DataSize = size; + Cleanup: + return rc; + } + + // Each buffer starts with bsdh_hdr structure + VOID *pBuffer; // A pointer to the actual place that we put the data + int BufferSize; // The total size of the buffer + int DataSize; // The size of the data that we have allocated + LIST_ENTRY BuffersList; // The place to hold the list of the buffers + ib_mr_handle_t mr_handle; // A handle to the registared memory, + + ib_local_ds_t ds_array; // Used for sending the buffer + +}; + +class BufferPool { + +public: + + NTSTATUS Init( + int MaxBuffers, + int MaxConcurrentSends, + int MaxMessageSize, + ib_pd_handle_t pd, + ib_qp_handle_t qp + ); + + NTSTATUS GetBuffer( + BufferDescriptor ** ppBufferDescriptor, + KEVENT **ppEvent, + bool FirstBuffer + ); + + NTSTATUS AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor); + + VOID AllowOthersToGet(); + + VOID ReturnBuffer(BufferDescriptor *pBufferDescriptor); + + NTSTATUS SendBuffersIfCan(); + + VOID ShutDown(); + +private: + + NTSTATUS AllocateBuffer(BufferDescriptor ** ppBufferDescriptor); + + VOID DeAllocateBuffer(BufferDescriptor *pBufferDescriptor); + + NTSTATUS SendBuffer(BufferDescriptor *pBufferDescriptor); + + // Global data about this connection + int m_MaxBuffers; // The maximum number of buffers that we allow for this QP + int m_MaxConcurrentSends; // The total numbers of sends that are allowd for the QP + int m_MaxMessageSize; // The maximum buffer size that we allw + + int m_CurrentlySentBuffers; // Number of buffers that we have sent, and didn't get an ack yet + int m_CurrentlyAllocated; // The number of buffers that we have allocated + + bool m_ClientBeingServed; // true if we have already started giving buffers to a client + + LinkedList m_FreePackets; // This packets are free and might be used + LinkedList m_QueuedPackets; // This packets were filled with data and should be filled + + + // TODO: A queue of events for threads that are waiting for buffers. + + // IBAL constants from the main socket structure + // TODO: Should they stay here and be used like this ? + ib_pd_handle_t m_pd; + ib_qp_handle_t m_qp; + + // A list of events that the users has to wait on. ???? currently only one + KEVENT m_WaitingClients; // switch to a linked list + bool m_ClientWaiting; + +}; + +#endif // H_SDP_BUFFER_POOL_H + diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp index b99b18bb..bfccea60 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp @@ -67,6 +67,29 @@ NTSTATUS return rc; } +NTSTATUS +CopyFromUser( + IN void* const p_dest, + IN const void* const p_src, + IN const size_t count ) +{ + /* + * The memory copy must be done within a try/except block as the + * memory could be changing while the buffer is copied. + */ + __try + { + ProbeForRead( (void*)p_src, count, 1 ); + RtlCopyMemory( p_dest, p_src, count ); + return STATUS_SUCCESS; + } + __except(EXCEPTION_EXECUTE_HANDLER) + { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("copying memory from user failed\n")); + ASSERT(FALSE); + return STATUS_ACCESS_DENIED; + } +} void* __cdecl operator new(size_t n ) throw() { diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.h b/trunk/ulp/sdp/kernel/SdpGenUtils.h index 30deb83e..622eae08 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.h +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.h @@ -5,6 +5,7 @@ #define GLOBAL_ALLOCATION_TAG ' pdS' +#define SEND_BUFFERS_ALLOCATION_TAG 'SpdS' class CSpinLockWrapper { @@ -43,6 +44,11 @@ NTSTATUS IB2Status (ib_api_status_t ib_status); USHORT nthos(USHORT in); +NTSTATUS +CopyFromUser( + IN void* const p_dest, + IN const void* const p_src, + IN const size_t count ); NTSTATUS MyKeWaitForSingleObject( diff --git a/trunk/ulp/sdp/kernel/SdpLock.h b/trunk/ulp/sdp/kernel/SdpLock.h new file mode 100644 index 00000000..9ec2ea8c --- /dev/null +++ b/trunk/ulp/sdp/kernel/SdpLock.h @@ -0,0 +1,245 @@ +/* Copyright mellanox */ +#ifndef _SDP_LOCK_H +#define _SDP_LOCK_H + +/* +The goal of this lock is to be a user mode lock that will allow us to synchronize +both "user" operations at PASSIVE level as well as DPC's at DPC level. + +The main problem that we have is that we have many functions that we can only call at +passive level, and therefore can not be called under a spinlock. + +We might, however, receive notifications at DPC level. Example of such are send and +receive completions. As always, shutdown might appear at any time (at any level?). + +Bottom line of this is that the lock will be implemented as an event. DPC level +callers that will call us will only mark our state as send/received/shutdown arrived. + +Once one tries to take/free the lock from passive level, he will have to handle this +events first. + +Callers at DPC level, (send receive call backs) will only signal if the lock is taken +or do the actual job if it is not taken. + +There will therefore be a spinlock that will protect the event. + +*/ + + +// Still Need to make sure that all errors are handled when they should ?????? + +typedef NTSTATUS (* SendCBHandler )(SdpSocket *); + +const int SEND_CB_CALLED = 0x00000001; +const int RECV_CB_CALLED = 0x00000002; +const int SHUTDOWN_SIGNALLED = 0x00000004; +const int SHUTDOWN_HANDELED = 0x00000008; +const int ERROR_SIGNALLED = 0x00000010; + +const int DPC_FLAGS = SEND_CB_CALLED | SEND_CB_CALLED; +inline void ResetFlags(int &Flags) +{ + Flags &= (!(SEND_CB_CALLED | RECV_CB_CALLED)); +} + +inline void ResetDpcFlags(int &Flags) +{ + // Currently this function is just like the one above it. It will probably + // change in the future + Flags &= (!(DPC_FLAGS)); +} + +inline bool SomethingToHandle(int flags) +{ + if (flags & SEND_CB_CALLED) return true; + if (flags & RECV_CB_CALLED) return true; + if ((flags & SHUTDOWN_SIGNALLED) && !(flags & SHUTDOWN_HANDELED) ) return true; + + return false; +} + +class SdpLock { +public: + SdpLock() { + m_InUse = false; + m_flags = 0; + KeInitializeEvent(&m_Event, NotificationEvent , TRUE); + KeInitializeSpinLock(&m_SpinLock); + m_SendCBHandler = NULL; + } + + VOID Init(SendCBHandler SendCB, SdpSocket *pSdpSocket) + { + m_SendCBHandler = SendCB; + m_pSdpSocket = pSdpSocket; + } + + /* + Lock should handle recieve_cb/send_cb without user knowledge. + for shutdown, it should return false and not continue + + return value of false means that the lock can not be taken (eitheir + shutdown or STATUS_ALERTED, or some error has happend) + */ + bool Lock() { + KIRQL OldIrql; + int OldFlags = 0; + NTSTATUS rc = STATUS_SUCCESS; + ASSERT(KeGetCurrentIrql() == PASSIVE_LEVEL); + bool Locked = false; + do { + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + + if (m_InUse) { + // We have to release the spinlock and wait on the event + KeReleaseSpinLock(&m_SpinLock, OldIrql); + rc = MyKeWaitForSingleObject(&m_Event, UserRequest, UserMode, false, NULL); + if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { + SDP_PRINT(SDP_WARN, SDP_LOCK, ("MyKeWaitForSingleObject was alerted = 0x%x\n", rc )); + rc = STATUS_UNEXPECTED_IO_ERROR; + SignalShutdown(); + Locked = false; + goto Cleanup; + } + continue; + } + m_InUse = true; + KeClearEvent(&m_Event); + OldFlags = m_flags; + ResetFlags(m_flags); + KeReleaseSpinLock(&m_SpinLock, OldIrql); + rc = HandleFlags(OldFlags); + if (!NT_SUCCESS(rc)) { + // We have to signal the error to the calling side + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + Locked = false; + ASSERT(m_flags & ERROR_SIGNALLED); + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + m_InUse = false; + // Release whoever is waiting + KeSetEvent(&m_Event, IO_NO_INCREMENT, FALSE); + KeReleaseSpinLock(&m_SpinLock, OldIrql); + goto Cleanup; + } + // Exit the loop + Locked = true; + goto Cleanup; + } while (true); + +Cleanup: + SDP_PRINT(SDP_DEBUG, SDP_LOCK,("Lock is returing %s\n", Locked ? "true" : "false")); + return Locked; + } + + /* + Frees the lock and handle any events that might happen there. + Please note that the lock is freed no metter what the error code is. + An error means that there was some error in the sockets. + */ + NTSTATUS Unlock() + { + KIRQL OldIrql; + int OldFlags = 0; + NTSTATUS rc = STATUS_SUCCESS; + + while (true) { + ASSERT(m_InUse); + ASSERT(KeGetCurrentIrql() == PASSIVE_LEVEL); + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + OldFlags = m_flags; + ResetFlags(m_flags); + if (!SomethingToHandle(OldFlags)) { + // We can safely quit the lock + m_InUse = false; + } + KeReleaseSpinLock(&m_SpinLock, OldIrql); + if (SomethingToHandle(OldFlags)) { + rc = HandleFlags(OldFlags); + if (!NT_SUCCESS(rc)) { + // We have to signal the error to the calling side + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + ASSERT(m_flags & ERROR_SIGNALLED); + } + // At the time that we were handeling the flags, someone might have + // signaled something, so we have to try again + continue; + } + break; + } + + // Release whoever is waiting + KeSetEvent(&m_Event, IO_NO_INCREMENT, FALSE); + return rc; + } +/* + This function is being called at DPC level. It has some message of a call back. + to tell us. Once called, it will try to take the lock. If it succeeds, it will + do the actual work, if not it will only signal. Once it returns the lock is freed + again +*/ + bool SignalCB(int flags) + { + KIRQL OldIrql; + int OldFlags = 0; + NTSTATUS rc = STATUS_SUCCESS; + ASSERT(KeGetCurrentIrql() == DISPATCH_LEVEL); + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + if (m_InUse) { + m_flags |= flags; + KeReleaseSpinLock(&m_SpinLock, OldIrql); + return false; + } + m_InUse = true; + // In this lock, we only handle DPC events + OldFlags = (m_flags & DPC_FLAGS) | flags; + ResetDpcFlags(m_flags); + KeClearEvent(&m_Event); + KeReleaseSpinLock(&m_SpinLock, OldIrql); + rc = HandleFlags(OldFlags); + if (!NT_SUCCESS(rc)) { + // We have to signal the error to the calling side + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + ASSERT(m_flags & ERROR_SIGNALLED); + } + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + // Release whoever is waiting + m_InUse = false; + KeSetEvent(&m_Event, IO_NO_INCREMENT, FALSE); + KeReleaseSpinLock(&m_SpinLock, OldIrql); + return true; + } + + /* + This function is responsible for handling the flags that we might get. + Currently it can be called from passive or DPC level, and handle only "DPC" events + */ + NTSTATUS HandleFlags(int flags) { + NTSTATUS rc = STATUS_SUCCESS; + if (flags & SEND_CB_CALLED) { + // We need to handle the send CB + rc = m_SendCBHandler(m_pSdpSocket); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("SendBuffer failed rc = 0x%x\n", rc )); + m_flags |= ERROR_SIGNALLED; + // We continue from here since, there might be other things to handle, + // and this might be in a DPC context + } + } + return rc; + } + + VOID SignalShutdown() {ASSERT (FALSE);} //????????????? Make sure this is used + VOID SignalError(NTSTATUS rc) {ASSERT (FALSE);} //????????????? + + KEVENT m_Event; // the event for passive level threads + KSPIN_LOCK m_SpinLock; // The real guard of the lock + SendCBHandler m_SendCBHandler; + + + bool m_InUse; // Tells if this lock has any user + int m_flags; // call backs that were recieved + + SdpSocket *m_pSdpSocket; // The socket that this class depends on +}; + +#endif // _SDP_LOCK_H diff --git a/trunk/ulp/sdp/kernel/SdpSocket.cpp b/trunk/ulp/sdp/kernel/SdpSocket.cpp index 33cd8b22..a1853604 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.cpp +++ b/trunk/ulp/sdp/kernel/SdpSocket.cpp @@ -5,11 +5,12 @@ #pragma warning(disable: 4244 ) NTSTATUS sdp_cm_hello_ack_check(struct sdp_msg_hello_ack *hello_ack); +static NTSTATUS __send_cb2(SdpSocket * pSdpSocket); static void AL_API cm_rej_callback(IN ib_cm_rej_rec_t *p_cm_rej_rec ) { - SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("cm_rej_callback called")); + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("dispatch level = %d\n", KeGetCurrentIrql())); // BUGBUG: This should be used to return error to the connecting side } @@ -55,7 +56,7 @@ cm_apr_callback( static void AL_API cm_dreq_callback(IN ib_cm_dreq_rec_t *p_cm_dreq_rec ) { - SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("cm_dreq_callback called")); + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("dispatch level = %d\n", KeGetCurrentIrql())); ASSERT(FALSE); } SdpSocket::SdpSocket() @@ -70,8 +71,6 @@ SdpSocket::SdpSocket() m_scq = NULL; m_qp = NULL; - m_shutdown = false; - m_state = SS_IDLE; } @@ -83,13 +82,158 @@ NTSTATUS SdpSocket::Init( SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("SdpSocket::Init this = 0x%p\n", this)); m_CreationFlags = pSocketInParam->dwFlags; + + m_Lock.Init(__send_cb2, this); pSocketOutParam->Errno = 0;// No error pSocketOutParam->pSocket = this; // give the user a handle to the socket - KeInitializeSpinLock(&m_Lock); return rc; } +struct sdpc_buff { +// struct sdpc_buff *next; +// struct sdpc_buff *prev; +// u32 type; /* element type. (for generic queue) */ +// struct sdpc_buff_q *pool; /* pool currently holding this buffer. */ +// int (*release)(struct sdpc_buff *buff); /* release the object */ + /* + * primary generic data pointers + */ + void *head; /* first byte of data buffer */ + void *data; /* first byte of valid data in buffer */ + void *tail; /* last byte of valid data in buffer */ + void *end; /* last byte of data buffer */ + /* + * Experimental + */ + uint32_t flags; /* Buffer flags */ + /* + * Protocol specific data + */ + struct msg_hdr_bsdh *bsdh_hdr; /* SDP header (BSDH) */ + uint32_t data_size; /* size of just data in the buffer */ + uint64_t wrid; /* IB work request ID */ + /* + * IB specific data (The main buffer pool sets the lkey when + * it is created) + */ + uint64_t real; /* component of scather/gather list (address) */ + uint32_t size; /* component of scather/gather list (lenght) */ + uint32_t lkey; /* component of scather/gather list (key) */ +}; + +const int BUFFER_SIZE = 4000 + 16;//65536; + +#define SDP_BUFF_F_UNSIG 0x0001 /* unsignalled buffer */ + +#define SDP_BUFF_F_GET_UNSIG(buff) ((buff)->flags & SDP_BUFF_F_UNSIG) +#define SDP_BUFF_F_SET_UNSIG(buff) ((buff)->flags |= SDP_BUFF_F_UNSIG) +#define SDP_BUFF_F_CLR_UNSIG(buff) ((buff)->flags &= (~SDP_BUFF_F_UNSIG)) + + +NTSTATUS SdpSocket::WSPSend( + WspSendIn *pWspSendIn, + WspSendOut *pWspSendOut + ) +{ + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n",this)); + char temp[4000]; + memcpy(temp,"abcd",5); + + NTSTATUS rc = STATUS_SUCCESS; + BufferDescriptor * pBufferDescriptor = NULL; + bool First = true; + ULONG Coppied = 0; + bool Locked = false; + PRKEVENT pBuffersEvent = NULL; + + while (Coppied < pWspSendIn->BufferSize) { + if ((Locked == false) && !m_Lock.Lock()) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); + rc = STATUS_SHUTDOWN_IN_PROGRESS; + goto Cleanup; + } + Locked = true; + + rc = m_SendBufferPool.GetBuffer(&pBufferDescriptor, &pBuffersEvent, First); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.GetBuffer failed rc = 0x%x\n", rc )); + m_Lock.Unlock(); // Error ignored as this is already an error pass + Locked = false; + goto Cleanup; + } + First = false; + + if (pBuffersEvent != NULL) { + // We are told to wait on this event + rc = m_Lock.Unlock(); + Locked = false; + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); + goto Cleanup; + } + + rc = MyKeWaitForSingleObject( + pBuffersEvent, + UserRequest, + UserMode, + FALSE, + NULL + ); + + if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { + // BUGBUG: Think what to do here, we should be able to stop the + // connect, and quit (probably shutdown should be enough) + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("MyKeWaitForSingleObject was alerted = 0x%x\n", rc )); + rc = STATUS_UNEXPECTED_IO_ERROR; + //pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error + Shutdown(); + goto Cleanup; + } + // try getting the buffer again + continue; + } + + // copy the data from the user mode to the buffers + ULONG CopySize = pBufferDescriptor->BufferSize - sizeof msg_hdr_bsdh; + CopySize = min(CopySize, pWspSendIn->BufferSize - Coppied); + + pBufferDescriptor->WriteData(pWspSendIn->pData + Coppied, CopySize); + Coppied += CopySize; + + // return the data to the buffer + rc = m_SendBufferPool.AddBufferToQueuedList(pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.AddBufferToQueuedList failed rc = 0x%x\n", rc )); + // free the buffer that you have + m_SendBufferPool.ReturnBuffer(pBufferDescriptor); + m_Lock.Unlock(); // Error ignored as this is already an error pass + goto Cleanup; + } + } + ASSERT(Locked == true); + rc = m_Lock.Unlock(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); + goto Cleanup; + } + + m_SendBufferPool.AllowOthersToGet(); + +Cleanup: + if (NT_SUCCESS(rc) ) { + pWspSendOut->Errno = 0; + pWspSendOut->NumberOfBytesSent = pWspSendIn->BufferSize; + } else { + // Make sure that we have the error setted + ASSERT(pWspSendOut->Errno != 0); // BUGBUG: Need to make sure that this + // is indeed the case. + } + return rc; +} + +#if 0 +//Naive send implmentation. NTSTATUS SdpSocket::WSPSend( WspSendIn *pWspSendIn, WspSendOut *pWspSendOut @@ -98,13 +242,147 @@ NTSTATUS SdpSocket::WSPSend( NTSTATUS rc = STATUS_SUCCESS; SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n",this)); + ib_mr_create_t mr_create; + + /* Memory registration parameters, returned by ib_reg_mem. */ + char *BufferStart = NULL; + uint32_t lkey; + uint32_t rkey; + ib_mr_handle_t mr_handle = NULL; + + + + // First allocate a buffer and a buffer descriptor + sdpc_buff *buff = new sdpc_buff; + ASSERT(buff != NULL); + BufferStart = new CHAR [BUFFER_SIZE]; + ASSERT(BufferStart != NULL); + buff->head = BufferStart; + + // we leave enough space for holding the header of the request + buff->end = (CHAR *)(buff->head) + BUFFER_SIZE; + buff->head = (char *)(buff->head) + 0x10; + + buff->data = buff->head; + buff->tail = buff->head; + buff->lkey = 0; + buff->real = 0; + buff->size = 0; + + // Copy the data to the buffer + memcpy(buff->data, "5678",5); + buff->tail = (char *)(buff->tail) + 5; + + + // Register the buffer + mr_create.vaddr = BufferStart; + mr_create.length = BUFFER_SIZE; + mr_create.access_ctrl = IB_AC_LOCAL_WRITE; + + ib_api_status_t ib_status = ib_reg_mem( m_pd, &mr_create, &lkey, &rkey, &mr_handle ); + ASSERT(ib_status == IB_SUCCESS); + + + + + + // Send the buffer. + buff->data = (char *)(buff->head) - sizeof(struct msg_hdr_bsdh); + buff->bsdh_hdr = (struct msg_hdr_bsdh *) buff->data; + buff->bsdh_hdr->mid = SDP_MID_DATA; + buff->bsdh_hdr->flags = SDP_MSG_FLAG_NON_FLAG; + buff->bsdh_hdr->size = BUFFER_SIZE; + + + /* + * signalled? With no delay turned off, data transmission may be + * waiting for a send completion. + */ + SDP_BUFF_F_SET_UNSIG(buff); + + buff->wrid = 0;//conn->send_wrid++; + + buff->lkey = lkey; + buff->bsdh_hdr->recv_bufs = QP_ATTRIB_RQ_DEPTH; //?????recv_bufs = conn->l_advt_bf; +//?????? put this in buff->bsdh_hdr->size = (char *)buff->tail - (char *)buff->data; + buff->bsdh_hdr->seq_num = 1;//?????++conn->send_seq; + buff->bsdh_hdr->seq_ack = 0;//????conn->advt_seq; + + /* + * endian swap + */ + sdp_msg_swap_bsdh(buff->bsdh_hdr); + buff->real = (uint64_t)(void* __ptr64)BufferStart; + buff->size = BUFFER_SIZE; + + /* + * save the buffer for the event handler. + */ +#if 0 + result = sdp_buff_q_put_tail(&conn->send_post, buff); + if (result < 0) { + sdp_dbg_warn(conn, "Error <%d> queueing send buffer", result); + goto done; + } +#endif + /* + * post send + */ +/* + buff->size = buff->tail - buff->data; + buff->real = dma_map_single(conn->ca->dma_device, + buff->data, + buff->size, + PCI_DMA_TODEVICE); + send_param.next = NULL; + send_param.wr_id = buff->wrid; + send_param.sg_list = (struct ib_sge *)&buff->real; + send_param.num_sge = 1; + send_param.opcode = IB_WR_SEND; +*/ + ib_send_wr_t send_wr; + + send_wr.p_next = NULL; + send_wr.wr_id = buff->wrid;//?????(uint64_t) (uintptr_t) wr; + send_wr.wr_type = WR_SEND; + send_wr.send_opt = IB_SEND_OPT_SIGNALED;//IB_SEND_OPT_INLINE;//socket_info->send_opt; + + + ib_local_ds_t ds_array; + ds_array.length = buff->size; + ds_array.lkey = buff->lkey; + ds_array.vaddr = buff->real; + + send_wr.num_ds = 1; + send_wr.ds_array = &ds_array; + + ib_status = ib_post_send(m_qp, &send_wr, NULL); + ASSERT(ib_status == IB_SUCCESS); + + + + // Wait for the notification of send compleated ????? + rc = MyKeWaitForSingleObject( + &m_SendCompleteEvent, + UserRequest, + UserMode, + FALSE, + NULL); + KeResetEvent(&m_SendCompleteEvent); + +//Cleanup: + if (mr_handle != NULL) { + ib_dereg_mr(mr_handle); + } + delete [] BufferStart; + delete buff; pWspSendOut->Errno = 0; pWspSendOut->NumberOfBytesSent = pWspSendIn->BufferSize; - return rc; - + return rc; } +#endif NTSTATUS SdpSocket::WSPConnect( @@ -117,7 +395,6 @@ NTSTATUS SdpSocket::WSPConnect( ib_net64_t SrcPortGuid; ib_net64_t DestPortGuid; ib_path_rec_t path_rec; - CSpinLockWrapper Lock(m_Lock); SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p remote addresses ip=%d.%d.%d.%d:%d\n", this, @@ -136,13 +413,14 @@ NTSTATUS SdpSocket::WSPConnect( // check socket state // BUGBUG: Do a better work here - Lock.Lock(); + m_Lock.Lock();//??? retval if (m_state != SS_IDLE) { // We can not connect in this state SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Invalid Socket state %s\n", SS2String(m_state))); pWspConnectOut->Errno = WSAEINVAL; + m_Lock.Unlock(); //?????retval goto Cleanup; - Lock.Unlock(); + } // @@ -156,7 +434,7 @@ NTSTATUS SdpSocket::WSPConnect( if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_pSdpArp->SourceAddrFromDestAddr failed rc = 0x%x\n", rc )); pWspConnectOut->Errno = WSAENETUNREACH; - Lock.Unlock(); + m_Lock.Unlock(); // Error ignored as this is already an error pass goto Cleanup; } } @@ -167,7 +445,7 @@ NTSTATUS SdpSocket::WSPConnect( if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_pSdpArp->SourcePortGidFromIP failed rc = 0x%x\n", rc )); pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error - Lock.Unlock(); + m_Lock.Unlock(); // Error ignored as this is already an error pass goto Cleanup; } } @@ -175,7 +453,7 @@ NTSTATUS SdpSocket::WSPConnect( if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_pSdpArp->SourcePortGidFromIP failed rc = 0x%x\n", rc )); pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error - Lock.Unlock(); + m_Lock.Unlock(); // Error ignored as this is already an error pass goto Cleanup; } @@ -183,7 +461,7 @@ NTSTATUS SdpSocket::WSPConnect( if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_pSdpArp->DestPortGidFromIP failed rc = 0x%x\n", rc )); pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error - Lock.Unlock(); + m_Lock.Unlock(); // Error ignored as this is already an error pass goto Cleanup; } @@ -193,7 +471,7 @@ NTSTATUS SdpSocket::WSPConnect( // Since this is a function that might wait we do it without the lock m_state = SS_CONNECTING_QPR_SENT; - Lock.Unlock(); + m_Lock.Unlock(); //????? rc = g_pSdpDriver->m_pSdpArp->QueryPathRecord( SrcPortGuid, DestPortGuid, &path_rec ); if (!NT_SUCCESS(rc)) { @@ -217,10 +495,6 @@ NTSTATUS SdpSocket::WSPConnect( goto Cleanup; } -// Lock.Lock(); // Do we really need the lock ? - - - // We need to prepare the hello mesage for the CM sdp_msg_hello hello_msg; CreateHelloHeader(&hello_msg, pWspConnectIn->IP); @@ -230,7 +504,7 @@ NTSTATUS SdpSocket::WSPConnect( CreateCmRequest(&cm_req, &hello_msg, &path_rec, pWspConnectIn->Port); // Create the event to wait on to the connection request to end: - KeInitializeEvent(&m_ConnectCmCompleteEvent, NotificationEvent, FALSE ); + KeInitializeEvent(&m_ConnectCmCompleteEvent, NotificationEvent , FALSE ); m_state = SS_CONNECTING_REQ_SENT; @@ -273,18 +547,18 @@ NTSTATUS SdpSocket::WSPConnect( } // we should now complete the request - Lock.Lock(); + m_Lock.Lock(); //????? retval if (m_state == SS_CONNECTED) { pWspConnectOut->Errno = 0; ASSERT(rc == STATUS_SUCCESS); - Lock.Unlock(); + m_Lock.Unlock(); //????? retval goto Cleanup; } else { // There probably was some error or some kind of shutdown, we // need to return an error. rc = STATUS_UNEXPECTED_IO_ERROR; pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error - Lock.Unlock(); + m_Lock.Unlock(); // Error ignored as this is already an error pass goto Cleanup; } @@ -327,7 +601,15 @@ NTSTATUS SdpSocket::CmSendRTU() if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("sdp_cm_hello_ack_check failed rc = 0x%x\n", rc )); goto Cleanup; - } + } + + int MaxMessageSize = min(m_hello_ack.hah.l_rcv_size, MAX_SEND_BUFFER_SIZE); + + rc = m_SendBufferPool.Init(MAX_SEND_PACKETS, QP_ATTRIB_SQ_DEPTH, MaxMessageSize, m_pd, m_qp); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.Init failed rc = 0x%x\n", rc )); + goto Cleanup; + } #if 0 /* @@ -378,6 +660,21 @@ NTSTATUS SdpSocket::CmSendRTU() // How should this be locked ?? m_state = SS_CONNECTED; + // we now arm the CQs + ib_status = ib_rearm_cq(m_rcq, FALSE); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_rearm_cq failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + + ib_status = ib_rearm_cq(m_scq, FALSE); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_rearm_cq failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + Cleanup: return rc; } @@ -414,6 +711,104 @@ __recv_cb1( ASSERT(FALSE); } +// TODO: Clear the callback functions mess +void +SdpSocket::__send_cb1( + IN const ib_cq_handle_t h_cq, + IN void *cq_context ) +{ + SdpSocket *pSocket = (SdpSocket *) cq_context; + pSocket->m_Lock.SignalCB(SEND_CB_CALLED); +} + +// This function is here so it's addresses can be taken +static NTSTATUS __send_cb2(SdpSocket * pSdpSocket) +{ + return pSdpSocket->send_cb(); +} + +NTSTATUS SdpSocket::send_cb() +{ + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("called this =0x%x\n", this)); + NTSTATUS rc = STATUS_SUCCESS; + ib_api_status_t ib_status; + ib_wc_t wc[QP_ATTRIB_SQ_DEPTH], *p_wc, *p_free; + size_t i; + BufferDescriptor *pBufferDescriptor = NULL; + + for( i = 0; i < QP_ATTRIB_SQ_DEPTH; i++ ) { + wc[i].p_next = &wc[i + 1]; + } + wc[QP_ATTRIB_SQ_DEPTH - 1].p_next = NULL; + + do + { + p_free = wc; + ib_status = ib_poll_cq( m_scq, &p_free, &p_wc ); + ASSERT( ib_status == IB_SUCCESS || ib_status == IB_NOT_FOUND); + if (ib_status != IB_SUCCESS) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_poll_cq failed ib_status=%d, this =0x%x\n", ib_status,this)); + ASSERT(ib_status == IB_INVALID_CQ_HANDLE || ib_status == IB_NOT_FOUND); + rc = IB2Status(ib_status); + goto Cleanup; + } + + while( p_wc ) + { + ASSERT( p_wc->wc_type == IB_WC_SEND ); + pBufferDescriptor = (BufferDescriptor*)(uintn_t)p_wc->wr_id; + m_SendBufferPool.ReturnBuffer(pBufferDescriptor); + + switch( p_wc->status ) + { + case IB_WCS_SUCCESS: + // Nothing to do here + break; + + case IB_WCS_WR_FLUSHED_ERR: + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Flushed send completion. this =0x%x\n", this)); + // Intentainly fall down + default: + SDP_PRINT( SDP_ERR, SDP_SOCKET, ("Send failed with %s\n", + ib_get_wc_status_str( p_wc->status )) ); + m_Lock.SignalError(IB2Status(ib_status)); + } +/* Do we need this ???? + free the memory that was used for the send + if( p_send_buf ) + { + cl_perf_start( FreeSendBuf ); + ExFreeToNPagedLookasideList( &p_port->buf_mgr.send_buf_list, + p_send_buf ); + cl_perf_stop( &p_port->p_adapter->perf, FreeSendBuf ); + } +*/ + + p_wc = p_wc->p_next; + } + /* If we didn't use up every WC, break out. */ + } while( !p_free ); + + + /* Rearm the CQ. */ + ib_status = ib_rearm_cq(m_scq, FALSE ); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_rearm_cq failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + + /* Resume any sends awaiting resources. */ + rc = m_SendBufferPool.SendBuffersIfCan(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.Init SendBuffersIfCan rc = 0x%x\n", rc )); + goto Cleanup; + } + +Cleanup: + return rc; + +} // BUGBUG: This code is based on __cq_event, find out what it realy does @@ -501,7 +896,7 @@ NTSTATUS SdpSocket::CreateQp() /* Allocate send CQ. */ cq_create.size = QP_ATTRIB_SQ_DEPTH; - cq_create.pfn_comp_cb = __recv_cb1; // ???? We are not doing anything there ??? why bother + cq_create.pfn_comp_cb = SdpSocket::__send_cb1; ib_status = ib_create_cq( mh_Ca, @@ -628,7 +1023,7 @@ VOID SdpSocket::CreateCmRequest( else if( cm_req->local_resp_timeout < CM_MIN_LOCAL_TIMEOUT ) cm_req->local_resp_timeout = CM_MIN_LOCAL_TIMEOUT; - cm_req->rnr_nak_timeout = 6;//???QP_ATTRIB_RNR_NAK_TIMEOUT; + cm_req->rnr_nak_timeout = 6;//6;//???QP_ATTRIB_RNR_NAK_TIMEOUT; cm_req->rnr_retry_cnt = 6;//????QP_ATTRIB_RNR_RETRY; cm_req->retry_cnt = 6;//????QP_ATTRIB_RETRY_COUNT; @@ -641,8 +1036,7 @@ VOID SdpSocket::CreateCmRequest( VOID SdpSocket::Shutdown() { //???? locking - // if(m_shutdown) ??? - m_shutdown = true; + // if(m_shutdown - on the lock) ??? SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("SdpSocket::Shutdown called this = 0x%p\n", this)); @@ -665,6 +1059,9 @@ VOID SdpSocket::Shutdown() if (mh_Ca != NULL) { ib_close_ca(mh_Ca, NULL); //?????? CALL BACK ??? IMPLMENT } + + // Now that all ibal operations have finished we can free the memory + m_SendBufferPool.ShutDown(); } /* diff --git a/trunk/ulp/sdp/kernel/SdpSocket.h b/trunk/ulp/sdp/kernel/SdpSocket.h index 1850583e..adb4420c 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.h +++ b/trunk/ulp/sdp/kernel/SdpSocket.h @@ -11,6 +11,10 @@ It keeps a list of all the objects so we know when to remove them. #ifndef _SDP_SOCKET_H #define _SDP_SOCKET_H +const int MAX_SEND_BUFFER_SIZE = 32768; // This is the maximum send packet size +const int MAX_SEND_PACKETS = 40; // This is the maximum number of packets allocated per send + + #define QP_ATTRIB_SQ_DEPTH 16 #define QP_ATTRIB_SQ_SGE 1 /* Set based on inline data requirements */ @@ -35,6 +39,7 @@ enum SocketStates { }; + class SdpSocket : public RefCountImpl { private: @@ -47,22 +52,30 @@ private: USHORT m_SrcPort; ULONG m_SrcIp; - - KSPIN_LOCK m_Lock; - bool m_shutdown; + bool m_shutdown; // Make sure this is synced w + SdpLock m_Lock; // A handle to the ca that is being used (in connect) and its guid ib_ca_handle_t mh_Ca; net64_t m_CaGuid; - ib_pd_handle_t m_pd; ib_cq_handle_t m_rcq; + ib_cq_handle_t m_scq; + ib_qp_handle_t m_qp; + BufferPool m_SendBufferPool; + KEVENT m_ConnectCmCompleteEvent; + VOID SignalShutdown(); + + static VOID __send_cb1( + IN const ib_cq_handle_t h_cq, + IN void *cq_context ); + public: SdpSocket(); @@ -105,7 +118,7 @@ public: struct sdp_msg_hello_ack m_hello_ack; ib_cm_handle_t m_cm_handle_t; // BUGBUG: Check how this is used / locked - + NTSTATUS send_cb(); // Used to allow the user file to remember us LIST_ENTRY m_UserFileList; diff --git a/trunk/ulp/sdp/kernel/SdpTrace.cpp b/trunk/ulp/sdp/kernel/SdpTrace.cpp index ee935831..d69c2a2f 100644 --- a/trunk/ulp/sdp/kernel/SdpTrace.cpp +++ b/trunk/ulp/sdp/kernel/SdpTrace.cpp @@ -5,6 +5,7 @@ BOOLEAN CheckCondition(int sev, int top, char *file, int line, char * func) { - DbgPrint ("%s ", func); +// return FALSE; + DbgPrint ("%s: ", func); return TRUE; } \ No newline at end of file diff --git a/trunk/ulp/sdp/kernel/SdpTrace.h b/trunk/ulp/sdp/kernel/SdpTrace.h index 77b15547..27526bd7 100644 --- a/trunk/ulp/sdp/kernel/SdpTrace.h +++ b/trunk/ulp/sdp/kernel/SdpTrace.h @@ -15,6 +15,9 @@ #define SDP_DRIVER 0x000004 #define SDP_SOCKET 0x000008 #define SDP_ARP 0x000010 +#define SDP_BUFFER_POOL 0x000020 +#define SDP_LOCK 0x000040 + // BUGBUG: CONVERT TO A FUNCTION -- 2.41.0