From bcb5f02c1fb6f3995fdebd5716a3372aad40216a Mon Sep 17 00:00:00 2001 From: tzachid Date: Mon, 21 Nov 2005 12:15:25 +0000 Subject: [PATCH] Basic implementation of receive. (Rev 100) git-svn-id: svn://openib.tc.cornell.edu/gen1@168 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86 --- trunk/ulp/sdp/include/SdpShared.h | 15 + trunk/ulp/sdp/kernel/Precompile.h | 1 + trunk/ulp/sdp/kernel/SOURCES | 5 + trunk/ulp/sdp/kernel/SdpBufferPool.cpp | 180 ++++++---- trunk/ulp/sdp/kernel/SdpBufferPool.h | 69 +++- trunk/ulp/sdp/kernel/SdpDriver.cpp | 79 ++-- trunk/ulp/sdp/kernel/SdpGenUtils.cpp | 84 +++-- trunk/ulp/sdp/kernel/SdpGenUtils.h | 24 +- trunk/ulp/sdp/kernel/SdpLock.h | 92 +++-- trunk/ulp/sdp/kernel/SdpRecvPool.cpp | 310 ++++++++++++++++ trunk/ulp/sdp/kernel/SdpRecvPool.h | 84 +++++ trunk/ulp/sdp/kernel/SdpSocket.cpp | 477 +++++++++++-------------- trunk/ulp/sdp/kernel/SdpSocket.h | 32 +- trunk/ulp/sdp/kernel/SdpTrace.cpp | 2 + trunk/ulp/sdp/todo | 18 +- 15 files changed, 1016 insertions(+), 456 deletions(-) create mode 100644 trunk/ulp/sdp/kernel/SdpRecvPool.cpp create mode 100644 trunk/ulp/sdp/kernel/SdpRecvPool.h diff --git a/trunk/ulp/sdp/include/SdpShared.h b/trunk/ulp/sdp/include/SdpShared.h index a6e65320..2c4a8d5d 100644 --- a/trunk/ulp/sdp/include/SdpShared.h +++ b/trunk/ulp/sdp/include/SdpShared.h @@ -14,6 +14,8 @@ #define IOCTL_WSP_SOCKET CTL_CODE(FILE_DEVICE_UNKNOWN, 0x801, METHOD_BUFFERED ,FILE_ANY_ACCESS) #define IOCTL_WSP_CONNECT CTL_CODE(FILE_DEVICE_UNKNOWN, 0x802, METHOD_BUFFERED ,FILE_ANY_ACCESS) #define IOCTL_WSP_SEND CTL_CODE(FILE_DEVICE_UNKNOWN, 0x803, METHOD_BUFFERED ,FILE_ANY_ACCESS) +#define IOCTL_WSP_RECV CTL_CODE(FILE_DEVICE_UNKNOWN, 0x804, METHOD_BUFFERED ,FILE_ANY_ACCESS) + // Data structures that are used for connect @@ -48,5 +50,18 @@ struct WspSendOut { int Errno; }; +struct WspRecvIn { + VOID *pSocket; + CHAR *pData; + ULONG BufferSize; + ULONG dwFlags; +}; + +struct WspRecvOut { + ULONG NumberOfBytesRecieved; + int Errno; + ULONG dwFlags; +}; + #endif //_SDP_SHARED_H diff --git a/trunk/ulp/sdp/kernel/Precompile.h b/trunk/ulp/sdp/kernel/Precompile.h index d2079c20..3f31442a 100644 --- a/trunk/ulp/sdp/kernel/Precompile.h +++ b/trunk/ulp/sdp/kernel/Precompile.h @@ -24,6 +24,7 @@ class SdpArp; #include "SdpShared.h" #include "SdpUserFile.h" #include "SdpBufferPool.h" +#include "SdpRecvPool.h" #include "SdpSocket.h" #include "SdpArp.h" diff --git a/trunk/ulp/sdp/kernel/SOURCES b/trunk/ulp/sdp/kernel/SOURCES index 79448a6b..a675f0ad 100644 --- a/trunk/ulp/sdp/kernel/SOURCES +++ b/trunk/ulp/sdp/kernel/SOURCES @@ -8,6 +8,7 @@ SOURCES= SdpDriver.cpp \ SdpSocket.cpp \ SdpArp.cpp \ SdpBufferPool.cpp \ + SdpRecvPool.cpp \ SdpTrace.cpp INCLUDES=..\include;\ @@ -31,3 +32,7 @@ TARGETLIBS= $(TARGETLIBS) $(DDK_LIB_PATH)\ntstrsafe.lib !endif MSC_WARNING_LEVEL= /W3 + +PRECOMPILED_INCLUDE=Precompile.h +PRECOMPILED_PCH=Precompile.pch +PRECOMPILED_CXX=1 diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp index 80cda011..5524e4de 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp @@ -4,6 +4,82 @@ #include "preCompile.h" +//static +NTSTATUS +BufferDescriptor::AllocateBuffer(BufferDescriptor ** ppBufferDescriptor, int BufferSize, int Tag) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("\n")); + NTSTATUS rc = STATUS_SUCCESS; + BufferDescriptor *pBufferDescriptor = NULL; + + // Allocate the buffer descriptor + pBufferDescriptor = + (BufferDescriptor *) + ExAllocatePoolWithTag( + NonPagedPool , + sizeof BufferDescriptor, + Tag + ); + if (pBufferDescriptor == NULL) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); + rc = STATUS_NO_MEMORY; + goto Cleanup; + } + + // Allocate the buffer itself + pBufferDescriptor->pBuffer = + ExAllocatePoolWithTag( + NonPagedPool , + BufferSize, + Tag + ); + + if (pBufferDescriptor->pBuffer == NULL) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); + rc = STATUS_NO_MEMORY; + goto Cleanup; + } + + pBufferDescriptor->BufferSize = BufferSize; + pBufferDescriptor->Reset(); + +Cleanup: + if (!NT_SUCCESS(rc)) { + if (pBufferDescriptor != NULL) { + if (pBufferDescriptor->pBuffer != NULL) { + ExFreePoolWithTag(pBufferDescriptor->pBuffer, SEND_BUFFERS_ALLOCATION_TAG); + } + ExFreePoolWithTag(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); + pBufferDescriptor = NULL; + } + } + *ppBufferDescriptor = pBufferDescriptor; + return rc; + +} + +//static +VOID +BufferDescriptor::DeAllocateBuffer(BufferDescriptor *pBufferDescriptor, int Tag) +{ + ExFreePoolWithTag(pBufferDescriptor->pBuffer, Tag); + ExFreePoolWithTag(pBufferDescriptor, Tag); + +} +/* + Currently the implmentation of shutdown should allow it to work, even without + init being called +*/ +BufferPool::BufferPool() +{ + m_SendSeq = 0; + m_AdvtSeq = 0; + m_ClientBeingServed = false; + m_CurrentlySentBuffers = 0; + m_CurrentlyAllocated = 0; + m_ClientWaiting = false; +} + NTSTATUS BufferPool::Init( int MaxBuffers, @@ -20,10 +96,6 @@ BufferPool::Init( m_MaxBuffers = MaxBuffers; m_MaxConcurrentSends = MaxConcurrentSends; m_MaxMessageSize = MaxMessageSize; - m_ClientBeingServed = false; - m_CurrentlySentBuffers = 0; - m_CurrentlyAllocated = 0; - m_ClientWaiting = false; KeInitializeEvent(&m_WaitingClients, NotificationEvent, FALSE); ASSERT(pd != NULL); m_pd = pd; @@ -31,11 +103,7 @@ BufferPool::Init( m_qp = qp; ASSERT(lkey != NULL); m_lkey = lkey; -#if DBG m_pSdpSocket = pSdpSocket; -#endif - - return STATUS_SUCCESS; } @@ -69,7 +137,9 @@ BufferPool::GetBuffer( // The request can not be staisfied right now. We need to hold it // until our request is being freed // BUGBUG: iMPLMENT: create event and put it in the queue + // This might only happen when there are two threads calling us ASSERT(FALSE); + return STATUS_UNEXPECTED_IO_ERROR; } if (FirstBuffer == true) { @@ -85,12 +155,13 @@ BufferPool::GetBuffer( goto Cleanup; } else if (m_CurrentlyAllocated < m_MaxBuffers) { // we need to alocate a new buffer - rc = AllocateBuffer(ppBufferDescriptor); + rc = BufferDescriptor::AllocateBuffer(ppBufferDescriptor, m_MaxMessageSize, SEND_BUFFERS_ALLOCATION_TAG); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("AllocateBuffer failed rc = 0x%x\n", rc )); ASSERT(*ppBufferDescriptor == NULL); goto Cleanup; } + m_CurrentlyAllocated++; goto Cleanup; } } @@ -104,6 +175,11 @@ Cleanup: return rc; } +/* +Send the buffers if possibale, if not possibale ,adds them to the +queue +*/ + NTSTATUS BufferPool::AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor) { @@ -117,6 +193,12 @@ BufferPool::AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor) (m_QueuedPackets.Size() == 0 )){ // we can send right away (no need to wait for anything) rc = SendBuffer(pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("SendBuffer failed rc = 0x%x\n", rc )); + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); + goto Cleanup; + } + // We have finished our job goto Cleanup; } else { // we put the buffer in the queued list @@ -142,7 +224,9 @@ BufferPool::AllowOthersToGet() // than one thread). } - +/* + called when a send packet has finished. +*/ VOID BufferPool::ReturnBuffer(BufferDescriptor *pBufferDescriptor) { @@ -187,6 +271,7 @@ BufferPool::SendBuffersIfCan() rc = SendBuffer(pBufferDescriptor); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("SendBuffer failed rc = 0x%x\n", rc )); + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); goto Cleanup; } } @@ -212,79 +297,15 @@ BufferPool::ShutDown() while (m_FreePackets.Size() > 0 ) { item = m_FreePackets.RemoveHeadList(); pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); - DeAllocateBuffer(pBufferDescriptor); + BufferDescriptor::DeAllocateBuffer(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); } while (m_QueuedPackets.Size() > 0 ) { item = m_QueuedPackets.RemoveHeadList(); pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); - DeAllocateBuffer(pBufferDescriptor); - } - -} - -NTSTATUS -BufferPool::AllocateBuffer(BufferDescriptor ** ppBufferDescriptor) -{ - SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); - AssertLocked(); - NTSTATUS rc = STATUS_SUCCESS; - BufferDescriptor *pBufferDescriptor = NULL; - - // Allocate the buffer descriptor - pBufferDescriptor = - (BufferDescriptor *) - ExAllocatePoolWithTag( - NonPagedPool , - sizeof BufferDescriptor, - SEND_BUFFERS_ALLOCATION_TAG - ); - if (pBufferDescriptor == NULL) { - SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); - rc = STATUS_NO_MEMORY; - goto Cleanup; - } - - // Allocate the buffer itself - pBufferDescriptor->pBuffer = - ExAllocatePoolWithTag( - NonPagedPool , - m_MaxMessageSize, - SEND_BUFFERS_ALLOCATION_TAG - ); - - if (pBufferDescriptor->pBuffer == NULL) { - SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ExAllocatePoolWithTag failed \n")); - rc = STATUS_NO_MEMORY; - goto Cleanup; + BufferDescriptor::DeAllocateBuffer(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); } - pBufferDescriptor->BufferSize = m_MaxMessageSize; - pBufferDescriptor->DataSize = 0; - pBufferDescriptor->mr_handle = NULL; - - -Cleanup: - if (NT_SUCCESS(rc)) { - m_CurrentlyAllocated++; - } else { - if (pBufferDescriptor != NULL) { - if (pBufferDescriptor->pBuffer != NULL) { - ExFreePoolWithTag(pBufferDescriptor->pBuffer, SEND_BUFFERS_ALLOCATION_TAG); - } - ExFreePoolWithTag(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); - pBufferDescriptor = NULL; - } - } - *ppBufferDescriptor = pBufferDescriptor; - return rc; -} - -VOID -BufferPool::DeAllocateBuffer(BufferDescriptor *pBufferDescriptor) -{ - ExFreePoolWithTag(pBufferDescriptor->pBuffer, SEND_BUFFERS_ALLOCATION_TAG); - ExFreePoolWithTag(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); } NTSTATUS @@ -296,10 +317,11 @@ BufferPool::SendBuffer(BufferDescriptor *pBufferDescriptor) msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *) pBufferDescriptor->pBuffer; - pHeader->recv_bufs = QP_ATTRIB_RQ_DEPTH; //?????recv_bufs = conn->l_advt_bf; + pHeader->recv_bufs = m_pSdpSocket->m_RecvBufferPool.GetCurrentlyPostedRecievedBuffers(); //?????recv_bufs = conn->l_advt_bf; pHeader->size = pBufferDescriptor->DataSize + sizeof msg_hdr_bsdh; - pHeader->seq_num = 1;//?????++conn->send_seq; - pHeader->seq_ack = 0;//????conn->advt_seq; + pHeader->seq_num = GetSendSeq(); + pHeader->seq_ack = m_pSdpSocket->m_RecvBufferPool.GetRecvSeq();//????conn->advt_seq; + m_AdvtSeq = pHeader->seq_ack;// Currently only for debug pHeader->mid = SDP_MID_DATA; pHeader->flags = SDP_MSG_FLAG_NON_FLAG; /* diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.h b/trunk/ulp/sdp/kernel/SdpBufferPool.h index 8ad0da57..28ed8db3 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.h +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.h @@ -31,6 +31,14 @@ public: size++; } + LIST_ENTRY *Head() { + ASSERT(size > 0); + ASSERT(!IsListEmpty(&m_Data)); + return m_Data.Flink; + + } + + private: int size; LIST_ENTRY m_Data; @@ -44,35 +52,64 @@ typedef void (* SendErrorCB )(NTSTATUS Error, VOID *Context); // Each buffer starts with msg_hdr_bsdh and is followed by the actual data class BufferDescriptor { public: - NTSTATUS WriteData(char *pData, uint32_t size) { + // copies the data from the user to a buffer (to be used for send only) + NTSTATUS WriteData(char *pData, uint32_t Size) { NTSTATUS rc = STATUS_SUCCESS; - ASSERT(size <= BufferSize - sizeof msg_hdr_bsdh); + ASSERT(Size <= BufferSize - sizeof msg_hdr_bsdh); char *pStart = (char *) pBuffer + sizeof msg_hdr_bsdh; - rc = CopyFromUser(pStart, pData, size); + rc = CopyFromUser(pStart, pData, Size); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("CopyFromUser failed rc = 0x%x\n", rc )); goto Cleanup; } - DataSize = size; + DataSize = Size; + Cleanup: + return rc; + } + + // copies data from the buffer to a user suplied buffer + // to be used for recieve only + NTSTATUS CopyToUser(char *pData, uint32_t Size) { + NTSTATUS rc = STATUS_SUCCESS; + ASSERT(DataSize >= Size); + char *pStart = (char *) pBuffer + DataStart; + rc = ::CopyToUser(pData, pStart, Size); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("CopyToUser failed rc = 0x%x\n", rc )); + goto Cleanup; + } + DataStart += Size; + DataSize -= Size; Cleanup: return rc; } + VOID Reset() { + DataSize = 0; + DataStart = 0; + } // Each buffer starts with bsdh_hdr structure VOID *pBuffer; // A pointer to the actual place that we put the data - int BufferSize; // The total size of the buffer - int DataSize; // The size of the data that we have allocated + uint32_t BufferSize; // The total size of the buffer (size that we have allocated) + uint32_t DataSize; // The size of the data + uint32_t DataStart; // The place in which the data starts (used for recieve packets) LIST_ENTRY BuffersList; // The place to hold the list of the buffers - ib_mr_handle_t mr_handle; // A handle to the registared memory, ib_local_ds_t ds_array; // Used for sending the buffer + static NTSTATUS AllocateBuffer(BufferDescriptor ** ppBufferDescriptor, int BufferSize, int Tag); + + static VOID DeAllocateBuffer(BufferDescriptor *pBufferDescriptor, int Tag); + + }; class BufferPool { public: + BufferPool(); + NTSTATUS Init( int MaxBuffers, int MaxConcurrentSends, @@ -98,19 +135,20 @@ public: NTSTATUS SendBuffersIfCan(); VOID ShutDown(); - -private: - NTSTATUS AllocateBuffer(BufferDescriptor ** ppBufferDescriptor); + uint32_t GetSendSeq() {return m_SendSeq++;} + uint32_t GetAdvtSeq() {return m_AdvtSeq;} - VOID DeAllocateBuffer(BufferDescriptor *pBufferDescriptor); + + +private: NTSTATUS SendBuffer(BufferDescriptor *pBufferDescriptor); // Global data about this connection - int m_MaxBuffers; // The maximum number of buffers that we allow for this QP + int m_MaxBuffers; // The maximum number of buffers that we allow for this QP (to be allocated) int m_MaxConcurrentSends; // The total numbers of sends that are allowd for the QP - int m_MaxMessageSize; // The maximum buffer size that we allw + int m_MaxMessageSize; // The maximum buffer size that we allow int m_CurrentlySentBuffers; // Number of buffers that we have sent, and didn't get an ack yet int m_CurrentlyAllocated; // The number of buffers that we have allocated @@ -133,9 +171,10 @@ private: KEVENT m_WaitingClients; // switch to a linked list bool m_ClientWaiting; -#if DBG + uint32_t m_SendSeq; //sequence number of last message sent (send_seq in linux) + uint32_t m_AdvtSeq; // sequence number of last message acknowledged (advt_seq in linux) + SdpSocket *m_pSdpSocket; -#endif //DBG VOID AssertLocked(); diff --git a/trunk/ulp/sdp/kernel/SdpDriver.cpp b/trunk/ulp/sdp/kernel/SdpDriver.cpp index 556328ac..e262f0da 100644 --- a/trunk/ulp/sdp/kernel/SdpDriver.cpp +++ b/trunk/ulp/sdp/kernel/SdpDriver.cpp @@ -9,7 +9,7 @@ VOID DriverUnload ( IN PDRIVER_OBJECT pDriverObject ) { - SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("DriverUnload called pDriverObject = 0x%x\n", pDriverObject )); + SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("called pDriverObject = 0x%x\n", pDriverObject )); ib_api_status_t ib_status; @@ -41,7 +41,7 @@ extern "C" NTSTATUS DriverEntry ( UNICODE_STRING DevName, LinkName; int i; - SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("DriverEntry called\n" )); + SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("called\n" )); // fill the device functions pDriverObject->DriverUnload = DriverUnload; @@ -339,14 +339,17 @@ NTSTATUS SdpDriver::DispatchDeviceIoControl( NTSTATUS rc = STATUS_SUCCESS; SdpUserFile *pSdpUserFile = NULL; SdpSocket *pSdpSocket = NULL; - - WspConnectIn *pWspConnectIn = NULL; - WspConnectOut *pWspConnectOut = NULL; switch (IoControlCode) { case IOCTL_WSP_SOCKET : - SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("DispatchDeviceIoControl IOCTL_WSP_SOCKET recieved\n" )); + { + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_SOCKET recieved\n" )); VERIFY_BUFFERS(InputBufferLength, OutputBufferLength, WspSocketIn, WspSocketOut); + OutputDataSize = sizeof (WspSocketOut); + + WspSocketIn wspSocketIn = *(WspSocketIn *) pInputBuffer; + WspSocketOut *pWspSocketOut = (WspSocketOut *) pOutputBuffer; + pSdpUserFile = (SdpUserFile *)pIrpSp->FileObject->FsContext; pSdpSocket = new SdpSocket; if (pSdpSocket == NULL) { @@ -354,41 +357,43 @@ NTSTATUS SdpDriver::DispatchDeviceIoControl( SDP_PRINT(SDP_ERR, SDP_DRIVER, ("new SdpSocket failed rc = 0x%x\n", rc )); goto Cleanup; } - rc = pSdpSocket->Init((WspSocketIn *)pInputBuffer, (WspSocketOut *)pOutputBuffer); + rc = pSdpSocket->Init(&wspSocketIn, pWspSocketOut); if (!NT_SUCCESS(rc)) { - SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->Init failed rc = 0x%x\n", rc )); + SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->Init failed rc = 0x%x\n", rc )); goto Cleanup; } rc = pSdpUserFile->AddSocket(pSdpSocket); if (!NT_SUCCESS(rc)) { - SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpUserFile->AddSocket failed rc = 0x%x\n", rc )); + SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpUserFile->AddSocket failed rc = 0x%x\n", rc )); + pSdpSocket->Shutdown(); goto Cleanup; - } - OutputDataSize = sizeof (WspSocketOut); - + } + } break; case IOCTL_WSP_CONNECT : - SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("DispatchDeviceIoControl IOCTL_WSP_CONNECT recieved\n" )); + { + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_CONNECT recieved\n" )); VERIFY_BUFFERS(InputBufferLength, OutputBufferLength, WspConnectIn, WspConnectOut); OutputDataSize = sizeof (WspConnectOut); // get the socket based on the users pointer - pWspConnectIn = (WspConnectIn *) pInputBuffer; - pWspConnectOut = (WspConnectOut *) pOutputBuffer; + WspConnectIn wspConnectIn = *(WspConnectIn *) pInputBuffer; + WspConnectOut *pWspConnectOut = (WspConnectOut *) pOutputBuffer; pSdpUserFile = (SdpUserFile *)pIrpSp->FileObject->FsContext; - pSdpSocket = pSdpUserFile->SocketByPointer(pWspConnectIn->pSocket); + pSdpSocket = pSdpUserFile->SocketByPointer(wspConnectIn.pSocket); if (pSdpSocket == NULL) { - SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("DispatchDeviceIoControl IOCTL_WSP_CONNECT socket %x not found\n",pWspConnectIn->pSocket)); + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_CONNECT socket %x not found\n",wspConnectIn.pSocket)); // This is a well defined winsock error pWspConnectOut->Errno = WSAENOTSOCK; goto Cleanup; } - rc = pSdpSocket->WSPConnect(pWspConnectIn, pWspConnectOut); + rc = pSdpSocket->WSPConnect(&wspConnectIn, pWspConnectOut); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->WSPConnect failed rc = 0x%x\n", rc )); goto Cleanup; } + } break; case IOCTL_WSP_SEND : @@ -398,29 +403,55 @@ NTSTATUS SdpDriver::DispatchDeviceIoControl( OutputDataSize = sizeof (WspSendOut); // get the socket based on the users pointer - WspSendIn *pWspSendIn = (WspSendIn *) pInputBuffer; + WspSendIn wspSendIn = *(WspSendIn *) pInputBuffer; WspSendOut *pWspSendOut = (WspSendOut *) pOutputBuffer; pSdpUserFile = (SdpUserFile *)pIrpSp->FileObject->FsContext; - pSdpSocket = pSdpUserFile->SocketByPointer(pWspSendIn->pSocket); + pSdpSocket = pSdpUserFile->SocketByPointer(wspSendIn.pSocket); if (pSdpSocket == NULL) { - SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_SEND socket %x not found\n",pWspConnectIn->pSocket)); + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_SEND socket %x not found\n",wspSendIn.pSocket)); // This is a well defined winsock error - pWspConnectOut->Errno = WSAENOTSOCK; + pWspSendOut->Errno = WSAENOTSOCK; goto Cleanup; } - rc = pSdpSocket->WSPSend(pWspSendIn, pWspSendOut); + rc = pSdpSocket->WSPSend(&wspSendIn, pWspSendOut); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->WSPSend failed rc = 0x%x\n", rc )); goto Cleanup; } } break; + + case IOCTL_WSP_RECV : + { + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_RECV recieved\n" )); + VERIFY_BUFFERS(InputBufferLength, OutputBufferLength, WspRecvIn, WspRecvOut); + OutputDataSize = sizeof (WspRecvOut); + + // get the socket based on the users pointer + WspRecvIn wspRecvIn = *(WspRecvIn *) pInputBuffer; + WspRecvOut *pWspRecvOut = (WspRecvOut *) pOutputBuffer; + pSdpUserFile = (SdpUserFile *)pIrpSp->FileObject->FsContext; + pSdpSocket = pSdpUserFile->SocketByPointer(wspRecvIn.pSocket); + if (pSdpSocket == NULL) { + SDP_PRINT(SDP_DEBUG, SDP_DRIVER, ("IOCTL_WSP_RECV socket %x not found\n",wspRecvIn.pSocket)); + // This is a well defined winsock error + pWspRecvOut->Errno = WSAENOTSOCK; + goto Cleanup; + } + rc = pSdpSocket->WSPRecv(&wspRecvIn, pWspRecvOut); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->WSPRecv failed rc = 0x%x\n", rc )); + goto Cleanup; + } + } + break; + default: // This is an unrecgnized IOCTL ASSERT(FALSE); - SDP_PRINT(SDP_ERR, SDP_DRIVER, ("DispatchDeviceIoControl unknow IOCTL code = 0x%x\n", IoControlCode )); + SDP_PRINT(SDP_ERR, SDP_DRIVER, ("unknow IOCTL code = 0x%x\n", IoControlCode )); rc = STATUS_INVALID_PARAMETER; goto Cleanup; diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp index bfccea60..db4eb659 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp @@ -32,7 +32,7 @@ NTSTATUS IB2Status (ib_api_status_t ib_status) } - +// BUGBUG: Understand how to reomove the 20 from the code. // This function is a wrapper for the KeWaitForSingleObject that adds // assertsions to the valuas returned by it NTSTATUS @@ -44,26 +44,38 @@ NTSTATUS IN PLARGE_INTEGER Timeout OPTIONAL ) { - NTSTATUS rc = KeWaitForSingleObject( - Object, - WaitReason, - WaitMode, - Alertable, - Timeout - ); - if (!NT_SUCCESS(rc)) { - ASSERT(FALSE); - SDP_PRINT(SDP_ERR, SDP_SOCKET, ("KeWaitForSingleObject failed rc = 0x%x\n", rc )); - // No meter what we do the program can't continue, let's crush it - int *i = NULL; - *i = 5; + NTSTATUS rc; + for (int i=0; i < 20; i++) { + rc = KeWaitForSingleObject( + Object, + WaitReason, + WaitMode, + Alertable, + Timeout + ); + if (!NT_SUCCESS(rc)) { + ASSERT(FALSE); + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("KeWaitForSingleObject failed rc = 0x%x\n", rc )); + // No meter what we do the program can't continue, let's crush it + int *i = NULL; + *i = 5; + } + ASSERT((rc == STATUS_SUCCESS ) || + (rc == STATUS_ALERTED ) || + (rc == STATUS_USER_APC ) || + (rc == STATUS_TIMEOUT )); // This are simply all the return code from DDK + + ASSERT( (Timeout != NULL ) || rc != STATUS_TIMEOUT); + if (rc != STATUS_USER_APC) { + break; + } else { + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("KeWaitForSingleObject was stoped because of STATUS_USER_APC\n" )); + } + } + if (i == 20) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("!!!! KeWaitForSingleObject was Exhausted STATUS_USER_APC\n" )); + } - ASSERT((rc == STATUS_SUCCESS ) || - (rc == STATUS_ALERTED ) || - (rc == STATUS_USER_APC ) || - (rc == STATUS_TIMEOUT )); // This are simply all the return code from DDK - - ASSERT( (Timeout != NULL ) || rc != STATUS_TIMEOUT); return rc; } @@ -91,6 +103,38 @@ CopyFromUser( } } +NTSTATUS +CopyToUser( + IN void* const p_dest, + IN const void* const p_src, + IN const size_t count + ) +{ + /* + * The memory copy must be done within a try/except block as the + * memory could be changing while the buffer is copied. + */ + __try + { + ProbeForWrite( p_dest, count, 1 ); + RtlCopyMemory( p_dest, p_src, count ); + return CL_SUCCESS; + } + __except(EXCEPTION_EXECUTE_HANDLER) + { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("copying memory from user failed\n")); + ASSERT(FALSE); + return STATUS_ACCESS_DENIED; + } +} + +VOID UpdateRc(NTSTATUS *rc, NTSTATUS rc1) +{ + // We want to keep the first errro + if (NT_SUCCESS(*rc)) { + *rc = rc1; + } +} void* __cdecl operator new(size_t n ) throw() { return ExAllocatePoolWithTag(NonPagedPool , n, GLOBAL_ALLOCATION_TAG); diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.h b/trunk/ulp/sdp/kernel/SdpGenUtils.h index 622eae08..e810afdc 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.h +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.h @@ -44,12 +44,6 @@ NTSTATUS IB2Status (ib_api_status_t ib_status); USHORT nthos(USHORT in); -NTSTATUS -CopyFromUser( - IN void* const p_dest, - IN const void* const p_src, - IN const size_t count ); - NTSTATUS MyKeWaitForSingleObject( IN PVOID Object, @@ -59,6 +53,24 @@ NTSTATUS IN PLARGE_INTEGER Timeout OPTIONAL ); + +NTSTATUS +CopyFromUser( + IN void* const p_dest, + IN const void* const p_src, + IN const size_t count + ); + +NTSTATUS +CopyToUser( + IN void* const p_dest, + IN const void* const p_src, + IN const size_t count + ); + +VOID UpdateRc(NTSTATUS *rc, NTSTATUS rc1); + + // This error codes are taken from winsock2.h (the file can not) // be included from user mode diff --git a/trunk/ulp/sdp/kernel/SdpLock.h b/trunk/ulp/sdp/kernel/SdpLock.h index c920d691..56b1625a 100644 --- a/trunk/ulp/sdp/kernel/SdpLock.h +++ b/trunk/ulp/sdp/kernel/SdpLock.h @@ -29,6 +29,8 @@ There will therefore be a spinlock that will protect the event. // Still Need to make sure that all errors are handled when they should ?????? typedef NTSTATUS (* SendCBHandler )(SdpSocket *); +typedef NTSTATUS (* RecvCBHandler )(SdpSocket *); + const int SEND_CB_CALLED = 0x00000001; const int RECV_CB_CALLED = 0x00000002; @@ -36,7 +38,7 @@ const int SHUTDOWN_SIGNALLED = 0x00000004; const int SHUTDOWN_HANDELED = 0x00000008; const int ERROR_SIGNALLED = 0x00000010; -const int DPC_FLAGS = SEND_CB_CALLED | SEND_CB_CALLED; +const int DPC_FLAGS = SEND_CB_CALLED | RECV_CB_CALLED; inline void ResetFlags(int &Flags) { Flags &= (!(SEND_CB_CALLED | RECV_CB_CALLED)); @@ -66,13 +68,15 @@ public: KeInitializeEvent(&m_Event, NotificationEvent , TRUE); KeInitializeSpinLock(&m_SpinLock); m_SendCBHandler = NULL; - m_ClientWaiting = false; + m_RecvCBHandler = NULL; + m_NumberOfClientWaiting = 0; } - VOID Init(SendCBHandler SendCB, SdpSocket *pSdpSocket) + VOID Init(SendCBHandler SendCB, RecvCBHandler RecvCB, SdpSocket *pSdpSocket) { m_SendCBHandler = SendCB; m_pSdpSocket = pSdpSocket; + m_RecvCBHandler = RecvCB; } /* @@ -93,7 +97,7 @@ public: if (m_InUse) { // We have to release the spinlock and wait on the event - m_ClientWaiting = true; + m_NumberOfClientWaiting++; KeReleaseSpinLock(&m_SpinLock, OldIrql); rc = MyKeWaitForSingleObject(&m_Event, UserRequest, UserMode, false, NULL); if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { @@ -109,7 +113,7 @@ public: KeClearEvent(&m_Event); OldFlags = m_flags; ResetFlags(m_flags); - m_ClientWaiting = false; + m_NumberOfClientWaiting--; KeReleaseSpinLock(&m_SpinLock, OldIrql); rc = HandleFlags(OldFlags); if (!NT_SUCCESS(rc)) { @@ -154,25 +158,26 @@ Cleanup: if (!SomethingToHandle(OldFlags)) { // We can safely quit the lock m_InUse = false; + KeReleaseSpinLock(&m_SpinLock, OldIrql); + break; } KeReleaseSpinLock(&m_SpinLock, OldIrql); - if (SomethingToHandle(OldFlags)) { - ASSERT(m_InUse); - rc = HandleFlags(OldFlags); - if (!NT_SUCCESS(rc)) { - // We have to signal the error to the calling side - SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); - ASSERT(m_flags & ERROR_SIGNALLED); - } - // At the time that we were handeling the flags, someone might have - // signaled something, so we have to try again - continue; + ASSERT(SomethingToHandle(OldFlags)); + ASSERT(m_InUse); + rc = HandleFlags(OldFlags); + if (!NT_SUCCESS(rc)) { + // We have to signal the error to the calling side + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + ASSERT(m_flags & ERROR_SIGNALLED); } - break; + // At the time that we were handeling the flags, someone might have + // signaled something, so we have to try again + continue; } // Release whoever is waiting KeSetEvent(&m_Event, IO_NO_INCREMENT, FALSE); + return rc; } /* @@ -181,6 +186,7 @@ Cleanup: do the actual work, if not it will only signal. Once it returns the lock is freed again */ + bool SignalCB(int flags) { KIRQL OldIrql; @@ -188,28 +194,39 @@ Cleanup: NTSTATUS rc = STATUS_SUCCESS; ASSERT(KeGetCurrentIrql() == DISPATCH_LEVEL); KeAcquireSpinLock(&m_SpinLock, &OldIrql); - if (m_InUse || m_ClientWaiting ) { + if (m_InUse || (m_NumberOfClientWaiting > 0 ) ) { m_flags |= flags; KeReleaseSpinLock(&m_SpinLock, OldIrql); return false; } - m_InUse = true; - // In this lock, we only handle DPC events - OldFlags = (m_flags & DPC_FLAGS) | flags; - ResetDpcFlags(m_flags); - KeClearEvent(&m_Event); - KeReleaseSpinLock(&m_SpinLock, OldIrql); - rc = HandleFlags(OldFlags); - if (!NT_SUCCESS(rc)) { - // We have to signal the error to the calling side - SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); - ASSERT(m_flags & ERROR_SIGNALLED); + while (true) { + m_InUse = true; + // In this lock, we only handle DPC events + OldFlags = (m_flags & DPC_FLAGS) | flags; + flags = 0; // No need to handle the same event any more + ResetDpcFlags(m_flags); + KeClearEvent(&m_Event); + KeReleaseSpinLock(&m_SpinLock, OldIrql); + rc = HandleFlags(OldFlags); + if (!NT_SUCCESS(rc)) { + // We have to signal the error to the calling side + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + ASSERT(m_flags & ERROR_SIGNALLED); + } + KeAcquireSpinLock(&m_SpinLock, &OldIrql); + int xxx = m_flags; + if ((m_flags & DPC_FLAGS) == 0) { + // No flags to handle from the DPC layer + ASSERT(m_flags == 0); + break; + } } - KeAcquireSpinLock(&m_SpinLock, &OldIrql); + // Release whoever is waiting m_InUse = false; KeSetEvent(&m_Event, IO_NO_INCREMENT, FALSE); KeReleaseSpinLock(&m_SpinLock, OldIrql); + return true; } @@ -224,7 +241,17 @@ Cleanup: // We need to handle the send CB rc = m_SendCBHandler(m_pSdpSocket); if (!NT_SUCCESS(rc)) { - SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("SendBuffer failed rc = 0x%x\n", rc )); + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("m_SendCBHandler failed rc = 0x%x\n", rc )); + m_flags |= ERROR_SIGNALLED; + // We continue from here since, there might be other things to handle, + // and this might be in a DPC context + } + } + if (flags & RECV_CB_CALLED) { + // We need to handle the send CB + rc = m_RecvCBHandler(m_pSdpSocket); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("m_RecvCBHandler failed rc = 0x%x\n", rc )); m_flags |= ERROR_SIGNALLED; // We continue from here since, there might be other things to handle, // and this might be in a DPC context @@ -250,11 +277,12 @@ Cleanup: KEVENT m_Event; // the event for passive level threads KSPIN_LOCK m_SpinLock; // The real guard of the lock SendCBHandler m_SendCBHandler; + RecvCBHandler m_RecvCBHandler; bool m_InUse; // Tells if this lock has any user int m_flags; // call backs that were recieved - bool m_ClientWaiting; // True if there is a client waiting to be served + int m_NumberOfClientWaiting; // Number of clients that are waiting to be served SdpSocket *m_pSdpSocket;// The socket that this class depends on }; diff --git a/trunk/ulp/sdp/kernel/SdpRecvPool.cpp b/trunk/ulp/sdp/kernel/SdpRecvPool.cpp new file mode 100644 index 00000000..2cdf2e8f --- /dev/null +++ b/trunk/ulp/sdp/kernel/SdpRecvPool.cpp @@ -0,0 +1,310 @@ +/* Copyright mellanox */ +#pragma warning(disable: 4244 ) + +#include "preCompile.h" + +RecvPool::RecvPool() +{ + m_RecvSeq = 0; + m_ClientBeingServed = false; + m_CurrentlyPostedRecievedBuffers = 0; + m_CurrentlyAllocated = 0; + m_ClientWaiting = false; +} + +NTSTATUS +RecvPool::Init( + int MaxBuffers, + int MaxConcurrentRecieves, + int MaxMessageSize, + ib_pd_handle_t pd, + ib_qp_handle_t qp, + net32_t lkey, + SdpSocket *pSdpSocket + ) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + m_MaxBuffers = MaxBuffers; + m_MaxConcurrentRecieves = MaxConcurrentRecieves; + m_MaxMessageSize = MaxMessageSize; + KeInitializeEvent(&m_WaitingClients, NotificationEvent, FALSE); + ASSERT(pd != NULL); + m_pd = pd; + ASSERT(qp != NULL); + m_qp = qp; + ASSERT(lkey != NULL); + m_lkey = lkey; +#if DBG + m_pSdpSocket = pSdpSocket; +#endif + return STATUS_SUCCESS; +} + +/* + A buffer was compleated and is being added to the queued list +*/ + +NTSTATUS +RecvPool::RecievedBuffer(BufferDescriptor *pBufferDescriptor, bool error) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p pBufferDescriptor = 0x%x error = %s\n" + ,this, pBufferDescriptor, error ? "true" : "false")); + AssertLocked(); + NTSTATUS rc = STATUS_SUCCESS; + if (error) { + // Not much that we can do in this case (only return the packet) + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); + goto Cleanup; + } + + // We have recieved a "RAW" buffer, we have to make sure that the buffer + // descriptor is OK. + msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *)pBufferDescriptor->pBuffer; + + ASSERT(pBufferDescriptor->DataStart == 0); + pBufferDescriptor->DataStart = sizeof (msg_hdr_bsdh); + pBufferDescriptor->DataSize = pHeader->size - sizeof msg_hdr_bsdh; + + m_RecvSeq = pHeader->seq_num; + + m_FullPackets.InsertTailList(&pBufferDescriptor->BuffersList); + ASSERT(m_FullPackets.Size() <= m_MaxBuffers); + + // we need to notify the client that is waiting + if (m_ClientWaiting) { + KeSetEvent( &m_WaitingClients, IO_NO_INCREMENT, FALSE ); + m_ClientWaiting = false; + } + // ???? Handle state changes here ???? + + //???? we will also have to wake up the clients of the send ??????? + + + m_CurrentlyPostedRecievedBuffers--; + ASSERT(m_CurrentlyPostedRecievedBuffers >= 0); + // We might be able to post a new recieve buffer now + ASSERT(m_CurrentlyPostedRecievedBuffers < m_MaxConcurrentRecieves); + rc = ReceiveIfCan(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("RecieveIfCan failed rc = 0x%x\n", rc )); + goto Cleanup; + } + +Cleanup: + return rc; + +} + +/* + This function is being called by a thread that wants to do a recieve in order + to have a buffer with the data, that he can copy. + FirstBuffer tells if this is the first buffer that he wants. + If it is true, this means that no other request will be handled before + this client will indicate that he has finished recieving his data. + If an event is returned this means that the caller has to wait on the + event before the request will be staisfied. + + pData is the place that the data should be copied to. CopySize is the number of + requested bytes, while Copied is the number actually copied. + + This function is being called under the lock +*/ +NTSTATUS +RecvPool::GetData( + char *pData, + uint32_t CopySize, + uint32_t *Copied, + KEVENT **ppEvent, + bool FirstBuffer + ) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p FirstBuffer = %s\n",this, + FirstBuffer ? "TRUE" : "FALSE")); + AssertLocked(); + bool BufferFreed = false; + *Copied = 0; + BufferDescriptor *pBufferDescriptor = NULL; + uint32_t OldDataSize = 0; + + NTSTATUS rc = STATUS_SUCCESS; + ASSERT(*ppEvent == NULL); + + if (m_ClientBeingServed == true && (FirstBuffer != false)) { + // The request can not be staisfied right now. We need to hold it + // until our request is being freed + // BUGBUG: iMPLMENT: create event and put it in the queue. + // This might only happen when there are two threads calling us + ASSERT(FALSE); + return STATUS_UNEXPECTED_IO_ERROR; + } + + if (FirstBuffer == true) { + m_ClientBeingServed = true; + } + + // Can we supply data to the userd right now ? + while (*Copied < CopySize) { + if (m_FullPackets.Size()) { + // We have a buffer, we can use it to copy data to the user + LIST_ENTRY *item = m_FullPackets.Head(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + OldDataSize = pBufferDescriptor->DataSize; + if (OldDataSize > CopySize - *Copied) { + // we can only copy part of the buffer + ASSERT(CopySize > *Copied); + rc = pBufferDescriptor->CopyToUser(pData + *Copied, CopySize - *Copied); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("pBufferDescriptor->CopyToUser failed rc = 0x%x\n", rc )); + goto Cleanup; + } + *Copied += CopySize - *Copied; + + } else { + // We copy the entire buffer and remove it + rc = pBufferDescriptor->CopyToUser(pData + *Copied, OldDataSize); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("pBufferDescriptor->CopyToUser failed rc = 0x%x\n", rc )); + goto Cleanup; + } + *Copied += OldDataSize; + m_FullPackets.RemoveHeadList(); + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); + BufferFreed = true; + } + + } else { + // No buffers available, we have to wait + ASSERT(m_ClientWaiting == false); + KeClearEvent(&m_WaitingClients); + m_ClientWaiting = true; + *ppEvent = &m_WaitingClients; + break; + } + } + + // As data was copyed, it is possibale that we will be able to post more receives + if (BufferFreed) { + ReceiveIfCan(); + } + +Cleanup: + return rc; +} + +VOID +RecvPool::AllowOthersToGet() +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + ASSERT(m_ClientBeingServed == true); + m_ClientBeingServed = false; + + // BUGBUG: this means that we should free the next waiter (Once we support more + // than one thread). +} + +NTSTATUS +RecvPool::ReceiveIfCan() +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); + BufferDescriptor *pBufferDescriptor = NULL; + NTSTATUS rc = STATUS_SUCCESS; + + while (m_CurrentlyPostedRecievedBuffers < m_MaxConcurrentRecieves) { + // do we have a free packet ? + if (m_FreePackets.Size() > 0) { + // we can take a packet from the list + LIST_ENTRY *item = m_FreePackets.RemoveHeadList(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + } else if (m_CurrentlyAllocated < m_MaxBuffers) { + // We can allocate more buffers + rc = BufferDescriptor::AllocateBuffer(&pBufferDescriptor, m_MaxMessageSize, SEND_BUFFERS_ALLOCATION_TAG); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("AllocateBuffer failed rc = 0x%x\n", rc )); + goto Cleanup; + } + m_CurrentlyAllocated++; + } else { + // Couldn't get any more free packets. + break; + } + pBufferDescriptor->Reset(); + + // we can now post the buffer for recieve + rc = PostReceiveBuffer(pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("PostReceiveBuffer failed rc = 0x%x\n", rc )); + goto Cleanup; + } + } + +Cleanup: + return rc; +} + +/* + Currently the implmentation of shutdown should allow it to work, even without + init being called +*/ + +VOID +RecvPool::ShutDown() +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + //???? AssertLocked(); + BufferDescriptor *pBufferDescriptor = NULL; + LIST_ENTRY *item = NULL; + + while (m_FreePackets.Size() > 0 ) { + item = m_FreePackets.RemoveHeadList(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + BufferDescriptor::DeAllocateBuffer(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); + } + + while (m_FullPackets.Size() > 0 ) { + item = m_FullPackets.RemoveHeadList(); + pBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + BufferDescriptor::DeAllocateBuffer(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); + } +} + +NTSTATUS +RecvPool::PostReceiveBuffer(BufferDescriptor *pBufferDescriptor) +{ + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); + NTSTATUS rc = STATUS_SUCCESS; + + ib_recv_wr_t recv_wr; + recv_wr.p_next = NULL; + recv_wr.wr_id = (uintn_t)pBufferDescriptor; + recv_wr.num_ds = 1; + recv_wr.ds_array = &pBufferDescriptor->ds_array; + + pBufferDescriptor->ds_array.length = pBufferDescriptor->BufferSize; + pBufferDescriptor->ds_array.vaddr = MmGetPhysicalAddress( pBufferDescriptor->pBuffer ).QuadPart; + pBufferDescriptor->ds_array.lkey = m_lkey; + + ib_api_status_t ib_status = ib_post_recv(m_qp, &recv_wr, NULL); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("ib_post_recv failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + m_CurrentlyPostedRecievedBuffers++; + +Cleanup: + return rc; + +} + +VOID +RecvPool::AssertLocked() +{ +#if DBG + m_pSdpSocket->AssertLocked(); +#endif +} + + diff --git a/trunk/ulp/sdp/kernel/SdpRecvPool.h b/trunk/ulp/sdp/kernel/SdpRecvPool.h new file mode 100644 index 00000000..68ac1833 --- /dev/null +++ b/trunk/ulp/sdp/kernel/SdpRecvPool.h @@ -0,0 +1,84 @@ +/* Copyright mellanox */ + +#ifndef H_SDP_RECV_POOL_H +#define H_SDP_RECV_POOL_H + +class RecvPool { + +public: + + RecvPool(); + + NTSTATUS Init( + int MaxBuffers, + int MaxConcurrentRecives, + int MaxMessageSize, + ib_pd_handle_t pd, + ib_qp_handle_t qp, + net32_t lkey, + SdpSocket *pSdpSocket + ); + + NTSTATUS GetData( + char *pData, + uint32_t CopySize, + uint32_t *Copied, + KEVENT **ppEvent, + bool FirstBuffer + ); + + VOID AllowOthersToGet(); + + NTSTATUS RecievedBuffer(BufferDescriptor *pBufferDescriptor, bool error); + + NTSTATUS ReceiveIfCan(); + + uint32_t GetRecvSeq() { return m_RecvSeq;} + + VOID ShutDown(); + + uint16_t GetCurrentlyPostedRecievedBuffers(){return m_CurrentlyPostedRecievedBuffers;} + +private: + + NTSTATUS PostReceiveBuffer(BufferDescriptor *pBufferDescriptor); + + // Global data about this connection + int m_MaxBuffers; // The maximum number of buffers that we allow for this QP (to be allocated) + int m_MaxConcurrentRecieves; // The total numbers of sends that are allowd for the QP + int m_MaxMessageSize; // The maximum buffer size that we allow for recieving + + uint16_t m_CurrentlyPostedRecievedBuffers; // Number of buffers that we have posted for recieve and didn't get an answer yet + int m_CurrentlyAllocated; // The number of buffers that we have already allocated + + bool m_ClientBeingServed; // true if we have already started giving buffers to a client + + LinkedList m_FreePackets; // This packets are free and might be used for recieving + LinkedList m_FullPackets; // This packets were filled with data and can be used by the user + + + // TODO: A queue of events for threads that are waiting for buffers. + + // IBAL constants from the main socket structure + // TODO: Should they stay here and be used like this ? + ib_pd_handle_t m_pd; + ib_qp_handle_t m_qp; + net32_t m_lkey; + + // A list of events that the users has to wait on. ???? currently only one + KEVENT m_WaitingClients; // switch to a linked list + bool m_ClientWaiting; + + uint32_t m_RecvSeq; // sequence number of last message received (recv_seq) + + +#if DBG + SdpSocket *m_pSdpSocket; +#endif //DBG + +VOID AssertLocked(); + +}; + +#endif // H_SDP_RECV_POOL_H + diff --git a/trunk/ulp/sdp/kernel/SdpSocket.cpp b/trunk/ulp/sdp/kernel/SdpSocket.cpp index 614292b4..af8adbb1 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.cpp +++ b/trunk/ulp/sdp/kernel/SdpSocket.cpp @@ -6,6 +6,7 @@ NTSTATUS sdp_cm_hello_ack_check(struct sdp_msg_hello_ack *hello_ack); static NTSTATUS __send_cb2(SdpSocket * pSdpSocket); +static NTSTATUS __recv_cb2(SdpSocket * pSdpSocket); static void AL_API cm_rej_callback(IN ib_cm_rej_rec_t *p_cm_rej_rec ) @@ -87,11 +88,11 @@ NTSTATUS SdpSocket::Init( WspSocketOut *pSocketOutParam) { NTSTATUS rc = STATUS_SUCCESS; - SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("SdpSocket::Init this = 0x%p\n", this)); + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p\n", this)); m_CreationFlags = pSocketInParam->dwFlags; - m_Lock.Init(__send_cb2, this); + m_Lock.Init(__send_cb2, __recv_cb2, this); pSocketOutParam->Errno = 0;// No error pSocketOutParam->pSocket = this; // give the user a handle to the socket KeInitializeEvent(&m_ShutdownCompleteEvent, NotificationEvent , FALSE ); @@ -99,57 +100,12 @@ NTSTATUS SdpSocket::Init( return rc; } -# if 0 -struct sdpc_buff { -// struct sdpc_buff *next; -// struct sdpc_buff *prev; -// u32 type; /* element type. (for generic queue) */ -// struct sdpc_buff_q *pool; /* pool currently holding this buffer. */ -// int (*release)(struct sdpc_buff *buff); /* release the object */ - /* - * primary generic data pointers - */ - void *head; /* first byte of data buffer */ - void *data; /* first byte of valid data in buffer */ - void *tail; /* last byte of valid data in buffer */ - void *end; /* last byte of data buffer */ - /* - * Experimental - */ - uint32_t flags; /* Buffer flags */ - /* - * Protocol specific data - */ - struct msg_hdr_bsdh *bsdh_hdr; /* SDP header (BSDH) */ - uint32_t data_size; /* size of just data in the buffer */ - uint64_t wrid; /* IB work request ID */ - /* - * IB specific data (The main buffer pool sets the lkey when - * it is created) - */ - uint64_t real; /* component of scather/gather list (address) */ - uint32_t size; /* component of scather/gather list (lenght) */ - uint32_t lkey; /* component of scather/gather list (key) */ -}; - -const int BUFFER_SIZE = 4000 + 16;//65536; - -#define SDP_BUFF_F_UNSIG 0x0001 /* unsignalled buffer */ - -#define SDP_BUFF_F_GET_UNSIG(buff) ((buff)->flags & SDP_BUFF_F_UNSIG) -#define SDP_BUFF_F_SET_UNSIG(buff) ((buff)->flags |= SDP_BUFF_F_UNSIG) -#define SDP_BUFF_F_CLR_UNSIG(buff) ((buff)->flags &= (~SDP_BUFF_F_UNSIG)) - -#endif - NTSTATUS SdpSocket::WSPSend( WspSendIn *pWspSendIn, WspSendOut *pWspSendOut ) { SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n",this)); - char temp[4000]; - memcpy(temp,"abcd",5); NTSTATUS rc = STATUS_SUCCESS; BufferDescriptor * pBufferDescriptor = NULL; @@ -158,6 +114,13 @@ NTSTATUS SdpSocket::WSPSend( bool Locked = false; PRKEVENT pBuffersEvent = NULL; + // For zero bytes send we currently don't do anything and return with status + // success + if (pWspSendIn->BufferSize == 0) { + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("this = 0x%p - zero size send \n",this)); + goto Cleanup; + } + while (Coppied < pWspSendIn->BufferSize) { if ((Locked == false) && !m_Lock.Lock()) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); @@ -242,7 +205,8 @@ NTSTATUS SdpSocket::WSPSend( Cleanup: if (NT_SUCCESS(rc) ) { pWspSendOut->Errno = 0; - pWspSendOut->NumberOfBytesSent = pWspSendIn->BufferSize; + ASSERT(pWspSendIn->BufferSize == Coppied); + pWspSendOut->NumberOfBytesSent = Coppied; } else { // Make sure that we have the error setted ASSERT(pWspSendOut->Errno != 0); // BUGBUG: Need to make sure that this @@ -251,160 +215,106 @@ Cleanup: return rc; } - -#if 0 -//Naive send implmentation. -NTSTATUS SdpSocket::WSPSend( - WspSendIn *pWspSendIn, - WspSendOut *pWspSendOut +NTSTATUS +SdpSocket::WSPRecv( + WspRecvIn *pWspRecvIn, + WspRecvOut *pWspRecvOut ) { - NTSTATUS rc = STATUS_SUCCESS; SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n",this)); - ib_mr_create_t mr_create; - - /* Memory registration parameters, returned by ib_reg_mem. */ - char *BufferStart = NULL; - uint32_t lkey; - uint32_t rkey; - ib_mr_handle_t mr_handle = NULL; - - - - // First allocate a buffer and a buffer descriptor - sdpc_buff *buff = new sdpc_buff; - ASSERT(buff != NULL); - BufferStart = new CHAR [BUFFER_SIZE]; - ASSERT(BufferStart != NULL); - buff->head = BufferStart; - - // we leave enough space for holding the header of the request - buff->end = (CHAR *)(buff->head) + BUFFER_SIZE; - buff->head = (char *)(buff->head) + 0x10; - - buff->data = buff->head; - buff->tail = buff->head; - buff->lkey = 0; - buff->real = 0; - buff->size = 0; - - // Copy the data to the buffer - memcpy(buff->data, "5678",5); - buff->tail = (char *)(buff->tail) + 5; - - - // Register the buffer - mr_create.vaddr = BufferStart; - mr_create.length = BUFFER_SIZE; - mr_create.access_ctrl = IB_AC_LOCAL_WRITE; - - ib_api_status_t ib_status = ib_reg_mem( m_pd, &mr_create, &lkey, &rkey, &mr_handle ); - ASSERT(ib_status == IB_SUCCESS); - - - - - - // Send the buffer. - buff->data = (char *)(buff->head) - sizeof(struct msg_hdr_bsdh); - buff->bsdh_hdr = (struct msg_hdr_bsdh *) buff->data; - buff->bsdh_hdr->mid = SDP_MID_DATA; - buff->bsdh_hdr->flags = SDP_MSG_FLAG_NON_FLAG; - buff->bsdh_hdr->size = BUFFER_SIZE; - - - /* - * signalled? With no delay turned off, data transmission may be - * waiting for a send completion. - */ - SDP_BUFF_F_SET_UNSIG(buff); - - buff->wrid = 0;//conn->send_wrid++; - - buff->lkey = lkey; - buff->bsdh_hdr->recv_bufs = QP_ATTRIB_RQ_DEPTH; //?????recv_bufs = conn->l_advt_bf; -//?????? put this in buff->bsdh_hdr->size = (char *)buff->tail - (char *)buff->data; - buff->bsdh_hdr->seq_num = 1;//?????++conn->send_seq; - buff->bsdh_hdr->seq_ack = 0;//????conn->advt_seq; + NTSTATUS rc = STATUS_SUCCESS; + bool First = true; + uint32_t Coppied = 0, ThisCopy = 0; + bool Locked = false; + PRKEVENT pBuffersEvent = NULL; - /* - * endian swap - */ - sdp_msg_swap_bsdh(buff->bsdh_hdr); - buff->real = (uint64_t)(void* __ptr64)BufferStart; - buff->size = BUFFER_SIZE; - - /* - * save the buffer for the event handler. - */ -#if 0 - result = sdp_buff_q_put_tail(&conn->send_post, buff); - if (result < 0) { - sdp_dbg_warn(conn, "Error <%d> queueing send buffer", result); - goto done; + if (pWspRecvIn->BufferSize == 0) { + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("this = 0x%p - zero size recv \n",this)); + goto Cleanup; } -#endif - /* - * post send - */ -/* - buff->size = buff->tail - buff->data; - buff->real = dma_map_single(conn->ca->dma_device, - buff->data, - buff->size, - PCI_DMA_TODEVICE); - send_param.next = NULL; - send_param.wr_id = buff->wrid; - send_param.sg_list = (struct ib_sge *)&buff->real; - send_param.num_sge = 1; - send_param.opcode = IB_WR_SEND; -*/ - ib_send_wr_t send_wr; - send_wr.p_next = NULL; - send_wr.wr_id = buff->wrid;//?????(uint64_t) (uintptr_t) wr; - send_wr.wr_type = WR_SEND; - send_wr.send_opt = IB_SEND_OPT_SIGNALED;//IB_SEND_OPT_INLINE;//socket_info->send_opt; - - - ib_local_ds_t ds_array; - ds_array.length = buff->size; - ds_array.lkey = buff->lkey; - ds_array.vaddr = buff->real; - - send_wr.num_ds = 1; - send_wr.ds_array = &ds_array; - - ib_status = ib_post_send(m_qp, &send_wr, NULL); - ASSERT(ib_status == IB_SUCCESS); + while (Coppied < pWspRecvIn->BufferSize) { + if ((Locked == false) && !m_Lock.Lock()) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); + rc = STATUS_SHUTDOWN_IN_PROGRESS; + goto Cleanup; + } + Locked = true; + ASSERT(pBuffersEvent == NULL); - + rc = m_RecvBufferPool.GetData( + pWspRecvIn->pData + Coppied, + pWspRecvIn->BufferSize - Coppied, + &ThisCopy, + &pBuffersEvent, + First + ); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_RecvBufferPool.GetData failed rc = 0x%x\n", rc )); + m_Lock.Unlock(); // Error ignored as this is already an error pass + Locked = false; + goto Cleanup; + } + First = false; + Coppied += ThisCopy; + + if (pBuffersEvent != NULL) { + // We are told to wait on this event + ASSERT(Coppied < pWspRecvIn->BufferSize); + rc = m_Lock.Unlock(); + Locked = false; + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); + goto Cleanup; + } - // Wait for the notification of send compleated ????? - rc = MyKeWaitForSingleObject( - &m_SendCompleteEvent, + rc = MyKeWaitForSingleObject( + pBuffersEvent, UserRequest, UserMode, FALSE, - NULL); - KeResetEvent(&m_SendCompleteEvent); + NULL + ); + pBuffersEvent = NULL; + if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { + // BUGBUG: Think what to do here, we should be able to stop the + // connect, and quit (probably shutdown should be enough) + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("MyKeWaitForSingleObject was alerted = 0x%x\n", rc )); + rc = STATUS_UNEXPECTED_IO_ERROR; + //pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error + Shutdown(); + goto Cleanup; + } + // try getting the buffer again + continue; + } + } + ASSERT(Locked == true); + rc = m_Lock.Unlock(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); + goto Cleanup; + } -//Cleanup: - if (mr_handle != NULL) { - ib_dereg_mr(mr_handle); + // This should happen only for good path anyway + m_RecvBufferPool.AllowOthersToGet(); +Cleanup: + if (NT_SUCCESS(rc) ) { + pWspRecvOut->Errno = 0; + ASSERT(pWspRecvIn->BufferSize == Coppied); + pWspRecvOut->NumberOfBytesRecieved = Coppied; + } else { + // Make sure that we have the error setted + ASSERT(pWspRecvOut->Errno != 0); // BUGBUG: Need to make sure that this + // is indeed the case. } - delete [] BufferStart; - delete buff; - pWspSendOut->Errno = 0; - pWspSendOut->NumberOfBytesSent = pWspSendIn->BufferSize; + // Currently in any case, the flags are not being used: + pWspRecvOut->dwFlags = 0; + return rc; - return rc; } -#endif - - NTSTATUS SdpSocket::WSPConnect( WspConnectIn *pWspConnectIn, WspConnectOut *pWspConnectOut @@ -602,12 +512,12 @@ NTSTATUS SdpSocket::CmSendRTU() ib_api_status_t ib_status; NTSTATUS rc = STATUS_SUCCESS; - SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("SdpSocket::CmSendRTU this = 0x%p \n", this)); + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n", this)); if (m_state != SS_CONNECTING_REQ_SENT) { // There was some error, we can release the waiting thread. // The error will be handled there - SDP_PRINT(SDP_WARN, SDP_SOCKET, ("SdpSocket::CmSendRTU this = 0x%p invalid state %s\n", this, SS2String(m_state))); + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("this = 0x%p invalid state %s\n", this, SS2String(m_state))); rc = STATUS_UNEXPECTED_IO_ERROR; goto Cleanup; } @@ -631,6 +541,12 @@ NTSTATUS SdpSocket::CmSendRTU() goto Cleanup; } + rc = m_RecvBufferPool.Init(MAX_RECV_PACKETS, QP_ATTRIB_RQ_DEPTH, MaxMessageSize, m_pd, m_qp, m_lkey, this); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_RecvBufferPool.Init failed rc = 0x%x\n", rc )); + goto Cleanup; + } + #if 0 /* * read remote information @@ -696,7 +612,19 @@ NTSTATUS SdpSocket::CmSendRTU() } // We now start the recieve processing - RecieveOnce(); + + rc = m_Lock.Lock(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_RecvBufferPool.Init failed rc = 0x%x\n", rc )); + goto Cleanup; + } + + rc = m_RecvBufferPool.ReceiveIfCan(); //??? error + m_Lock.Unlock(); // error ???? + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_RecvBufferPool.Init failed rc = 0x%x\n", rc )); + goto Cleanup; + } Cleanup: return rc; @@ -722,33 +650,29 @@ VOID SdpSocket::CmRepCallback(IN ib_cm_rep_rec_t *p_cm_rep_rec) } -//????????? Let's implmeant a naive read -// BUGBUG: based on __recv_cb - need to implment -static void -__recv_cb1( +VOID +SdpSocket::__recv_cb1( IN const ib_cq_handle_t h_cq, IN void *cq_context ) { SdpSocket *pSocket = (SdpSocket *) cq_context; - //pSocket->m_Lock.SignalCB(RECV_CB_CALLED); - - pSocket->__recv_cb2(); - + pSocket->m_Lock.SignalCB(RECV_CB_CALLED); } -char g_Recieve[6000]; -VOID -SdpSocket::__recv_cb2() +NTSTATUS +SdpSocket::recv_cb() { + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p \n", this)); + NTSTATUS rc = STATUS_SUCCESS, rc1 = STATUS_SUCCESS; if (m_Lock.IsShutdownSignaled()) { - return; + return 0; //?????????????????? this will cause a leak ?????? } - ib_api_status_t status; + ib_api_status_t ib_status; ib_wc_t wc[QP_ATTRIB_RQ_DEPTH], *p_free, *p_wc1; uint32_t pkt_cnt, recv_cnt = 0; size_t i; - + BufferDescriptor *pBufferDescriptor = NULL; for( i = 0; i < QP_ATTRIB_RQ_DEPTH; i++ ) wc[i].p_next = &wc[i + 1]; @@ -759,81 +683,92 @@ SdpSocket::__recv_cb2() /* If we get here, then the list of WCs is intact. */ p_free = wc; - status = ib_poll_cq( m_rcq, &p_free, &p_wc1 ); - CL_ASSERT( status == IB_SUCCESS || status == IB_NOT_FOUND ); - - /* Look at the payload now and filter ARP and DHCP packets. */ - //recv_cnt += __recv_mgr_filter( p_port, p_wc, &done_list, &bad_list ); - if (status == IB_SUCCESS) - { - ib_wc_t *p_wc; - for( p_wc = p_wc1; p_wc; p_wc = p_wc->p_next ) { - ASSERT( p_wc->status == IB_WCS_SUCCESS ); + ib_status = ib_poll_cq( m_rcq, &p_free, &p_wc1 ); + if( (ib_status != IB_SUCCESS) && (ib_status != IB_NOT_FOUND) ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_poll_cq failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + goto Cleanup; + } + if (ib_status == IB_NOT_FOUND) { + // Nothing to do in this case + ASSERT(NT_SUCCESS(rc)); + goto Cleanup; + } + + ASSERT (ib_status == IB_SUCCESS); + ib_wc_t *p_wc; + for( p_wc = p_wc1; p_wc; p_wc = p_wc->p_next ) { + ASSERT( p_wc->status == IB_WCS_SUCCESS || p_wc->status == IB_WCS_WR_FLUSHED_ERR); + if (p_wc->status == IB_WCS_WR_FLUSHED_ERR) { + // We have an error, but we still need to return the packet to the caller + pBufferDescriptor = (BufferDescriptor *)p_wc->wr_id; + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("p_wc->status == IB_WCS_WR_FLUSHED_ERR \n" )); + // we can not handle it, but we can and should return it to the pool of recieved buffers + rc1 = m_RecvBufferPool.RecievedBuffer(pBufferDescriptor, true); + ASSERT(rc1 == STATUS_SUCCESS); // return with error can not fail + UpdateRc(&rc, rc1); + continue; + } + if (p_wc->status == IB_WCS_SUCCESS) { int len = p_wc->length; + pBufferDescriptor = (BufferDescriptor *)p_wc->wr_id; ASSERT(len >= sizeof msg_hdr_bsdh); - msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *)g_Recieve; + if (len < sizeof msg_hdr_bsdh) { + // This is a message that is not big enough + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Recieved a message with a buffer that is too short len = %d\n", len )); + // we can not handle it, but we can and should return it to the pool of recieved buffers + rc1 = m_RecvBufferPool.RecievedBuffer(pBufferDescriptor, true); + ASSERT(rc1 == STATUS_SUCCESS); // return with error can not fail + UpdateRc(&rc, rc1); + continue; + } + + msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *)pBufferDescriptor->pBuffer; sdp_msg_swap_bsdh(pHeader); - ASSERT(pHeader->mid == 0xff && pHeader->size == 0x10); + ASSERT(pHeader->size >= 0x10); + if (len != pHeader->size) { + // This is a message that is not formated well + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Recieved a message with a len != pHeader->size = %d : %d\n", len , pHeader->size )); + // we can not handle it, but we can and should return it to the pool of recieved buffers + rc1 = m_RecvBufferPool.RecievedBuffer(pBufferDescriptor, true); + ASSERT(rc1 == STATUS_SUCCESS); // return with error can not fail + UpdateRc(&rc, rc1); + continue; + } + // BUGBUG: currently we only handle this messages, we should handle + // them better ??????????? + ASSERT(pHeader->mid == 0xff || + pHeader->mid == 2 || + pHeader->mid == 7); + + rc1 = m_RecvBufferPool.RecievedBuffer(pBufferDescriptor, false); + if (!NT_SUCCESS(rc1)) { + // We have an error, but we should continue, or we will have a leak + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_RecvBufferPool.RecievedBuffer failed rc = 0x%x\n", rc1 )); + UpdateRc(&rc, rc1); + continue; + } } } } while( !p_free ); - - /* Update our posted depth. */ - - /* Notify NDIS of any and all possible receive buffers. */ - - /* Only indicate receives if we actually had any. */ -// if( pkt_cnt ) - { -// NdisMIndicateReceivePacket( p_port->p_adapter->h_adapter, -// p_port->recv_mgr.recv_pkt_array, pkt_cnt ); + // Rearm after filtering +Cleanup: + if (NT_SUCCESS(rc)) { + ib_status = ib_rearm_cq(m_rcq, FALSE ); + if( ib_status != IB_SUCCESS ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_rearm_cq failed ib_status = 0x%d\n", ib_status )); + rc = IB2Status(ib_status); + // get out of this function + } } - /* Return any discarded receives to the pool */ - - /* Repost receives. */ - - RecieveOnce(); - - - /* - * Rearm after filtering to prevent contention on the enpoint maps - * and eliminate the possibility of having a call to - * __endpt_mgr_insert find a duplicate. - */ - status = ib_rearm_cq(m_rcq, FALSE ); - CL_ASSERT( status == IB_SUCCESS ); - - - -} - - - -VOID -SdpSocket::RecieveOnce() -{ - ib_local_ds_t l_ds; - - ib_recv_wr_t recv_wr; - recv_wr.p_next = NULL; - recv_wr.num_ds = 1; - recv_wr.wr_id = 123; - recv_wr.ds_array = & l_ds; + return rc; - l_ds.length = sizeof(g_Recieve); - l_ds.lkey = m_lkey; - l_ds.vaddr = MmGetPhysicalAddress( g_Recieve ).QuadPart; - - ib_api_status_t ib_status = ib_post_recv(m_qp, &recv_wr, NULL); - ASSERT(ib_status == IB_SUCCESS); - } - // TODO: Clear the callback functions mess void SdpSocket::__send_cb1( @@ -844,12 +779,20 @@ SdpSocket::__send_cb1( pSocket->m_Lock.SignalCB(SEND_CB_CALLED); } + // This function is here so it's addresses can be taken static NTSTATUS __send_cb2(SdpSocket * pSdpSocket) { return pSdpSocket->send_cb(); } +// This function is here so it's addresses can be taken +static NTSTATUS __recv_cb2(SdpSocket * pSdpSocket) +{ + return pSdpSocket->recv_cb(); +} + + NTSTATUS SdpSocket::send_cb() { SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("called this =0x%x\n", this)); @@ -1122,8 +1065,8 @@ VOID SdpSocket::CreateHelloHeader( hello_msg->bsdh.flags = SDP_MSG_FLAG_NON_FLAG; hello_msg->bsdh.mid = SDP_MID_HELLO; hello_msg->bsdh.size = sizeof(struct sdp_msg_hello); - hello_msg->bsdh.seq_num = 0;//conn->send_seq; ??? - hello_msg->bsdh.seq_ack = 0;//conn->advt_seq; ??? + hello_msg->bsdh.seq_num = m_SendBufferPool.GetSendSeq();//conn->send_seq; ??? + hello_msg->bsdh.seq_ack = m_RecvBufferPool.GetRecvSeq();//conn->advt_seq; ??? hello_msg->hh.max_adv = QP_ATTRIB_RQ_DEPTH;// ??? conn->l_max_adv; hello_msg->hh.ip_ver = SDP_MSG_IPVER; @@ -1213,7 +1156,8 @@ VOID WaitForShutdownEvent(KEVENT *ShutdownCompleteEvent) ); ASSERT(NT_SUCCESS(rc)); - ASSERT(rc == STATUS_SUCCESS); + ASSERT(rc == STATUS_SUCCESS || rc == STATUS_USER_APC); //???????? what to do + //???????? the wait fails KeClearEvent(ShutdownCompleteEvent); } @@ -1262,6 +1206,7 @@ VOID SdpSocket::Shutdown() // Now that all ibal operations have finished we can free the memory m_SendBufferPool.ShutDown(); + m_RecvBufferPool.ShutDown(); /* diff --git a/trunk/ulp/sdp/kernel/SdpSocket.h b/trunk/ulp/sdp/kernel/SdpSocket.h index 466c65f9..b31ce87d 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.h +++ b/trunk/ulp/sdp/kernel/SdpSocket.h @@ -12,9 +12,9 @@ It keeps a list of all the objects so we know when to remove them. #define _SDP_SOCKET_H const int MAX_SEND_BUFFER_SIZE = 32768; // This is the maximum send packet size -const int MAX_SEND_PACKETS = 800; // This is the maximum number of packets allocated per send - - +const int MAX_SEND_PACKETS = 200; // This is the maximum number of packets allocated per send +const int MAX_RECV_PACKETS = 200; // This is the maximum number of packets allocated per send + #define QP_ATTRIB_SQ_DEPTH 32 #define QP_ATTRIB_SQ_SGE 1 /* Set based on inline data requirements */ @@ -70,7 +70,6 @@ private: net32_t m_lkey; - BufferPool m_SendBufferPool; KEVENT m_ConnectCmCompleteEvent; KEVENT m_ShutdownCompleteEvent; @@ -78,10 +77,19 @@ private: VOID SignalShutdown(); static VOID __send_cb1( - IN const ib_cq_handle_t h_cq, - IN void *cq_context ); + IN const ib_cq_handle_t h_cq, + IN void *cq_context ); + + static VOID __recv_cb1( + IN const ib_cq_handle_t h_cq, + IN void *cq_context ); public: + + BufferPool m_SendBufferPool; + RecvPool m_RecvBufferPool; + + SdpSocket(); NTSTATUS Init( @@ -99,6 +107,11 @@ public: WspSendOut *pWspSendOut ); + NTSTATUS WSPRecv( + WspRecvIn *pWspRecvIn, + WspRecvOut *pWspRecvOut + ); + VOID Shutdown(); static VOID ShutdownCB(VOID* pContext); @@ -126,6 +139,7 @@ public: ib_cm_handle_t m_cm_handle_t; // BUGBUG: Check how this is used / locked NTSTATUS send_cb(); + NTSTATUS recv_cb(); // Used to allow the user file to remember us LIST_ENTRY m_UserFileList; @@ -147,11 +161,7 @@ public: #endif - VOID AssertLocked(); - - VOID RecieveOnce(); //???????? remove me - VOID __recv_cb2(); - + VOID AssertLocked(); }; #endif // _SDP_SOCKET_H diff --git a/trunk/ulp/sdp/kernel/SdpTrace.cpp b/trunk/ulp/sdp/kernel/SdpTrace.cpp index 564a6ed2..cba394c2 100644 --- a/trunk/ulp/sdp/kernel/SdpTrace.cpp +++ b/trunk/ulp/sdp/kernel/SdpTrace.cpp @@ -6,6 +6,8 @@ BOOLEAN CheckCondition(int sev, int top, char *file, int line, char * func) { if (sev < SDP_WARN) return FALSE; + if (top == SDP_PERFORMANCE) return FALSE; + DbgPrint ("%s: ", func); if (sev == SDP_ERR) DbgPrint ("ERROR - "); return TRUE; diff --git a/trunk/ulp/sdp/todo b/trunk/ulp/sdp/todo index 42b8391d..37a4ff56 100644 --- a/trunk/ulp/sdp/todo +++ b/trunk/ulp/sdp/todo @@ -2,22 +2,34 @@ This file includes things that should be impreoved in the sdp implmentation ============================================================================= KERNEL MODE: + 1) What to do with zero size send (currently I send nothing) and recieve. + 2) How to handle shutdown? the waits fail with user Apc. + + connect: + 1) Clean error path. + send: 1) On send: implmeant some kind of a negal algorithm. 2) On send: Create some kind of mechanism that will allow to recieve complitions on more than one send. 3) If possibale, post more than one send. - 4) + 4) Consider copying big packets from the DPC handler, instead of using the users thread + for the copy + recv: + 1) What to do when I don’t have all the buffer to return? +general: + USER MODE: * Check the lifetime of the SdpSocket (when is it deleted and so)?? -* make sure that the asserts are implmented in debug * check with intel that we can remove their lisence from the files. +* check the way that errors are reported to the user mode. It seems that returning an error +in rc means that the output buffer won't pass out. + -* make sure that the SDP socket is deleted at least in the main path \ No newline at end of file -- 2.41.0