From 07f6c809145bbd9cf50a150b84a70e8137c01bc3 Mon Sep 17 00:00:00 2001 From: tzachid Date: Mon, 21 Nov 2005 12:13:24 +0000 Subject: [PATCH] Some fixes to send code. (Rev 82) git-svn-id: svn://openib.tc.cornell.edu/gen1@166 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86 --- trunk/ulp/sdp/kernel/SdpBufferPool.cpp | 51 +++++++-- trunk/ulp/sdp/kernel/SdpBufferPool.h | 12 ++- trunk/ulp/sdp/kernel/SdpDriver.cpp | 2 +- trunk/ulp/sdp/kernel/SdpLock.h | 33 ++++-- trunk/ulp/sdp/kernel/SdpSocket.cpp | 144 +++++++++++++++++++++++-- trunk/ulp/sdp/kernel/SdpSocket.h | 12 ++- trunk/ulp/sdp/kernel/SdpTrace.cpp | 3 +- trunk/ulp/sdp/kernel/SdpTrace.h | 5 +- trunk/ulp/sdp/todo | 10 +- 9 files changed, 232 insertions(+), 40 deletions(-) diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp index e801a4dc..4f131018 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.cpp +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.cpp @@ -3,6 +3,7 @@ #include "preCompile.h" + NTSTATUS BufferPool::Init( int MaxBuffers, @@ -10,7 +11,9 @@ BufferPool::Init( int MaxMessageSize, ib_pd_handle_t pd, ib_qp_handle_t qp, - net32_t lkey + net32_t lkey, + SdpSocket *pSdpSocket + ) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); @@ -28,6 +31,10 @@ BufferPool::Init( m_qp = qp; ASSERT(lkey != NULL); m_lkey = lkey; +#if DBG + m_pSdpSocket = pSdpSocket; +#endif + return STATUS_SUCCESS; } @@ -53,6 +60,8 @@ BufferPool::GetBuffer( { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p FirstBuffer = %s\n",this, FirstBuffer ? "TRUE" : "FALSE")); + AssertLocked(); + NTSTATUS rc = STATUS_SUCCESS; *ppBufferDescriptor = NULL; @@ -68,13 +77,13 @@ BufferPool::GetBuffer( } // Can we supply a buffer right now ? - if (m_CurrentlyAllocated < m_MaxBuffers) { + if (m_CurrentlySentBuffers < m_MaxConcurrentSends) { // yes, supply a buffer if (m_FreePackets.Size() > 0) { LIST_ENTRY *item = m_FreePackets.RemoveHeadList(); *ppBufferDescriptor = CONTAINING_RECORD(item, BufferDescriptor , BuffersList); goto Cleanup; - } else { + } else if (m_CurrentlyAllocated < m_MaxBuffers) { // we need to alocate a new buffer rc = AllocateBuffer(ppBufferDescriptor); if (!NT_SUCCESS(rc)) { @@ -85,13 +94,12 @@ BufferPool::GetBuffer( m_CurrentlyAllocated++; goto Cleanup; } - } else { - // No buffers available, we have to wait - ASSERT(m_ClientWaiting == false); - KeClearEvent(&m_WaitingClients); - m_ClientWaiting = true; - *ppEvent = &m_WaitingClients; - } + } + // No buffers available, we have to wait + ASSERT(m_ClientWaiting == false); + KeClearEvent(&m_WaitingClients); + m_ClientWaiting = true; + *ppEvent = &m_WaitingClients; Cleanup: return rc; @@ -102,6 +110,8 @@ BufferPool::AddBufferToQueuedList(BufferDescriptor *pBufferDescriptor) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p pBufferDescriptor = 0x%x\n",this, pBufferDescriptor)); + AssertLocked(); + NTSTATUS rc = STATUS_SUCCESS; if ((m_CurrentlySentBuffers < m_MaxConcurrentSends) && @@ -138,6 +148,15 @@ VOID BufferPool::ReturnBuffer(BufferDescriptor *pBufferDescriptor) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p buffer=0x%p\n",this, pBufferDescriptor)); + +#if DBG + if (m_CurrentlySentBuffers == 1) { + SDP_PRINT(SDP_WARN, SDP_BUFFER_POOL, ("Currently no packets are bing sent m_ClientWaiting = %s\n", + m_ClientWaiting ? "true" : "false")); + } +#endif + + AssertLocked(); m_FreePackets.InsertTailList(&pBufferDescriptor->BuffersList); // Is there a client waiting ? if ( m_ClientWaiting) { @@ -158,6 +177,7 @@ NTSTATUS BufferPool::SendBuffersIfCan() { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); NTSTATUS rc = STATUS_SUCCESS; while ((m_QueuedPackets.Size() > 0) && @@ -186,6 +206,7 @@ VOID BufferPool::ShutDown() { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + //???? AssertLocked(); BufferDescriptor *pBufferDescriptor = NULL; LIST_ENTRY *item = NULL; @@ -207,6 +228,7 @@ NTSTATUS BufferPool::AllocateBuffer(BufferDescriptor ** ppBufferDescriptor) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); NTSTATUS rc = STATUS_SUCCESS; BufferDescriptor *pBufferDescriptor = NULL; @@ -250,6 +272,7 @@ Cleanup: ExFreePoolWithTag(pBufferDescriptor->pBuffer, SEND_BUFFERS_ALLOCATION_TAG); } ExFreePoolWithTag(pBufferDescriptor, SEND_BUFFERS_ALLOCATION_TAG); + pBufferDescriptor = NULL; } } *ppBufferDescriptor = pBufferDescriptor; @@ -267,6 +290,7 @@ NTSTATUS BufferPool::SendBuffer(BufferDescriptor *pBufferDescriptor) { SDP_PRINT(SDP_TRACE, SDP_BUFFER_POOL, ("this = 0x%p \n",this)); + AssertLocked(); NTSTATUS rc = STATUS_SUCCESS; msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *) pBufferDescriptor->pBuffer; @@ -308,4 +332,11 @@ Cleanup: return rc; } +VOID +BufferPool::AssertLocked() { +#if DBG + m_pSdpSocket->AssertLocked(); +#endif +} + diff --git a/trunk/ulp/sdp/kernel/SdpBufferPool.h b/trunk/ulp/sdp/kernel/SdpBufferPool.h index a73e6464..8ad0da57 100644 --- a/trunk/ulp/sdp/kernel/SdpBufferPool.h +++ b/trunk/ulp/sdp/kernel/SdpBufferPool.h @@ -4,6 +4,7 @@ #define H_SDP_BUFFER_POOL_H + // This is simply a wrapper to the LIST_ENTRY class that allows // easier work with this list class LinkedList { @@ -78,7 +79,8 @@ public: int MaxMessageSize, ib_pd_handle_t pd, ib_qp_handle_t qp, - net32_t lkey + net32_t lkey, + SdpSocket *pSdpSocket ); NTSTATUS GetBuffer( @@ -116,7 +118,7 @@ private: bool m_ClientBeingServed; // true if we have already started giving buffers to a client LinkedList m_FreePackets; // This packets are free and might be used - LinkedList m_QueuedPackets; // This packets were filled with data and should be filled + LinkedList m_QueuedPackets; // This packets were filled with data and should be sent // TODO: A queue of events for threads that are waiting for buffers. @@ -131,6 +133,12 @@ private: KEVENT m_WaitingClients; // switch to a linked list bool m_ClientWaiting; +#if DBG + SdpSocket *m_pSdpSocket; +#endif //DBG + +VOID AssertLocked(); + }; #endif // H_SDP_BUFFER_POOL_H diff --git a/trunk/ulp/sdp/kernel/SdpDriver.cpp b/trunk/ulp/sdp/kernel/SdpDriver.cpp index d78e8df8..556328ac 100644 --- a/trunk/ulp/sdp/kernel/SdpDriver.cpp +++ b/trunk/ulp/sdp/kernel/SdpDriver.cpp @@ -66,7 +66,7 @@ extern "C" NTSTATUS DriverEntry ( &DevName, FILE_DEVICE_UNKNOWN, 0, - TRUE, + FALSE, &pDevObj ); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_DRIVER, ("IoCreateDevice failed rc = 0x%x\n", rc )); diff --git a/trunk/ulp/sdp/kernel/SdpLock.h b/trunk/ulp/sdp/kernel/SdpLock.h index 9ec2ea8c..c920d691 100644 --- a/trunk/ulp/sdp/kernel/SdpLock.h +++ b/trunk/ulp/sdp/kernel/SdpLock.h @@ -66,6 +66,7 @@ public: KeInitializeEvent(&m_Event, NotificationEvent , TRUE); KeInitializeSpinLock(&m_SpinLock); m_SendCBHandler = NULL; + m_ClientWaiting = false; } VOID Init(SendCBHandler SendCB, SdpSocket *pSdpSocket) @@ -92,6 +93,7 @@ public: if (m_InUse) { // We have to release the spinlock and wait on the event + m_ClientWaiting = true; KeReleaseSpinLock(&m_SpinLock, OldIrql); rc = MyKeWaitForSingleObject(&m_Event, UserRequest, UserMode, false, NULL); if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { @@ -107,6 +109,7 @@ public: KeClearEvent(&m_Event); OldFlags = m_flags; ResetFlags(m_flags); + m_ClientWaiting = false; KeReleaseSpinLock(&m_SpinLock, OldIrql); rc = HandleFlags(OldFlags); if (!NT_SUCCESS(rc)) { @@ -154,6 +157,7 @@ Cleanup: } KeReleaseSpinLock(&m_SpinLock, OldIrql); if (SomethingToHandle(OldFlags)) { + ASSERT(m_InUse); rc = HandleFlags(OldFlags); if (!NT_SUCCESS(rc)) { // We have to signal the error to the calling side @@ -184,7 +188,7 @@ Cleanup: NTSTATUS rc = STATUS_SUCCESS; ASSERT(KeGetCurrentIrql() == DISPATCH_LEVEL); KeAcquireSpinLock(&m_SpinLock, &OldIrql); - if (m_InUse) { + if (m_InUse || m_ClientWaiting ) { m_flags |= flags; KeReleaseSpinLock(&m_SpinLock, OldIrql); return false; @@ -215,6 +219,7 @@ Cleanup: */ NTSTATUS HandleFlags(int flags) { NTSTATUS rc = STATUS_SUCCESS; + AssertLocked(); if (flags & SEND_CB_CALLED) { // We need to handle the send CB rc = m_SendCBHandler(m_pSdpSocket); @@ -228,18 +233,30 @@ Cleanup: return rc; } - VOID SignalShutdown() {ASSERT (FALSE);} //????????????? Make sure this is used - VOID SignalError(NTSTATUS rc) {ASSERT (FALSE);} //????????????? + VOID SignalShutdown() { + //??????? Verify use and correctnes + m_flags |= SHUTDOWN_SIGNALLED; + } + + bool IsShutdownSignaled() + { + return m_flags & SHUTDOWN_SIGNALLED ? true : false; + } + + VOID SignalError(NTSTATUS rc) {ASSERT (FALSE);} //???????????? + + VOID AssertLocked() {ASSERT(m_InUse);} - KEVENT m_Event; // the event for passive level threads - KSPIN_LOCK m_SpinLock; // The real guard of the lock + KEVENT m_Event; // the event for passive level threads + KSPIN_LOCK m_SpinLock; // The real guard of the lock SendCBHandler m_SendCBHandler; - bool m_InUse; // Tells if this lock has any user - int m_flags; // call backs that were recieved + bool m_InUse; // Tells if this lock has any user + int m_flags; // call backs that were recieved + bool m_ClientWaiting; // True if there is a client waiting to be served - SdpSocket *m_pSdpSocket; // The socket that this class depends on + SdpSocket *m_pSdpSocket;// The socket that this class depends on }; #endif // _SDP_LOCK_H diff --git a/trunk/ulp/sdp/kernel/SdpSocket.cpp b/trunk/ulp/sdp/kernel/SdpSocket.cpp index 807eee60..0a4e3271 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.cpp +++ b/trunk/ulp/sdp/kernel/SdpSocket.cpp @@ -75,6 +75,13 @@ SdpSocket::SdpSocket() m_state = SS_IDLE; } +VOID SdpSocket::AssertLocked() +{ +#if DBG + m_Lock.AssertLocked(); +#endif +} + NTSTATUS SdpSocket::Init( WspSocketIn *pSocketInParam, WspSocketOut *pSocketOutParam) @@ -157,6 +164,7 @@ NTSTATUS SdpSocket::WSPSend( goto Cleanup; } Locked = true; + ASSERT(pBuffersEvent == NULL); rc = m_SendBufferPool.GetBuffer(&pBufferDescriptor, &pBuffersEvent, First); if (!NT_SUCCESS(rc)) { @@ -183,7 +191,7 @@ NTSTATUS SdpSocket::WSPSend( FALSE, NULL ); - + pBuffersEvent = NULL; if (( rc == STATUS_ALERTED ) ||( rc == STATUS_USER_APC )) { // BUGBUG: Think what to do here, we should be able to stop the // connect, and quit (probably shutdown should be enough) @@ -201,7 +209,14 @@ NTSTATUS SdpSocket::WSPSend( ULONG CopySize = pBufferDescriptor->BufferSize - sizeof msg_hdr_bsdh; CopySize = min(CopySize, pWspSendIn->BufferSize - Coppied); - pBufferDescriptor->WriteData(pWspSendIn->pData + Coppied, CopySize); + rc = pBufferDescriptor->WriteData(pWspSendIn->pData + Coppied, CopySize); + if (!NT_SUCCESS(rc)) { + SDP_PRINT(SDP_ERR, SDP_SOCKET, ("pBufferDescriptor->WriteData failed rc = 0x%x\n", rc )); + // free the buffer that you have + m_SendBufferPool.ReturnBuffer(pBufferDescriptor); + m_Lock.Unlock(); // Error ignored as this is already an error pass + goto Cleanup; + } Coppied += CopySize; // return the data to the buffer @@ -235,6 +250,7 @@ Cleanup: return rc; } + #if 0 //Naive send implmentation. NTSTATUS SdpSocket::WSPSend( @@ -608,7 +624,7 @@ NTSTATUS SdpSocket::CmSendRTU() int MaxMessageSize = min(m_hello_ack.hah.l_rcv_size, MAX_SEND_BUFFER_SIZE); - rc = m_SendBufferPool.Init(MAX_SEND_PACKETS, QP_ATTRIB_SQ_DEPTH, MaxMessageSize, m_pd, m_qp, m_lkey); + rc = m_SendBufferPool.Init(MAX_SEND_PACKETS, QP_ATTRIB_SQ_DEPTH, MaxMessageSize, m_pd, m_qp, m_lkey, this); if (!NT_SUCCESS(rc)) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("m_SendBufferPool.Init failed rc = 0x%x\n", rc )); goto Cleanup; @@ -678,6 +694,9 @@ NTSTATUS SdpSocket::CmSendRTU() goto Cleanup; } + // We now start the recieve processing + RecieveOnce(); + Cleanup: return rc; } @@ -702,17 +721,117 @@ VOID SdpSocket::CmRepCallback(IN ib_cm_rep_rec_t *p_cm_rep_rec) } +//????????? Let's implmeant a naive read // BUGBUG: based on __recv_cb - need to implment static void __recv_cb1( - IN const ib_cq_handle_t h_cq, - IN void *cq_context ) + IN const ib_cq_handle_t h_cq, + IN void *cq_context ) { -// status = ib_rearm_cq( -// p_port->ib_mgr.h_recv_cq, FALSE ); -// ASSERT(exp)( status == IB_SUCCESS ); - ASSERT(FALSE); + SdpSocket *pSocket = (SdpSocket *) cq_context; + //pSocket->m_Lock.SignalCB(RECV_CB_CALLED); + + pSocket->__recv_cb2(); + } +char g_Recieve[6000]; + +VOID +SdpSocket::__recv_cb2() +{ + if (m_Lock.IsShutdownSignaled()) { + return; + } + + ib_api_status_t status; + ib_wc_t wc[QP_ATTRIB_RQ_DEPTH], *p_free, *p_wc1; + uint32_t pkt_cnt, recv_cnt = 0; + size_t i; + + + for( i = 0; i < QP_ATTRIB_RQ_DEPTH; i++ ) + wc[i].p_next = &wc[i + 1]; + wc[QP_ATTRIB_RQ_DEPTH - 1].p_next = NULL; + + do + { + /* If we get here, then the list of WCs is intact. */ + p_free = wc; + + status = ib_poll_cq( m_rcq, &p_free, &p_wc1 ); + CL_ASSERT( status == IB_SUCCESS || status == IB_NOT_FOUND ); + + /* Look at the payload now and filter ARP and DHCP packets. */ + //recv_cnt += __recv_mgr_filter( p_port, p_wc, &done_list, &bad_list ); + if (status == IB_SUCCESS) + { + ib_wc_t *p_wc; + for( p_wc = p_wc1; p_wc; p_wc = p_wc->p_next ) { + ASSERT( p_wc->status == IB_WCS_SUCCESS ); + int len = p_wc->length; + ASSERT(len >= sizeof msg_hdr_bsdh); + msg_hdr_bsdh *pHeader = (msg_hdr_bsdh *)g_Recieve; + sdp_msg_swap_bsdh(pHeader); + ASSERT(pHeader->mid == 0xff && pHeader->size == 0x10); + } + } + + } while( !p_free ); + + + /* Update our posted depth. */ + + /* Notify NDIS of any and all possible receive buffers. */ + + /* Only indicate receives if we actually had any. */ +// if( pkt_cnt ) + { +// NdisMIndicateReceivePacket( p_port->p_adapter->h_adapter, +// p_port->recv_mgr.recv_pkt_array, pkt_cnt ); + } + + /* Return any discarded receives to the pool */ + + /* Repost receives. */ + + RecieveOnce(); + + + /* + * Rearm after filtering to prevent contention on the enpoint maps + * and eliminate the possibility of having a call to + * __endpt_mgr_insert find a duplicate. + */ + status = ib_rearm_cq(m_rcq, FALSE ); + CL_ASSERT( status == IB_SUCCESS ); + + + +} + + + +VOID +SdpSocket::RecieveOnce() +{ + ib_local_ds_t l_ds; + + ib_recv_wr_t recv_wr; + recv_wr.p_next = NULL; + recv_wr.num_ds = 1; + recv_wr.wr_id = 123; + recv_wr.ds_array = & l_ds; + + l_ds.length = sizeof(g_Recieve); + l_ds.lkey = m_lkey; + l_ds.vaddr = MmGetPhysicalAddress( g_Recieve ).QuadPart; + + ib_api_status_t ib_status = ib_post_recv(m_qp, &recv_wr, NULL); + ASSERT(ib_status == IB_SUCCESS); + +} + + // TODO: Clear the callback functions mess void @@ -749,6 +868,10 @@ NTSTATUS SdpSocket::send_cb() p_free = wc; ib_status = ib_poll_cq( m_scq, &p_free, &p_wc ); ASSERT( ib_status == IB_SUCCESS || ib_status == IB_NOT_FOUND); + if (ib_status == IB_NOT_FOUND) { + SDP_PRINT(SDP_WARN, SDP_SOCKET, ("ib_poll_cq returned IB_NOT_FOUND, this =0x%x\n", this)); + break; + } if (ib_status != IB_SUCCESS) { SDP_PRINT(SDP_ERR, SDP_SOCKET, ("ib_poll_cq failed ib_status=%d, this =0x%x\n", ib_status,this)); ASSERT(ib_status == IB_INVALID_CQ_HANDLE || ib_status == IB_NOT_FOUND); @@ -1069,10 +1192,11 @@ VOID SdpSocket::CreateCmRequest( VOID SdpSocket::Shutdown() { + //???? locking // if(m_shutdown - on the lock) ??? ib_api_status_t ib_status; - + m_Lock.SignalShutdown(); SDP_PRINT(SDP_TRACE, SDP_SOCKET, ("SdpSocket::Shutdown called this = 0x%p\n", this)); diff --git a/trunk/ulp/sdp/kernel/SdpSocket.h b/trunk/ulp/sdp/kernel/SdpSocket.h index 97ffdea3..8895c395 100644 --- a/trunk/ulp/sdp/kernel/SdpSocket.h +++ b/trunk/ulp/sdp/kernel/SdpSocket.h @@ -12,11 +12,11 @@ It keeps a list of all the objects so we know when to remove them. #define _SDP_SOCKET_H const int MAX_SEND_BUFFER_SIZE = 32768; // This is the maximum send packet size -const int MAX_SEND_PACKETS = 40; // This is the maximum number of packets allocated per send +const int MAX_SEND_PACKETS = 800; // This is the maximum number of packets allocated per send -#define QP_ATTRIB_SQ_DEPTH 16 +#define QP_ATTRIB_SQ_DEPTH 32 #define QP_ATTRIB_SQ_SGE 1 /* Set based on inline data requirements */ /* @@ -127,7 +127,7 @@ public: // Used to allow the user file to remember us LIST_ENTRY m_UserFileList; -#ifdef DBG +#if DBG char * SS2String(SocketStates state) { switch (state) { case SS_IDLE : return "SS_IDLE"; @@ -141,7 +141,13 @@ public: } return "Unknown state"; } + #endif + + VOID AssertLocked(); + + VOID RecieveOnce(); //???????? remove me + VOID __recv_cb2(); }; diff --git a/trunk/ulp/sdp/kernel/SdpTrace.cpp b/trunk/ulp/sdp/kernel/SdpTrace.cpp index d69c2a2f..564a6ed2 100644 --- a/trunk/ulp/sdp/kernel/SdpTrace.cpp +++ b/trunk/ulp/sdp/kernel/SdpTrace.cpp @@ -5,7 +5,8 @@ BOOLEAN CheckCondition(int sev, int top, char *file, int line, char * func) { -// return FALSE; + if (sev < SDP_WARN) return FALSE; DbgPrint ("%s: ", func); + if (sev == SDP_ERR) DbgPrint ("ERROR - "); return TRUE; } \ No newline at end of file diff --git a/trunk/ulp/sdp/kernel/SdpTrace.h b/trunk/ulp/sdp/kernel/SdpTrace.h index 27526bd7..e3e642eb 100644 --- a/trunk/ulp/sdp/kernel/SdpTrace.h +++ b/trunk/ulp/sdp/kernel/SdpTrace.h @@ -23,10 +23,13 @@ BOOLEAN CheckCondition(int sev, int top, char *file, int line, char * func); +#if DBG #define SDP_PRINT(sev, toppic ,var_args) \ if (CheckCondition(sev, toppic ,__FILE__, __LINE__,__FUNCTION__) != FALSE) \ DbgPrint var_args - +#else +#define SDP_PRINT(sev, toppic ,var_args) +#endif #endif //H_SDP_TRACE_H diff --git a/trunk/ulp/sdp/todo b/trunk/ulp/sdp/todo index 3fcf50d1..42b8391d 100644 --- a/trunk/ulp/sdp/todo +++ b/trunk/ulp/sdp/todo @@ -2,10 +2,12 @@ This file includes things that should be impreoved in the sdp implmentation ============================================================================= KERNEL MODE: -1) On send: implmeant some kind of a negal algorithm. -2) On send: Create some kind of mechanism that will allow to recieve complitions on more than -one send. - + send: + 1) On send: implmeant some kind of a negal algorithm. + 2) On send: Create some kind of mechanism that will allow to recieve complitions on more than + one send. + 3) If possibale, post more than one send. + 4) -- 2.41.0