From 03412488fd467525f1c0ccd5880c471f0f2f21d1 Mon Sep 17 00:00:00 2001 From: tzachid Date: Mon, 19 Dec 2005 08:49:41 +0000 Subject: [PATCH] [SDP] Implementation of overlapped send (Rev 817) git-svn-id: svn://openib.tc.cornell.edu/gen1@205 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86 --- trunk/ulp/sdp/include/SdpShared.h | 1 + trunk/ulp/sdp/kernel/Precompile.h | 3 +- trunk/ulp/sdp/kernel/SdpArp.cpp | 59 ++--- trunk/ulp/sdp/kernel/SdpArp.h | 5 +- trunk/ulp/sdp/kernel/SdpBufferPool.cpp | 288 ++++++++++++++++++------- trunk/ulp/sdp/kernel/SdpBufferPool.h | 89 ++++++-- trunk/ulp/sdp/kernel/SdpDriver.cpp | 23 +- trunk/ulp/sdp/kernel/SdpGenUtils.cpp | 14 ++ trunk/ulp/sdp/kernel/SdpGenUtils.h | 6 +- trunk/ulp/sdp/kernel/SdpLock.h | 11 +- trunk/ulp/sdp/kernel/SdpRecvPool.cpp | 6 +- trunk/ulp/sdp/kernel/SdpSocket.cpp | 228 +++++++++++++------- trunk/ulp/sdp/kernel/SdpSocket.h | 25 ++- trunk/ulp/sdp/kernel/SdpUserFile.cpp | 120 ++++++++++- trunk/ulp/sdp/kernel/SdpUserFile.h | 15 +- trunk/ulp/sdp/todo | 17 +- 16 files changed, 679 insertions(+), 231 deletions(-) diff --git a/trunk/ulp/sdp/include/SdpShared.h b/trunk/ulp/sdp/include/SdpShared.h index 6f7374e2..317ddef5 100644 --- a/trunk/ulp/sdp/include/SdpShared.h +++ b/trunk/ulp/sdp/include/SdpShared.h @@ -50,6 +50,7 @@ #define IOCTL_WSP_ACCEPT CTL_CODE(FILE_DEVICE_UNKNOWN, 0x807, METHOD_BUFFERED ,FILE_ANY_ACCESS) #define IOCTL_WSP_GET_XXX_NAME CTL_CODE(FILE_DEVICE_UNKNOWN, 0x808, METHOD_BUFFERED ,FILE_ANY_ACCESS) #define IOCTL_WSP_CLOSE_SOCKET CTL_CODE(FILE_DEVICE_UNKNOWN, 0x809, METHOD_BUFFERED ,FILE_ANY_ACCESS) +#define IOCTL_WSP_USER_THREAD CTL_CODE(FILE_DEVICE_UNKNOWN, 0x80A, METHOD_BUFFERED ,FILE_ANY_ACCESS) // Data structures that are used for connect diff --git a/trunk/ulp/sdp/kernel/Precompile.h b/trunk/ulp/sdp/kernel/Precompile.h index 6b54a421..f4d07077 100644 --- a/trunk/ulp/sdp/kernel/Precompile.h +++ b/trunk/ulp/sdp/kernel/Precompile.h @@ -41,8 +41,7 @@ class SdpSocket; class SdpArp; #include "ib_al.h" -#include "..\..\ipoib\ip_addresses_shared.h" - +#include "..\..\..\inc\iba\ib_at_ioctl.h" #include "sdpMsgs.h" #include "SdpGenUtils.h" #include "SdpTrace.h" diff --git a/trunk/ulp/sdp/kernel/SdpArp.cpp b/trunk/ulp/sdp/kernel/SdpArp.cpp index eedf8cf5..75556008 100644 --- a/trunk/ulp/sdp/kernel/SdpArp.cpp +++ b/trunk/ulp/sdp/kernel/SdpArp.cpp @@ -46,7 +46,7 @@ SdpArp::Init(PDRIVER_OBJECT DriverObject) OBJECT_ATTRIBUTES objectAttributes; - RtlInitUnicodeString( &DevName1, IPOIB_DEV_NAME ); + RtlInitUnicodeString( &DevName1, IBAT_DEV_NAME ); InitializeObjectAttributes( &objectAttributes, &DevName1, @@ -138,20 +138,19 @@ SdpArp::SourcePortGidFromIP( IO_STATUS_BLOCK ioStatus; char temp [1000]; // BUGBUG: Handle the case of more IPs - IOCTL_IPOIB_PORTS_IN ipoib_ports_in; - IOCTL_IPOIB_PORTS_OUT *pipoib_ports_out; - IPOIB_AT_PORT_RECORD *ports_records; + IOCTL_IBAT_PORTS_IN ipoib_ports_in; + IOCTL_IBAT_PORTS_OUT *pipoib_ports_out; + IBAT_PORT_RECORD *ports_records; - ipoib_ports_in.Version = IPOIB_IOCTL_VERSION; - ipoib_ports_in.Size = sizeof temp; + ipoib_ports_in.Version = IBAT_IOCTL_VERSION; - pipoib_ports_out = (IOCTL_IPOIB_PORTS_OUT *)temp; + pipoib_ports_out = (IOCTL_IBAT_PORTS_OUT *)temp; ASSERT(m_DeviceObject != NULL); KeInitializeEvent(&event, NotificationEvent, FALSE); irp = IoBuildDeviceIoControlRequest( - IOCTL_IPOIB_PORTS , + IOCTL_IBAT_PORTS , m_DeviceObject, &ipoib_ports_in, sizeof ipoib_ports_in, @@ -180,7 +179,7 @@ SdpArp::SourcePortGidFromIP( SDP_PRINT(SDP_ERR, SDP_ARP, ("IoCallDriver failed rc = 0x%x\n", rc )); goto Cleanup; } - if (pipoib_ports_out->Size != 0) { + if (pipoib_ports_out->Size > sizeof temp) { // The number of bytes that we have allocated wasn't enough SDP_PRINT(SDP_ERR, SDP_ARP, ("pipoib_ports_out.Size = %d\n", pipoib_ports_out->Size )); rc = STATUS_INSUFFICIENT_RESOURCES; @@ -201,7 +200,7 @@ Cleanup: NTSTATUS SdpArp::SourcePortGidFromPorts( IN ULONG SourceAddr, - IN IOCTL_IPOIB_PORTS_OUT *pPorts, + IN IOCTL_IBAT_PORTS_OUT *pPorts, OUT ib_net64_t *SrcPortGuid, OUT ib_net64_t *SrcCaGuid ) @@ -212,14 +211,14 @@ SdpArp::SourcePortGidFromPorts( PIRP irp; IO_STATUS_BLOCK ioStatus; - unsigned int i = 0, j = 0; + int i = 0, j = 0; - struct IOCTL_IPOIB_IP_ADDRESSES_IN addresses_in; - struct IOCTL_IPOIB_IP_ADDRESSES_OUT *addresses_out; + IOCTL_IBAT_IP_ADDRESSES_IN addresses_in; + IOCTL_IBAT_IP_ADDRESSES_OUT *addresses_out; char temp[1000]; - addresses_out = (struct IOCTL_IPOIB_IP_ADDRESSES_OUT *)temp; + addresses_out = (IOCTL_IBAT_IP_ADDRESSES_OUT *)temp; - addresses_in.Version = IPOIB_IOCTL_VERSION; + addresses_in.Version = IBAT_IOCTL_VERSION; for (i = 0 ; i < pPorts->NumPorts; i++) { SDP_PRINT(SDP_TRACE, SDP_SOCKET, ( @@ -233,7 +232,7 @@ SdpArp::SourcePortGidFromPorts( KeInitializeEvent(&event, NotificationEvent, FALSE); irp = IoBuildDeviceIoControlRequest( - IOCTL_IPOIB_IP_ADDRESSES , + IOCTL_IBAT_IP_ADDRESSES , m_DeviceObject, &addresses_in, sizeof addresses_in, @@ -262,7 +261,7 @@ SdpArp::SourcePortGidFromPorts( SDP_PRINT(SDP_ERR, SDP_ARP, ("IoCallDriver failed rc = 0x%x\n", rc )); goto Cleanup; } - if (addresses_out->Size != 0) { + if (addresses_out->Size > sizeof temp) { // The number of bytes that we have allocated wasn't enough SDP_PRINT(SDP_ERR, SDP_ARP, ("addresses_out.Size = %d\n", addresses_out->Size )); rc = STATUS_INSUFFICIENT_RESOURCES; @@ -272,10 +271,10 @@ SdpArp::SourcePortGidFromPorts( // We now have the addreses, we can check if this is what we need - for (j = 0 ; j < addresses_out->NumIps; j++) { + for (j = 0 ; j < addresses_out->AddressCount; j++) { ULONG *pIp; - ASSERT(addresses_out->Addreses[j].IpVersion == 4); - pIp = (ULONG *) (&addresses_out->Addreses[j].Data[12]); + ASSERT(addresses_out->Address[j].IpVersion == 4); + pIp = (ULONG *) (&addresses_out->Address[j].Address[12]); if (*pIp == CL_NTOH32(SourceAddr)) { SDP_PRINT(SDP_TRACE, SDP_ARP, ("Found the IP: ca guid = 0x%I64x port guid=0x%I64x\n", @@ -290,6 +289,13 @@ SdpArp::SourcePortGidFromPorts( } // If we have reached here the data was not found + SDP_PRINT(SDP_WARN, SDP_ARP, + ("HCA not found for ip=%d.%d.%d.%d\n", + (SourceAddr & 0xff000000) >> 24, + (SourceAddr & 0xff0000) >> 16, + (SourceAddr & 0xff00) >> 8 , + SourceAddr & 0xff + )); rc = STATUS_NOT_FOUND; Cleanup: @@ -300,27 +306,32 @@ Cleanup: NTSTATUS SdpArp::DestPortGidFromMac( + IN ib_net64_t SrcPortGuid, IN MAC_ADDR DestMac, OUT ib_gid_t *pDestPortGid) { - SDP_PRINT(SDP_TRACE, SDP_SOCKET,("MAC = ????")); + SDP_PRINT(SDP_TRACE, SDP_SOCKET,("MAC = ????\n")); NTSTATUS rc = STATUS_SUCCESS; KEVENT event; PIRP irp; IO_STATUS_BLOCK ioStatus; - IOCTL_IPOIB_MAC_2_GID_IN ipoib_mac2gid_in; - IOCTL_IPOIB_MAC_2_GID_OUT ipoib_mac2gid_out; + IOCTL_IBAT_MAC_TO_GID_IN ipoib_mac2gid_in; + IOCTL_IBAT_MAC_TO_GID_OUT ipoib_mac2gid_out; C_ASSERT(MAC_ADDR_SIZE == sizeof (ipoib_mac2gid_in.DestMac)); + ipoib_mac2gid_in.Version = IBAT_IOCTL_VERSION; + ipoib_mac2gid_in.PortGuid = SrcPortGuid; memcpy(ipoib_mac2gid_in.DestMac, DestMac, MAC_ADDR_SIZE); + + ASSERT(m_DeviceObject != NULL); KeInitializeEvent(&event, NotificationEvent, FALSE); irp = IoBuildDeviceIoControlRequest( - IOCTL_IPOIB_MAC_2_GID , + IOCTL_IBAT_MAC_TO_GID , m_DeviceObject, &ipoib_mac2gid_in, sizeof ipoib_mac2gid_in, diff --git a/trunk/ulp/sdp/kernel/SdpArp.h b/trunk/ulp/sdp/kernel/SdpArp.h index 2ce70641..ec7e0b2f 100644 --- a/trunk/ulp/sdp/kernel/SdpArp.h +++ b/trunk/ulp/sdp/kernel/SdpArp.h @@ -73,7 +73,8 @@ public: OUT ib_net64_t *SrcCaGuid ); NTSTATUS DestPortGidFromMac( - IN MAC_ADDR DestMac, + IN ib_net64_t SrcPortGuid, + IN MAC_ADDR DestMac, OUT ib_gid_t *pDestPortGid); /* @@ -104,7 +105,7 @@ Synchronously query the SA for a GUID. (started from wsd - query_pr) NTSTATUS SourcePortGidFromPorts( IN ULONG SourceAddr, - IN IOCTL_IPOIB_PORTS_OUT *pPorts, + IN IOCTL_IBAT_PORTS_OUT *pPorts, OUT ib_net64_t *SrcPortGuid, OUT ib_net64_t *SrcCaGuid ); diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp index 3b40eef3..f8ccc62b 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp @@ -96,6 +96,7 @@ BufferDescriptor::DeAllocateBuffer(BufferDescriptor *pBufferDescriptor, int Tag) ExFreePoolWithTag(pBufferDescriptor, Tag); } + /* Currently the implmentation of shutdown should allow it to work, even without init being called @@ -104,10 +105,8 @@ BufferPool::BufferPool() { m_SendSeq = 0; m_AdvtSeq = 0; - m_ClientBeingServed = false; m_CurrentlySentBuffers = 0; m_CurrentlyAllocated = 0; - m_ClientWaiting = false; m_PostCreditsWhenCan = false; m_CreditsCurrentlyPosted = false; m_CreditdBufferDescriptor = NULL; @@ -131,7 +130,6 @@ BufferPool::Init( m_MaxBuffers = MaxBuffers; m_MaxConcurrentSends = MaxConcurrentSends; m_MaxMessageSize = MaxMessageSize; - KeInitializeEvent(&m_WaitingClients, NotificationEvent, FALSE); ASSERT(pd != NULL); m_pd = pd; ASSERT(qp != NULL); @@ -139,6 +137,10 @@ BufferPool::Init( ASSERT(lkey != NULL); m_lkey = lkey; m_pSdpSocket = pSdpSocket; + m_CallBackPending = false; + + + return STATUS_SUCCESS; } @@ -156,55 +158,32 @@ BufferPool::Init( */ NTSTATUS BufferPool::GetBuffer( - BufferDescriptor **ppBufferDescriptor, - KEVENT **ppEvent, - bool FirstBuffer + BufferDescriptor **ppBufferDescriptor ) { - SDP_PRINT(SDP_DEBUG, SDP_BUFFER_POOL, ("this = 0x%p FirstBuffer = %s\n",this, - FirstBuffer ? "TRUE" : "FALSE")); + SDP_PRINT(SDP_DEBUG, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); AssertLocked(); NTSTATUS rc = STATUS_SUCCESS; *ppBufferDescriptor = NULL; - if (m_ClientBeingServed == true && (FirstBuffer != false)) { - // The request can not be staisfied right now. We need to hold it - // until our request is being freed - // BUGBUG: iMPLMENT: create event and put it in the queue - // This might only happen when there are two threads calling us - ASSERT(FALSE); - return STATUS_UNEXPECTED_IO_ERROR; - } - - if (FirstBuffer == true) { - m_ClientBeingServed = true; - } - - // Can we supply a buffer right now ? - if (m_CurrentlySentBuffers < m_MaxConcurrentSends) { - // yes, supply a buffer - if (m_FreePackets.Size() > 0) { - LIST_ENTRY *item = m_FreePackets.RemoveHeadList(); - *ppBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); - goto Cleanup; - } else if (m_CurrentlyAllocated < m_MaxBuffers) { - // we need to alocate a new buffer - rc = BufferDescriptor::AllocateBuffer(ppBufferDescriptor, m_MaxMessageSize, SEND_BUFFERS_ALLOCATION_TAG); - if (!NT_SUCCESS(rc)) { - SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("AllocateBuffer failed rc = 0x%x\n", rc )); - ASSERT(*ppBufferDescriptor == NULL); - goto Cleanup; - } - m_CurrentlyAllocated++; + if (m_FreePackets.Size() > 0) { + LIST_ENTRY *item = m_FreePackets.RemoveHeadList(); + *ppBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); + goto Cleanup; + } else if (m_CurrentlyAllocated < m_MaxBuffers) { + // we need to alocate a new buffer + rc = BufferDescriptor::AllocateBuffer(ppBufferDescriptor, m_MaxMessageSize, SEND_BUFFERS_ALLOCATION_TAG); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("AllocateBuffer failed rc = 0x%x\n", rc )); + ASSERT(*ppBufferDescriptor == NULL); goto Cleanup; - } - } - // No buffers available, we have to wait - ASSERT(m_ClientWaiting == false); - KeClearEvent(&m_WaitingClients); - m_ClientWaiting = true; - *ppEvent = &m_WaitingClients; + } + m_CurrentlyAllocated++; + goto Cleanup; + } + // No buffers available, we return NULL + ASSERT(*ppBufferDescriptor == NULL); Cleanup: return rc; @@ -226,6 +205,12 @@ BufferPool::AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor) ASSERT(pBufferDescriptor->GetFlags() == 0 || pBufferDescriptor->GetFlags() == DISCONNECT_MESSAGE); + // Assert that we are not sending an empty buffer + if (pBufferDescriptor->DataSize == 0) { + msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *) pBufferDescriptor->pBuffer; + ASSERT(pHeader->mid == SDP_MID_DISCONNECT ); + } + m_QueuedPackets.InsertTailList(&pBufferDescriptor->BuffersList); rc = SendBuffersIfCan(); if (!NT_SUCCESS(rc)) { @@ -238,20 +223,6 @@ Cleanup: } -/* - This function is being called by a client that has asked for some buffers - when he has recieved all it's data -*/ -VOID -BufferPool::AllowOthersToGet() -{ - ASSERT(m_ClientBeingServed == true); - m_ClientBeingServed = false; - - // BUGBUG: this means that we should free the next waiter (Once we support more - // than one thread). -} - /* called when a send packet has finished. */ @@ -267,8 +238,7 @@ BufferPool::ReturnBuffer(BufferDescriptor *pBufferDescriptor) #if DBG if (m_CurrentlySentBuffers == 1) { - SDP_PRINT(SDP_WARN, SDP_PERFORMANCE, ("Currently no packets are bing sent m_ClientWaiting = %s\n", - m_ClientWaiting ? "true" : "false")); + SDP_PRINT(SDP_WARN, SDP_PERFORMANCE, ("Currently no packets are bing sent\n")); } #endif ASSERT( pBufferDescriptor->GetFlags() == CREDIT_UPDATE || @@ -294,23 +264,183 @@ BufferPool::ReturnBuffer(BufferDescriptor *pBufferDescriptor) } } else { m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); - // Is there a client waiting ? - if ( m_ClientWaiting) { - KeSetEvent( &m_WaitingClients, IO_NO_INCREMENT, FALSE ); - m_ClientWaiting = false; + // We have to ask for another thread to do the job, as + // we might be in a DPC context + + // Ask for a new callback only in the following conditions: + // 1) There is no request on the way AND + // 2) There is a buffer that can be used AND + // 3) We have enough free space to complete a packet + // 4) We have gone under some threshold +// TODO: 4 above, didn't seem to have a real influance, so it is not in +// the code now. It should be testsed in the future. + if (!m_CallBackPending && (m_UserPackets.Size() > 0)) { + // Now testing 3,4 + + LIST_ENTRY *item = m_UserPackets.Head(); + IRP * pIrp = CONTAINING_RECORD(item, IRP ,Tail.Overlay.ListEntry); + + if ((RemainingToCopy(pIrp) < m_MaxMessageSize * m_FreePackets.Size())) { + rc = m_pSdpSocket->RequestCallBack(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("PostCredits failed rc = 0x%x\n", rc )); + goto Cleanup; + } + m_CallBackPending = true; + } } + if (DissconnectMessage) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("We have recieved a DissconnectMessage complition\n" )); m_pSdpSocket->DisconectSentEvent(); } } -Cleanup: +Cleanup: + ASSERT(m_CurrentlySentBuffers != 0); m_CurrentlySentBuffers--; - ASSERT(m_CurrentlySentBuffers >= 0); return rc; } +NTSTATUS +BufferPool::AddToUserBuffers( + bool *pCopied, + bool ForceCopy, + char *pData, + uint32_t BufferSize, + uint32_t Coppied, + IRP* pIrp + ) +{ + SDP_PRINT(SDP_DEBUG, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); + NTSTATUS rc = STATUS_SUCCESS; + + if ((m_UserPackets.Size() == 0) && (ForceCopy == false) ){ + *pCopied = false; + goto Cleanup; + } + + // We have to queue this IRP (following logic from sample) + ASSERT(pData != NULL); + SetBufferSize(pIrp, BufferSize); + SetUserBuffer(pIrp, pData); + SetCoppied(pIrp,Coppied); + SetSocket(pIrp,m_pSdpSocket); + + + ASSERT(m_UserPackets.Size()==0); + IoMarkIrpPending(pIrp); + m_UserPackets.InsertTailList(&pIrp->Tail.Overlay.ListEntry); + *pCopied = true; + // We mark the IRP as pending + pIrp->IoStatus.Status = STATUS_PENDING; + rc = STATUS_PENDING; + +Cleanup: + + +#if 0 +The above code should be activated if we want to allow returning offsetof +the user mode thread as fast as possible. + + if (m_CallBackPending == false) { + NTSTATUS rc1 = m_pSdpSocket->RequestCallBack(); + if (!NT_SUCCESS(rc1)) { + SDP_PRINT(SDP_ERR, SDP_BUFFER_POOL, ("PostCredits failed rc = 0x%x\n", rc )); + ASSERT(FALSE); + } + m_CallBackPending = true; + } +#endif + + ASSERT(rc == STATUS_PENDING || rc == STATUS_SUCCESS); + return rc; +} + + +NTSTATUS +BufferPool::UsersThreadCallBack() +{ + SDP_PRINT(SDP_DEBUG, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); + ASSERT(KeGetCurrentIrql() == PASSIVE_LEVEL); + IRP *pIrp = NULL; + LIST_ENTRY *item; + + NTSTATUS rc = STATUS_SUCCESS, rc1; + BufferDescriptor *pBufferDescriptor = NULL; + ASSERT(m_CallBackPending == true); + ASSERT(m_UserPackets.Size() > 0 ); + + m_CallBackPending = false; + + while (m_UserPackets.Size() > 0) { + rc = GetBuffer(&pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("GetBuffer failed rc = 0x%x\n", rc )); + goto Cleanup; + } + + if (pBufferDescriptor == NULL) { + // We don't have a new buffer any more, we just + // wait for a new packet to be freed + ASSERT(rc == STATUS_SUCCESS); + goto Cleanup; + } + + item = m_UserPackets.Head(); + + pIrp = CONTAINING_RECORD(item, IRP ,Tail.Overlay.ListEntry); + + // copy the data from the user mode to the buffers + ULONG CopySize = pBufferDescriptor->BufferSize - sizeof msg_hdr_bsdh; + CopySize = min(CopySize, RemainingToCopy(pIrp)); + + rc = pBufferDescriptor->WriteData((CHAR *)GetUserBuffer(pIrp) + GetCoppied(pIrp), CopySize); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("pBufferDescriptor->WriteData failed rc = 0x%x\n", rc )); + // free the buffer that you have + rc1 = ReturnBuffer(pBufferDescriptor); + ASSERT(NT_SUCCESS(rc1)); + goto Cleanup; + } + // Update the user buffer + SetCoppied(pIrp, GetCoppied(pIrp) + CopySize); + + // send the data to the buffer + pBufferDescriptor->SetMid(SDP_MID_DATA); + rc = AddBufferToQueuedList(pBufferDescriptor); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("AddBufferToQueuedList failed rc = 0x%x\n", rc )); + // free the buffer that you have + rc1 = ReturnBuffer(pBufferDescriptor); + ASSERT(NT_SUCCESS(rc1)); + goto Cleanup; + } + + if (RemainingToCopy(pIrp) == 0) { + // We have finished with this users packet, we should + // compleate the IRP + + WspSendOut *pWspSendOut = (WspSendOut *) pIrp->AssociatedIrp.SystemBuffer; + m_UserPackets.RemoveHeadList(); + + pIrp->IoStatus.Status = STATUS_SUCCESS; + pIrp->IoStatus.Information = sizeof (WspSendOut); + pWspSendOut->Errno = 0; + pWspSendOut->NumberOfBytesSent = GetBufferSize(pIrp); + IoCompleteRequest(pIrp, IO_NETWORK_INCREMENT); + + } + } + +Cleanup: + + return rc; +} + + /* This function goes over the list of packets that we can send, and sends them. It is called under the lock, and might be called also from a DPC @@ -334,8 +464,6 @@ BufferPool::SendBuffersIfCan() } } - - while ((m_QueuedPackets.Size() > 0) && (m_CurrentlySentBuffers < m_MaxConcurrentSends) && (m_rRecvBuf > 2)) { @@ -365,16 +493,11 @@ Cleanup: VOID BufferPool::CloseSocket() { - SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p m_WaitingClients = %s\n", - m_ClientWaiting ? "true" : "false")); + SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n")); AssertLocked(); - if (m_ClientWaiting) { - KeSetEvent( &m_WaitingClients, IO_NO_INCREMENT, FALSE ); - m_ClientWaiting = false; - } - // The next time our client will try to get data, he will get - // the error + //??? Should we do something here + } @@ -388,9 +511,11 @@ VOID BufferPool::ShutDown() { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); - //???? AssertLocked(); + + //AssertLocked(); BufferDescriptor *pBufferDescriptor = NULL; LIST_ENTRY *item = NULL; + IRP *pIrp = NULL; while (m_FreePackets.Size() > 0 ) { item = m_FreePackets.RemoveHeadList(); @@ -404,6 +529,14 @@ BufferPool::ShutDown() BufferDescriptor::DeAllocateBuffer(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); } + while (m_UserPackets.Size() > 0 ) { + item = m_UserPackets.RemoveHeadList(); + pIrp = CONTAINING_RECORD(item, IRP ,Tail.Overlay.ListEntry); + pIrp->IoStatus.Status = STATUS_CANCELLED; + pIrp->IoStatus.Information = 0; + IoCompleteRequest (pIrp, IO_NO_INCREMENT); + } + if(m_CreditdBufferDescriptor != NULL) { BufferDescriptor::DeAllocateBuffer(m_CreditdBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); m_CreditdBufferDescriptor = NULL; @@ -426,6 +559,7 @@ BufferPool::SendBuffer(BufferDescriptor *pBufferDescriptor) pHeader->seq_ack = m_pSdpSocket->m_RecvBufferPool.GetRecvSeq(); m_AdvtSeq = pHeader->seq_ack;// Currently only for debug pHeader->flags = SDP_MSG_FLAG_NON_FLAG; + /* * endian swap */ diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.h b/trunk/ulp/sdp/kernel/SdpBufferPool.h index fb546ddd..f7f79898 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.h +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.h @@ -33,8 +33,6 @@ #ifndef H_SDP_BUFFER_POOL_H #define H_SDP_BUFFER_POOL_H - - // This is simply a wrapper to the LIST_ENTRY class that allows // easier work with this list class LinkedList { @@ -87,7 +85,6 @@ public: size--; } - private: int size; LIST_ENTRY m_Data; @@ -167,6 +164,54 @@ public: }; + +// We will define 4 pointers to store send data for the IRP: +// This will have to change one day() +// The users data + +inline VOID SetUserBuffer(IRP *pIrp, VOID * p) { + pIrp->Tail.Overlay.DriverContext[0] = (VOID *)p; +} +inline VOID* GetUserBuffer(IRP *pIrp) { + return (pIrp->Tail.Overlay.DriverContext[0]); +} + +inline VOID SetBufferSize(IRP *pIrp, uint32_t i) { + pIrp->Tail.Overlay.DriverContext[1] = (VOID *)(UINT_PTR)i; +} +inline uint32_t GetBufferSize(IRP *pIrp) { + return (uint32_t)(UINT_PTR) (pIrp->Tail.Overlay.DriverContext[1]); +} + +inline VOID SetCoppied(IRP *pIrp, uint32_t i) { + pIrp->Tail.Overlay.DriverContext[2] = (VOID *)(UINT_PTR)i; + ASSERT(i <= GetBufferSize(pIrp)); +} +inline uint32_t GetCoppied(IRP *pIrp) { + return (uint32_t)(UINT_PTR) (pIrp->Tail.Overlay.DriverContext[2]); +} + +/* + BUGBUG: + As I intend to change this in any case, + I will not use Referance count on the socket + here. +*/ // ??????????????????? +inline void SetSocket(IRP *pIrp, SdpSocket *pSdpSocket) { + pIrp->Tail.Overlay.DriverContext[3] = pSdpSocket; +} + +inline SdpSocket *GetSocket(IRP *pIrp) { + return (SdpSocket *)pIrp->Tail.Overlay.DriverContext[3]; +} + +inline uint32_t RemainingToCopy(IRP *pIrp) { + uint32_t Coppied = GetCoppied(pIrp); + uint32_t BufferSize = GetBufferSize(pIrp); + ASSERT(BufferSize >= Coppied); + return BufferSize - Coppied; +} + class BufferPool { public: @@ -184,17 +229,21 @@ public: ); NTSTATUS GetBuffer( - BufferDescriptor ** ppBufferDescriptor, - KEVENT **ppEvent, - bool FirstBuffer + BufferDescriptor ** ppBufferDescriptor ); NTSTATUS AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor); - VOID AllowOthersToGet(); - NTSTATUS ReturnBuffer(BufferDescriptor *pBufferDescriptor); + NTSTATUS AddToUserBuffers(bool *pCopied, bool ForceCopy,char *pData, uint32_t BufferSize, uint32_t Coppied, IRP* pIrp); + + VOID RemoveFromUserBuffers(PIRP pIrp) { + m_UserPackets.RemoveEntryList(&pIrp->Tail.Overlay.ListEntry); + } + + NTSTATUS UsersThreadCallBack(); + NTSTATUS SendBuffersIfCan(); VOID CloseSocket(); @@ -222,20 +271,24 @@ private: NTSTATUS SendBuffer(BufferDescriptor *pBufferDescriptor); // Global data about this connection - int m_MaxBuffers; // The maximum number of buffers that we allow for this QP (to be allocated) - int m_MaxConcurrentSends; // The total numbers of sends that are allowd for the QP - int m_MaxMessageSize; // The maximum buffer size that we allow - - int m_CurrentlySentBuffers; // Number of buffers that we have sent, and didn't get an ack yet - int m_CurrentlyAllocated; // The number of buffers that we have allocated + uint32_t m_MaxBuffers; // The maximum number of buffers that we allow for this QP (to be allocated) + uint32_t m_MaxConcurrentSends; // The total numbers of sends that are allowd for the QP + uint32_t m_MaxMessageSize; // The maximum buffer size that we allow - bool m_ClientBeingServed; // true if we have already started giving buffers to a client + uint32_t m_CurrentlySentBuffers; // Number of buffers that we have sent, and didn't get an ack yet + uint32_t m_CurrentlyAllocated; // The number of buffers that we have allocated LinkedList m_FreePackets; // This packets are free and might be used LinkedList m_QueuedPackets; // This packets were filled with data and should be sent + + +//????? + public: + LinkedList m_UserPackets; // This is a list of user packets that we should send + - // TODO: A queue of events for threads that are waiting for buffers. +private: //????????? // IBAL constants from the main socket structure // TODO: Should they stay here and be used like this ? @@ -244,8 +297,6 @@ private: net32_t m_lkey; // A list of events that the users has to wait on. ???? currently only one - KEVENT m_WaitingClients; // switch to a linked list - bool m_ClientWaiting; uint32_t m_SendSeq; //sequence number of last message sent (send_seq in linux) uint32_t m_AdvtSeq; // sequence number of last message acknowledged (advt_seq in linux) @@ -264,6 +315,8 @@ private: bool m_PostCreditsWhenCan; bool m_CreditsCurrentlyPosted; BufferDescriptor *m_CreditdBufferDescriptor; + bool m_CallBackPending; // Set to true if we have requesetd a callback from + // the users thread VOID AssertLocked(); diff --git a/trunk/ulp/sdp/kernel/SdpDriver.cpp b/trunk/ulp/sdp/kernel/SdpDriver.cpp index 07a3db9d..11585270 100644 --- a/trunk/ulp/sdp/kernel/SdpDriver.cpp +++ b/trunk/ulp/sdp/kernel/SdpDriver.cpp @@ -92,7 +92,7 @@ extern "C" NTSTATUS DriverEntry ( // fill the device functions pDriverObject->DriverUnload = DriverUnload; - pDriverObject->FastIoDispatch = NULL; + pDriverObject->FastIoDispatch = NULL; pDriverObject->DriverStartIo = NULL; for (i = 0; i < IRP_MJ_MAXIMUM_FUNCTION; i++) { pDriverObject->MajorFunction[i] = SdpDriver::Dispatch; @@ -492,7 +492,7 @@ SdpDriver::DispatchDeviceIoControl( SDP_PRINT(SDP_ERR, SDP_DRIVER, ("new SdpSocket failed rc = 0x%x\n", rc )); goto Cleanup; } - rc = pSdpSocket->Init(&wspSocketIn, pWspSocketOut); + rc = pSdpSocket->Init(&wspSocketIn, pWspSocketOut, pSdpUserFile); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->Init failed rc = 0x%x\n", rc )); goto Cleanup; @@ -548,7 +548,7 @@ SdpDriver::DispatchDeviceIoControl( pWspSendOut->Errno = WSAENOTSOCK; goto Cleanup; } - rc = pSdpSocket->WSPSend(&wspSendIn, pWspSendOut); + rc = pSdpSocket->WSPSend(&wspSendIn, pWspSendOut, pIrp); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->WSPSend failed rc = 0x%x\n", rc )); goto Cleanup; @@ -689,8 +689,6 @@ SdpDriver::DispatchDeviceIoControl( } } break; - - case IOCTL_WSP_CLOSE_SOCKET : { SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("IOCTL_WSP_CLOSE_SOCKET recieved\n" )); @@ -709,18 +707,21 @@ SdpDriver::DispatchDeviceIoControl( goto Cleanup; } rc = pSdpSocket->WSPCloseSocket(&wspSocketCloseIn, pWspSocketCloseOut); - // After closing a socket we "unlink" the kernel object, and it won't - // be accessable for the user. (currently succesfull or not) - // BUGBUG: Change this behavior while the linger don't linger staff is fixed - pSdpUserFile->RemoveSocket(pSdpSocket); // Must succed - // BUGBUG: are we taking the socket from the correct place - // It is possible that not, but the chanses of an error seems small if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("pSdpSocket->WSPCloseSocket failed rc = 0x%x\n", rc )); goto Cleanup; } } break; + + case IOCTL_WSP_USER_THREAD : + { + SDP_PRINT(SDP_TRACE, SDP_DRIVER, ("IOCTL_WSP_USER_THREAD recieved\n" )); + pSdpUserFile = (SdpUserFile *)pIrpSp->FileObject->FsContext; + + /* Ignore Error = */ pSdpUserFile->UsersThread(); + } + break; default: // This is an unrecgnized IOCTL diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp index 8a2ddcf0..651fb541 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.cpp +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.cpp @@ -32,6 +32,8 @@ #include "Precompile.h" +//#define DONT_COPY_DATA + USHORT ntohs(USHORT in) { return ((in & 0xff) << 8) | ((in & 0xff00) >> 8); @@ -122,7 +124,13 @@ CopyFromUser( __try { ProbeForRead( (void*)p_src, count, 1 ); +#ifdef DONT_COPY_DATA + if (count < 1000){ + RtlCopyMemory( p_dest, p_src, count ); + } +#else RtlCopyMemory( p_dest, p_src, count ); +#endif return STATUS_SUCCESS; } __except(EXCEPTION_EXECUTE_HANDLER) @@ -147,7 +155,13 @@ CopyToUser( __try { ProbeForWrite( p_dest, count, 1 ); +#ifdef DONT_COPY_DATA + if (count < 1000){ + RtlCopyMemory( p_dest, p_src, count ); + } +#else RtlCopyMemory( p_dest, p_src, count ); +#endif return CL_SUCCESS; } __except(EXCEPTION_EXECUTE_HANDLER) diff --git a/trunk/ulp/sdp/kernel/SdpGenUtils.h b/trunk/ulp/sdp/kernel/SdpGenUtils.h index 8440ba5c..4b655fd6 100644 --- a/trunk/ulp/sdp/kernel/SdpGenUtils.h +++ b/trunk/ulp/sdp/kernel/SdpGenUtils.h @@ -176,8 +176,8 @@ int IbalToWsaError(const ib_api_status_t ib_status ); #define WSAESTALE (WSABASEERR+70) #define WSAEREMOTE (WSABASEERR+71) - - - +// Used for IRP cancell +#define ERROR_OPERATION_ABORTED 995L +#define WSA_OPERATION_ABORTED (ERROR_OPERATION_ABORTED) #endif // _SDP_GEN_UTILS_H diff --git a/trunk/ulp/sdp/kernel/SdpLock.h b/trunk/ulp/sdp/kernel/SdpLock.h index b5c72081..4f8d122f 100644 --- a/trunk/ulp/sdp/kernel/SdpLock.h +++ b/trunk/ulp/sdp/kernel/SdpLock.h @@ -125,7 +125,7 @@ public: bool Lock(bool Force = false) { KIRQL OldIrql; int OldFlags = 0; - NTSTATUS rc = STATUS_SUCCESS; + NTSTATUS rc = STATUS_SUCCESS, rc1 = STATUS_SUCCESS; ASSERT(KeGetCurrentIrql() == PASSIVE_LEVEL); bool Locked = false; bool WaitedOnLock = false; @@ -158,13 +158,18 @@ public: ASSERT(m_NumberOfClientWaiting >= 0); KeReleaseSpinLock(&m_SpinLock, OldIrql); rc = HandleFlags(OldFlags); + if(!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + } if ((Force == false) && (!NT_SUCCESS(rc) || (m_flags & ERROR_SIGNALLED) || - (!NT_SUCCESS(rc = m_CheckSocketState(m_pSdpSocket))) + (!NT_SUCCESS(rc1 = m_CheckSocketState(m_pSdpSocket))) )) { // We have to signal the error to the calling side - SDP_PRINT(SDP_ERR, SDP_LOCK, ("HandleFlags failed rc = 0x%x\n", rc )); + if(!NT_SUCCESS(rc1)) { + SDP_PRINT(SDP_ERR, SDP_LOCK, ("m_CheckSocketState failed rc1 = 0x%x\n", rc1 )); + } Locked = false; KeAcquireSpinLock(&m_SpinLock, &OldIrql); m_InUse = false; diff --git a/trunk/ulp/sdp/kernel/SdpRecvPool.cpp b/trunk/ulp/sdp/kernel/SdpRecvPool.cpp index 661cac5d..f0442e4c 100644 --- a/trunk/ulp/sdp/kernel/SdpRecvPool.cpp +++ b/trunk/ulp/sdp/kernel/SdpRecvPool.cpp @@ -145,8 +145,10 @@ RecvPool::RecievedBuffer(BufferDescriptor *pBufferDescriptor, bool error) (int)pHeader->seq_ack)); m_pSdpSocket->m_SendBufferPool.SetRemoteRecvBuf(rRecvBuf); - // m_DisConnRecieved is the last message that should be recieved - ASSERT(m_DisConnRecieved == false); // BUGBUG: do a real check here + // m_DisConnRecieved is the last "real" message that should be recieved + // we might still get credits update + ASSERT(m_DisConnRecieved == false || + (pHeader->mid == SDP_MID_DATA && pHeader->size == sizeof msg_hdr_bsdh)); // BUGBUG: do a real check here // ???? Handle more state changes here ???? if (pHeader->mid != SDP_MID_DATA) { diff --git a/trunk/ulp/sdp/kernel/SdpSocket.cpp b/trunk/ulp/sdp/kernel/SdpSocket.cpp index 362dcda0..bf0d37e5 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.cpp +++ b/trunk/ulp/sdp/kernel/SdpSocket.cpp @@ -155,6 +155,8 @@ SdpSocket::SdpSocket() m_CloseSocketCalled = false; m_ShutdownCalled = false; m_DisconnectConnectionRecieved = false; + m_pSdpUserFile = NULL; + InitializeListHead(&m_CallBackRequestList); } @@ -167,7 +169,8 @@ VOID SdpSocket::AssertLocked() NTSTATUS SdpSocket::Init( WspSocketIn *pSocketInParam, - WspSocketOut *pSocketOutParam) + WspSocketOut *pSocketOutParam, + SdpUserFile *pSdpUserFile) { NTSTATUS rc = STATUS_SUCCESS; SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p\n", this)); @@ -190,6 +193,10 @@ NTSTATUS SdpSocket::Init( rc = STATUS_NO_MEMORY; goto Cleanup; } + + m_pSdpUserFile = pSdpUserFile; + m_pSdpUserFile->AddRef(); + Cleanup: return rc; @@ -200,25 +207,26 @@ NTSTATUS SdpSocket::AcceptRequests() { // Check if our state allows us to handle send/recv/accept ... if (m_ShutdownCalled) return STATUS_SHUTDOWN_IN_PROGRESS; - if (m_CloseSocketCalled) return STATUS_SHUTDOWN_IN_PROGRESS; + if (m_CloseSocketCalled) return STATUS_HANDLES_CLOSED; // Not the exact code + // But it seems relatively closest return STATUS_SUCCESS; } NTSTATUS SdpSocket::WSPSend( WspSendIn *pWspSendIn, - WspSendOut *pWspSendOut + WspSendOut *pWspSendOut, + IRP *pIrp ) { SDP_PRINT(SDP_DEBUG, SDP_SOCKET, ("this = 0x%p size = %d \n",this, pWspSendIn->BufferSize)); NTSTATUS rc = STATUS_SUCCESS; - NTSTATUS rc1; // used only to check that there are no more errors on the - // return path BufferDescriptor * pBufferDescriptor = NULL; bool First = true; ULONG Coppied = 0; - bool Locked = false; PRKEVENT pBuffersEvent = NULL; + bool BufferCopied; + NTSTATUS rc1; // For zero bytes send we currently don't do anything and return with status // success @@ -227,67 +235,84 @@ NTSTATUS SdpSocket::WSPSend( goto Cleanup; } - while (Coppied < pWspSendIn->BufferSize) { - if ((Locked == false) && !m_Lock.Lock()) { - SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); - rc = STATUS_SHUTDOWN_IN_PROGRESS; - goto Cleanup; - } - Locked = true; - ASSERT(pBuffersEvent == NULL); + if (!m_Lock.Lock()) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); + rc = STATUS_SHUTDOWN_IN_PROGRESS; + goto Cleanup; + } - if ((m_state != SS_CONNECTED)) { - // We can not send now. - SDP_PRINT(SDP_WARN, SDP_SOCKET, ("Can't send now, m_state = %s\n", - SS2String(m_state) - )); - rc = STATUS_SHUTDOWN_IN_PROGRESS; - pWspSendOut->Errno = WSAENOTCONN; - - m_Lock.Unlock(); // Error ignored as this is already an error pass - Locked = false; + if ((m_state != SS_CONNECTED)) { + // We can not send now. + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("Can't send now, m_state = %s\n", + SS2String(m_state) + )); + rc = STATUS_SHUTDOWN_IN_PROGRESS; + pWspSendOut->Errno = WSAENOTCONN; + + m_Lock.Unlock(); // Error ignored as this is already an error pass + goto Cleanup; + } + + // Check if there is already data in the queue, if yes we just + // increase the queue and leave. + + ASSERT(!m_CloseSocketCalled); + rc = m_SendBufferPool.AddToUserBuffers(&BufferCopied, false , pWspSendIn->pData, pWspSendIn->BufferSize,Coppied, pIrp); + ASSERT(rc == STATUS_PENDING || rc == STATUS_SUCCESS); + if (rc == STATUS_PENDING) { + ASSERT(BufferCopied); + // TODO: We already took the lock, and we are in the right context, + // We should probably do some work there + // Data was already copied to the buffer we are done. + rc = m_Lock.Unlock(); + if (!NT_SUCCESS(rc)) { + // No need to complete the IRP, as it will be be deleted + // when all other IRPs will be + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); goto Cleanup; } + // This IRP will be pending (make sure to change this after the unlock) + rc = STATUS_PENDING; + + goto Cleanup; + } + + ASSERT(rc == STATUS_SUCCESS); + ASSERT(BufferCopied == false); + // We now try to copy the data to the internal buffers + + while (Coppied < pWspSendIn->BufferSize) { - rc = m_SendBufferPool.GetBuffer(&pBufferDescriptor, &pBuffersEvent, First); + rc = m_SendBufferPool.GetBuffer(&pBufferDescriptor); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.GetBuffer failed rc = 0x%x\n", rc )); m_Lock.Unlock(); // Error ignored as this is already an error pass - Locked = false; goto Cleanup; } - First = false; - - if (pBuffersEvent != NULL) { - // We are told to wait on this event + + if (pBufferDescriptor == NULL) { + // We don't have a new buffer any more, we store the remaining + // buffer and quit + rc = m_SendBufferPool.AddToUserBuffers( + &BufferCopied, + true , + pWspSendIn->pData, + pWspSendIn->BufferSize, + Coppied, + pIrp); + ASSERT(rc == STATUS_PENDING); + ASSERT(BufferCopied == true); + rc = m_Lock.Unlock(); - Locked = false; if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); + // No need to complete the IRP, as it will be be deleted + // when all other IRPs will be goto Cleanup; } - - rc = MyKeWaitForSingleObject( - pBuffersEvent, - UserRequest, - UserMode, - FALSE, - NULL - ); - pBuffersEvent = NULL; - if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { - // BUGBUG: Think what to do here, we should be able to stop the - // connect, and quit (probably shutdown should be enough) - SDP_PRINT(SDP_WARN, SDP_SOCKET, ("MyKeWaitForSingleObject was alerted rc = 0x%x\n", rc )); - rc = STATUS_UNEXPECTED_IO_ERROR; - //pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error - Shutdown(); - goto Cleanup; - } - // try getting the buffer again - continue; + rc = STATUS_PENDING;// This IRP will be pending (make sure to change this after the unlock) + goto Cleanup; } - // copy the data from the user mode to the buffers ULONG CopySize = pBufferDescriptor->BufferSize - sizeof msg_hdr_bsdh; CopySize = min(CopySize, pWspSendIn->BufferSize - Coppied); @@ -315,23 +340,29 @@ NTSTATUS SdpSocket::WSPSend( goto Cleanup; } } - ASSERT(Locked == true); + rc = m_Lock.Unlock(); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock() failed rc = 0x%x\n", rc )); goto Cleanup; } - - m_SendBufferPool.AllowOthersToGet(); - + Cleanup: if (NT_SUCCESS(rc) ) { - pWspSendOut->Errno = 0; - ASSERT(pWspSendIn->BufferSize == Coppied); - pWspSendOut->NumberOfBytesSent = Coppied; + ASSERT((rc == STATUS_SUCCESS) || (rc == STATUS_PENDING)); + if (rc != STATUS_PENDING) { + pWspSendOut->Errno = 0; + ASSERT(pWspSendIn->BufferSize == Coppied); + pWspSendOut->NumberOfBytesSent = Coppied; + } } else { // Make sure that we have the error setted + Shutdown(); ASSERT(pWspSendOut->Errno != 0); // BUGBUG: Need to make sure that this + if(pWspSendOut->Errno == 0) { + // Some default value + pWspSendOut->Errno = WSAENOBUFS; + } SDP_PRINT(SDP_WARN, SDP_SOCKET, ("this = 0x%p rc = 0x%x\n",this, rc)); // is indeed the case. } @@ -544,7 +575,7 @@ NTSTATUS SdpSocket::WSPConnect( goto Cleanup; } - rc = g_pSdpDriver->m_pSdpArp->DestPortGidFromMac(pWspConnectIn->DestMac, &DestPortGid); + rc = g_pSdpDriver->m_pSdpArp->DestPortGidFromMac(m_SrcPortGuid, pWspConnectIn->DestMac, &DestPortGid); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_pSdpArp->DestPortGidFromIP failed rc = 0x%x\n", rc )); pWspConnectOut->Errno = WSAENETUNREACH; // BUGBUG: verify this error @@ -1066,7 +1097,13 @@ SdpSocket::WSPCloseSocket( SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("this = 0x%p state = %s \n",this, SS2String(m_state))); OBJECT_ATTRIBUTES attr; HANDLE ThreadHandle; - + bool sleep = false; +restart: + + if (sleep) { + Sleep(1*1000*1000);//??????? + } + sleep = true; if (!m_Lock.Lock()) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); @@ -1084,6 +1121,13 @@ SdpSocket::WSPCloseSocket( goto Cleanup; } + + //????????? + if (m_SendBufferPool.m_UserPackets.Size() > 0) { + m_Lock.Unlock(); + goto restart; + } + // This will force that no more calls will be allowed ASSERT(m_CloseSocketCalled == FALSE); // If this is not the case // We shouldn't be able to take the lock @@ -1152,7 +1196,7 @@ SdpSocket::WSPCloseSocket( m_pCloseSocketThread = NULL; // Will be delated when the callback thread is deleted rc = m_Lock.Unlock(); - if (rc == STATUS_SHUTDOWN_IN_PROGRESS) { + if (rc == STATUS_HANDLES_CLOSED) { // shutdown in progress is fine since we have started the shutdown ... rc = STATUS_SUCCESS; } @@ -1262,7 +1306,7 @@ NTSTATUS SdpSocket::CmSendRTU() int MaxMessageSize = min(m_hello_ack.hah.l_rcv_size, MAX_SEND_BUFFER_SIZE); - rc = m_SendBufferPool.Init(MAX_SEND_PACKETS, QP_ATTRIB_SQ_DEPTH, MaxMessageSize, m_pd, m_qp, m_lkey, this); + rc = m_SendBufferPool.Init(MAX_SEND_PACKETS, SDP_QP_ATTRIB_SQ_DEPTH, MaxMessageSize, m_pd, m_qp, m_lkey, this); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.Init failed rc = 0x%x\n", rc )); goto Cleanup; @@ -1428,7 +1472,7 @@ SdpSocket::CmReqCallback(IN ib_cm_req_rec_t *p_cm_req_rec) WspSocketIn SocketInParam; WspSocketOut SocketOutParam; SocketInParam.dwFlags = 0; - rc = pNewSocket->Init(&SocketInParam, &SocketOutParam); + rc = pNewSocket->Init(&SocketInParam, &SocketOutParam, m_pSdpUserFile); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("pNewSocket.Init() failed rc = 0x%x\n", rc )); goto ErrorLocked; @@ -1498,7 +1542,7 @@ SdpSocket::CmReqCallback(IN ib_cm_req_rec_t *p_cm_req_rec) // We will now call init on the sender and the reciever int MaxMessageSize = min(msg_hello->hh.l_rcv_size, MAX_SEND_BUFFER_SIZE); - rc = pNewSocket->m_SendBufferPool.Init(MAX_SEND_PACKETS, QP_ATTRIB_SQ_DEPTH, MaxMessageSize, pNewSocket->m_pd, pNewSocket->m_qp, pNewSocket->m_lkey, pNewSocket); + rc = pNewSocket->m_SendBufferPool.Init(MAX_SEND_PACKETS, SDP_QP_ATTRIB_SQ_DEPTH, MaxMessageSize, pNewSocket->m_pd, pNewSocket->m_qp, pNewSocket->m_lkey, pNewSocket); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.Init failed rc = 0x%x\n", rc )); goto ErrorLocked; @@ -1753,10 +1797,10 @@ SdpSocket::CmDreqCallback(IN ib_cm_dreq_rec_t *p_cm_dreq_rec) // Take the lock and verify the state - Locked = m_Lock.Lock(); + rc = m_Lock.LockRc(); // BUGBUG: It seems that even when the lock fails we should send // drep - if (!Locked) { + if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Lock failed rc = 0x%x\n", rc )); goto Cleanup; } @@ -1958,15 +2002,16 @@ NTSTATUS SdpSocket::send_cb() { SDP_PRINT(SDP_DEBUG, SDP_SOCKET, ("called this =0x%p\n", this)); NTSTATUS rc = STATUS_SUCCESS, rc1 = STATUS_SUCCESS, rc2 = STATUS_SUCCESS; + AssertLocked(); ib_api_status_t ib_status; ib_wc_t *p_wc, *p_free; size_t i; BufferDescriptor *pBufferDescriptor = NULL; - for( i = 0; i < QP_ATTRIB_SQ_DEPTH; i++ ) { + for( i = 0; i < SDP_QP_ATTRIB_SQ_DEPTH; i++ ) { m_SendComplitionWC[i].p_next = &m_SendComplitionWC[i + 1]; } - m_SendComplitionWC[QP_ATTRIB_SQ_DEPTH - 1].p_next = NULL; + m_SendComplitionWC[SDP_QP_ATTRIB_SQ_DEPTH - 1].p_next = NULL; do { @@ -2127,7 +2172,7 @@ NTSTATUS SdpSocket::CreateQp() } /* Allocate send CQ. */ - cq_create.size = QP_ATTRIB_SQ_DEPTH; + cq_create.size = SDP_QP_ATTRIB_SQ_DEPTH; cq_create.pfn_comp_cb = SdpSocket::__send_cb1; ib_status = ib_create_cq( @@ -2150,7 +2195,7 @@ NTSTATUS SdpSocket::CreateQp() qp_create.rq_depth = QP_ATTRIB_RQ_DEPTH; qp_create.rq_sge = QP_ATTRIB_RQ_SGE; /* To support buffers spanning pages. */ qp_create.h_rq_cq = m_rcq; - qp_create.sq_depth = QP_ATTRIB_SQ_DEPTH; + qp_create.sq_depth = SDP_QP_ATTRIB_SQ_DEPTH; //TODO: Figure out the right number of SGE entries for sends. qp_create.sq_sge = QP_ATTRIB_SQ_SGE; qp_create.h_sq_cq = m_scq; @@ -2318,6 +2363,41 @@ VOID SdpSocket::CreateCmRequest( cm_req->pfn_cm_rep_cb = cm_rep_callback; } + +VOID SdpSocket::UsersThreadCallBack(bool Send) +{ + NTSTATUS rc = STATUS_SUCCESS; + + if (!m_Lock.Lock()) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Failed to lock this = 0x%p \n",this)); + // Error is ignored, as it is a callback path, socket is already at an error state + goto Cleanup; + } + if (Send) { + InitializeListHead(&m_CallBackRequestList); + rc = m_SendBufferPool.UsersThreadCallBack(); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.UsersThreadCallBack failed this = 0x%p, rc = 0x%x \n", + this, rc)); + m_Lock.Unlock(); // Error is ignored, as this is already an error path + Shutdown(); + goto Cleanup; + } + + } + + rc = m_Lock.Unlock(); // Error is ignored, as it is a callback path + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_Lock.Unlock failed this = 0x%p, rc = 0x%x \n", + this, rc)); + Shutdown(); + goto Cleanup; + } + +Cleanup: + return; +} + // static VOID SdpSocket::ShutdownCB(VOID* pContext) { @@ -2485,12 +2565,16 @@ VOID SdpSocket::Shutdown() delete m_pCloseSocketThread; m_pCloseSocketThread = NULL; } - // Now that all ibal operations have finished we can free the memory m_SendBufferPool.ShutDown(); m_RecvBufferPool.ShutDown(); + if (m_pSdpUserFile != NULL) { + m_pSdpUserFile->RemoveSocket(this); + m_pSdpUserFile->Release(); + m_pSdpUserFile = NULL; + } /* Memory reagion probably cleans when the other handles are closed diff --git a/trunk/ulp/sdp/kernel/SdpSocket.h b/trunk/ulp/sdp/kernel/SdpSocket.h index 808aa2bd..3b1dbf43 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.h +++ b/trunk/ulp/sdp/kernel/SdpSocket.h @@ -43,13 +43,12 @@ It keeps a list of all the objects so we know when to remove them. const int MAX_SEND_BUFFER_SIZE = 1*4096; // This is the maximum send packet size -// BUGBUG: Check why changing this param crushes the system const int MAX_RECV_BUFFER_SIZE = 1*4096; // This is the maximum send packet size const int MAX_SEND_PACKETS = 200; // This is the maximum number of packets allocated per send const int MAX_RECV_PACKETS = 200; // This is the maximum number of packets allocated per send -const short QP_ATTRIB_SQ_DEPTH = 64; +const short SDP_QP_ATTRIB_SQ_DEPTH = 64; const short QP_ATTRIB_SQ_SGE = 1; /* Set based on inline data requirements */ //#define QP_ATTRIB_RESPONDER_RESOURCES 4 const short QP_ATTRIB_INITIATOR_DEPTH = 4; @@ -142,7 +141,7 @@ private: KEVENT m_ShutdownCompleteEvent; KEVENT m_DisconectSentEvent; - ib_wc_t m_SendComplitionWC[QP_ATTRIB_SQ_DEPTH]; + ib_wc_t m_SendComplitionWC[SDP_QP_ATTRIB_SQ_DEPTH]; ib_wc_t m_RecvComplitionWC[QP_ATTRIB_RQ_DEPTH]; // The following three falgs are used to shutdown a socket @@ -150,7 +149,9 @@ private: bool m_ShutdownCalled; bool m_DisconnectConnectionRecieved; - ThreadHandle* m_pCloseSocketThread; + ThreadHandle* m_pCloseSocketThread; + + SdpUserFile *m_pSdpUserFile; static VOID __send_cb1( IN const ib_cq_handle_t h_cq, @@ -166,6 +167,8 @@ public: RecvPool m_RecvBufferPool; ConnectionList m_ConnectionList; + LIST_ENTRY m_CallBackRequestList;// Used by the call back request thread to hold the request + SdpSocket(); @@ -175,7 +178,8 @@ public: NTSTATUS Init( WspSocketIn *pSocketInParam, - WspSocketOut *pSocketOutParam + WspSocketOut *pSocketOutParam, + SdpUserFile *pSdpUserFile ); NTSTATUS WSPConnect( @@ -185,7 +189,8 @@ public: NTSTATUS WSPSend( WspSendIn *pWspSendIn, - WspSendOut *pWspSendOut + WspSendOut *pWspSendOut, + IRP *pIrp ); NTSTATUS WSPRecv( @@ -244,6 +249,14 @@ public: USHORT DestPort ); + NTSTATUS RequestCallBack() { + AssertLocked(); + ASSERT(IsListEmpty(&m_CallBackRequestList)); + return m_pSdpUserFile->RequestCallBack(&m_CallBackRequestList); + } + + VOID UsersThreadCallBack(bool Send); + VOID CmRepCallback(IN ib_cm_rep_rec_t *p_cm_rep_rec); VOID CmReqCallback(IN ib_cm_req_rec_t *p_cm_req_rec); VOID CmRtuCallback(IN ib_cm_rtu_rec_t *p_cm_rtu_rec); diff --git a/trunk/ulp/sdp/kernel/SdpUserFile.cpp b/trunk/ulp/sdp/kernel/SdpUserFile.cpp index 898b32fc..5a62b452 100644 --- a/trunk/ulp/sdp/kernel/SdpUserFile.cpp +++ b/trunk/ulp/sdp/kernel/SdpUserFile.cpp @@ -36,19 +36,21 @@ SdpUserFile::SdpUserFile() { InitializeListHead(&m_SocketsList); m_shutdown = false; + m_NumberOfUserThreads = 0; } NTSTATUS SdpUserFile::Init() { - KeInitializeSpinLock(&m_Lock); InitializeListHead(&m_SocketsList); + KeInitializeEvent(&m_UsersCallEvent, SynchronizationEvent , FALSE ); m_shutdown = false; - + KeInitializeSpinLock(&m_Lock); return STATUS_SUCCESS; } void SdpUserFile::Shutdown() { + SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("Called this = 0x%p \n",this)); // go over the entire list, and release it's objects CSpinLockWrapper Lock(m_Lock); Lock.Lock(); @@ -57,6 +59,22 @@ void SdpUserFile::Shutdown() Lock.Unlock(); return; } + m_shutdown = true; + + // Go over the list of callbacks that you have to make and remove + // them. + while (m_UsersCallList.Size() > 0) { + PLIST_ENTRY pTemp = m_UsersCallList.RemoveHeadList(); + SdpSocket * pSdpSocket = CONTAINING_RECORD(pTemp, SdpSocket , m_CallBackRequestList); + + // Don't call release with the lock being hold + Lock.Unlock(); + pSdpSocket->Release(); + // It seems that we shoule be protected by the m_shutdown + // flag, but will take the lock just in case + Lock.Lock(); + } + while (!IsListEmpty(&m_SocketsList)) { PLIST_ENTRY pTemp = RemoveHeadList(&m_SocketsList); SdpSocket *pSdpSocket = CONTAINING_RECORD(pTemp, SdpSocket, m_UserFileList ); @@ -68,9 +86,11 @@ void SdpUserFile::Shutdown() // It seems that we shoule be protected by the m_shutdown // flag, but will take the lock just in case Lock.Lock(); - } + Lock.Unlock(); + // Free the users thread + KePulseEvent(&m_UsersCallEvent,IO_NO_INCREMENT ,FALSE); } NTSTATUS SdpUserFile::AddSocket(SdpSocket *pSdpSocket) @@ -143,4 +163,98 @@ SdpSocket *SdpUserFile::SocketByPointer(VOID *Socket) return pSdpSocket; } +NTSTATUS +SdpUserFile::RequestCallBack(LIST_ENTRY *pList) +{ + NTSTATUS rc = STATUS_SUCCESS; + CSpinLockWrapper Lock(m_Lock); + SdpSocket *pSdpSocket = NULL; + + pSdpSocket = CONTAINING_RECORD(pList, SdpSocket , m_CallBackRequestList); + // Take the lock and add the wanted event + Lock.Lock(); + if (m_shutdown) { + Lock.Unlock(); + return STATUS_SHUTDOWN_IN_PROGRESS; + } + pSdpSocket->AddRef(); + ASSERT(IsListEmpty(pList)); + m_UsersCallList.InsertTailList(pList); + + Lock.Unlock(); + // Make sure someone tries to read our objects + KeSetEvent(&m_UsersCallEvent,IO_NETWORK_INCREMENT ,FALSE); + return rc; + +} + +/* + This function is being called by a thread that reaches us from + the user, and is responisble for copying data from user buffers + to kernel memory. + + Only in the case of shutdown, the thread will exit. + Since we only do the cleanup of the SdpUserFile on IRP_MJ_CLOSE + there is no feer of working on a removed object. +*/ + +NTSTATUS +SdpUserFile::UsersThread() +{ + NTSTATUS rc = STATUS_SUCCESS; + CSpinLockWrapper Lock(m_Lock); + LIST_ENTRY *item = NULL; + SdpSocket *pSdpSocket = NULL; + bool ShutdownCalled = false; // This will only change from false + // to true + long NumberOfThreads = InterlockedIncrement(&m_NumberOfUserThreads); + if(NumberOfThreads != 1) { + // It seems that more than one uesr is here, don't let him + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("More than one user thread !!! \n")); + ASSERT(FALSE); + return STATUS_ACCESS_DENIED; + } + + int count = 0; + while (true) { + // Take the lock, and see the state: + Lock.Lock(); + ShutdownCalled = m_shutdown; + // Process all data that exists + if(m_UsersCallList.Size() > 0) { + item = m_UsersCallList.RemoveHeadList(); + pSdpSocket = CONTAINING_RECORD(item, SdpSocket , m_CallBackRequestList); + + Lock.Unlock(); + + // Do the call back + if (!ShutdownCalled) { + pSdpSocket->UsersThreadCallBack(true); + } else { + //Currently, we don't call on shutdown. + } + + // Release everything + pSdpSocket->Release(); + continue; + } + if (ShutdownCalled) { + // Is there some other thread that is hidding ? Shouldn't happen + // but we can't trust the user + KePulseEvent(&m_UsersCallEvent,IO_NO_INCREMENT ,FALSE); + Lock.Unlock(); + return STATUS_SUCCESS; + } + // Wait for a new event to arrive + Lock.Unlock(); + rc = MyKeWaitForSingleObject(&m_UsersCallEvent, UserRequest, UserMode, TRUE, NULL); + + if (rc == STATUS_USER_APC ) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("Worker thread has recieved a user APC, shuting down the process \n")); + Shutdown(); + return STATUS_SUCCESS; + } + } + +} diff --git a/trunk/ulp/sdp/kernel/SdpUserFile.h b/trunk/ulp/sdp/kernel/SdpUserFile.h index c115de66..17263126 100644 --- a/trunk/ulp/sdp/kernel/SdpUserFile.h +++ b/trunk/ulp/sdp/kernel/SdpUserFile.h @@ -41,8 +41,6 @@ It keeps a list of all the objects so we know when to remove them. #ifndef _SDP_USER_FILE_H #define _SDP_USER_FILE_H - - class SdpUserFile : public RefCountImpl { private: KSPIN_LOCK m_Lock; @@ -50,7 +48,12 @@ private: // BUGBUG: Use something more effiecent for this storage LIST_ENTRY m_SocketsList; - + + LinkedList m_UsersCallList; // This list holds all the requests of users for callback + KEVENT m_UsersCallEvent; // The users thread waits on this event + + volatile long m_NumberOfUserThreads;// Make sure that there is only one user thread + public: SdpUserFile(); @@ -64,6 +67,12 @@ public: VOID RemoveSocket(SdpSocket *pSdpSocket); +// VOID MoveSocketT(SdpSocket *pSdpSocket); + + NTSTATUS RequestCallBack(LIST_ENTRY *pList); + + NTSTATUS UsersThread(); + }; #endif //_SDP_USER_FILE_H diff --git a/trunk/ulp/sdp/todo b/trunk/ulp/sdp/todo index d44cafce..bf3e60eb 100644 --- a/trunk/ulp/sdp/todo +++ b/trunk/ulp/sdp/todo @@ -9,12 +9,11 @@ KERNEL MODE: 1) Clean error path. send: - 1) On send: implmeant some kind of a negal algorithm. - 2) On send: Create some kind of mechanism that will allow to recieve complitions on more than + 1) implmeant some kind of a negal algorithm. + 2) Create some kind of mechanism that will allow to recieve complitions on more than one send. - 3) If possibale, post more than one send. - 4) Consider copying big packets from the DPC handler, instead of using the users thread - for the copy + 3) If possibale, post more than one send, at a time + 4) [Critical] Use refferance count when queing the socket in an IRP structure. recv: 1) Find and fix the race when the socket is being initialized @@ -34,6 +33,14 @@ general: Check the ArpCache problems (on a native windows machine) and decide what to do. +Overlapped IO: +1) Split to two types of overlapped operations +2) Use referance counting on our type? If not, make sure that the life time of objects is well. +3) Replace InterlockedIncrement with _InterlockedIncrement +4) Make sure that in UserFile module, operations can start only when there is a user mode thread + that is already waiting. + + USER MODE: * Check the lifetime of the SdpSocket (when is it deleted and so)?? -- 2.41.0