*/\r
\r
#include <comp_channel.h>\r
+#include <process.h>\r
\r
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry);\r
static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
\r
+\r
+/*\r
+ * Completion manager\r
+ */\r
+\r
+static unsigned __stdcall CompThreadPoll(void *Context)\r
+{\r
+ COMP_MANAGER *mgr = (COMP_MANAGER *) Context;\r
+ COMP_ENTRY *entry;\r
+ OVERLAPPED *overlap;\r
+ DWORD bytes;\r
+ ULONG_PTR key;\r
+\r
+ while (mgr->Run) {\r
+ GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key,\r
+ &overlap, INFINITE);\r
+ entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);\r
+\r
+ if (entry->Channel) {\r
+ CompChannelQueue(entry->Channel, entry);\r
+ } else {\r
+ CompManagerQueue(mgr, entry);\r
+ }\r
+ }\r
+\r
+ _endthreadex(0);\r
+ return 0;\r
+}\r
+\r
DWORD CompManagerOpen(COMP_MANAGER *pMgr)\r
{\r
+ DWORD ret;\r
+\r
+ InitializeCriticalSection(&pMgr->Lock);\r
+ pMgr->Busy = 0;\r
+ DListInit(&pMgr->DoneList);\r
+ CompEntryInit(NULL, &pMgr->Entry);\r
+\r
pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);\r
if (pMgr->CompQueue == NULL) {\r
- return GetLastError();\r
+ ret = GetLastError();\r
+ goto err1;\r
}\r
\r
pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);\r
if (pMgr->Event == NULL) {\r
- return GetLastError();\r
+ ret = GetLastError();\r
+ goto err2;\r
}\r
\r
- pMgr->Lock = 0;\r
+ pMgr->Run = TRUE;\r
+ pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 0, NULL);\r
+ if (pMgr->Thread == NULL) {\r
+ ret = GetLastError();\r
+ goto err3;\r
+ }\r
return 0;\r
+\r
+err3:\r
+ CloseHandle(pMgr->Event);\r
+err2:\r
+ CloseHandle(pMgr->CompQueue);\r
+err1:\r
+ DeleteCriticalSection(&pMgr->Lock); \r
+ return ret;\r
}\r
\r
void CompManagerClose(COMP_MANAGER *pMgr)\r
{\r
+ pMgr->Run = FALSE;\r
+ CompManagerCancel(pMgr);\r
+ WaitForSingleObject(pMgr->Thread, INFINITE);\r
+ CloseHandle(pMgr->Thread);\r
+\r
CloseHandle(pMgr->CompQueue);\r
CloseHandle(pMgr->Event);\r
+ DeleteCriticalSection(&pMgr->Lock); \r
}\r
\r
DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)\r
return (cq == NULL) ? GetLastError() : 0;\r
}\r
\r
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)\r
+{\r
+ EnterCriticalSection(&pMgr->Lock);\r
+ DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList);\r
+ SetEvent(pMgr->Event);\r
+ LeaveCriticalSection(&pMgr->Lock);\r
+}\r
+\r
+static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)\r
+{\r
+ EnterCriticalSection(&pMgr->Lock);\r
+ DListRemove(&pEntry->MgrEntry);\r
+ LeaveCriticalSection(&pMgr->Lock);\r
+}\r
+\r
DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,\r
COMP_CHANNEL **ppChannel)\r
{\r
COMP_ENTRY *entry;\r
- OVERLAPPED *overlap;\r
- DWORD bytes, ret;\r
- ULONG_PTR key;\r
+ DWORD ret = 0;\r
\r
- if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,\r
- Milliseconds)) {\r
- entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);\r
- *ppChannel = entry->Channel;\r
- CompChannelQueue(entry->Channel, entry);\r
- ret = 0;\r
- } else {\r
- ret = GetLastError();\r
+ EnterCriticalSection(&pMgr->Lock);\r
+ while (DListEmpty(&pMgr->DoneList)) {\r
+ ResetEvent(pMgr->Event);\r
+ LeaveCriticalSection(&pMgr->Lock);\r
+ \r
+ ret = WaitForSingleObject(pMgr->Event, Milliseconds);\r
+ if (ret) {\r
+ return ret;\r
+ }\r
+\r
+ EnterCriticalSection(&pMgr->Lock);\r
}\r
+\r
+ entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry);\r
+ *ppChannel = entry->Channel;\r
+ if (entry->Channel == NULL) {\r
+ DListRemove(&entry->MgrEntry);\r
+ InterlockedExchange(&entry->Busy, 0);\r
+ ret = ERROR_CANCELLED;\r
+ }\r
+ LeaveCriticalSection(&pMgr->Lock);\r
+\r
return ret;\r
}\r
\r
+void CompManagerCancel(COMP_MANAGER *pMgr)\r
+{\r
+ if (InterlockedCompareExchange(&pMgr->Entry.Busy, 1, 0) == 0) {\r
+ PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr,\r
+ &pMgr->Entry.Overlap);\r
+ }\r
+}\r
+\r
+\r
+/*\r
+ * Completion channel\r
+ */\r
\r
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)\r
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)\r
{\r
pChannel->Manager = pMgr;\r
pChannel->Head = NULL;\r
pChannel->TailPtr = &pChannel->Head;\r
- InitializeCriticalSection(&pChannel->Lock);\r
pChannel->Milliseconds = Milliseconds;\r
+\r
+ pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL);\r
+ if (pChannel->Event == NULL) {\r
+ return GetLastError();\r
+ }\r
+\r
+ InitializeCriticalSection(&pChannel->Lock);\r
+ CompEntryInit(pChannel, &pChannel->Entry);\r
+ return 0;\r
}\r
\r
void CompChannelCleanup(COMP_CHANNEL *pChannel)\r
{\r
+ CloseHandle(pChannel->Event);\r
DeleteCriticalSection(&pChannel->Lock); \r
}\r
\r
return entry;\r
}\r
\r
+static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+ COMP_ENTRY **entry_ptr, *entry;\r
+\r
+ EnterCriticalSection(&pChannel->Lock);\r
+ entry_ptr = &pChannel->Head;\r
+ while (*entry_ptr && *entry_ptr != pEntry) {\r
+ entry_ptr = &(*entry_ptr)->Next;\r
+ }\r
+\r
+ if (*entry_ptr != NULL) {\r
+ *entry_ptr = pEntry->Next;\r
+ if (pChannel->TailPtr == &pEntry->Next) {\r
+ pChannel->TailPtr = entry_ptr;\r
+ }\r
+ CompManagerRemoveEntry(pChannel->Manager, pEntry);\r
+ InterlockedExchange(&pEntry->Busy, 0);\r
+ }\r
+ LeaveCriticalSection(&pChannel->Lock);\r
+ return *entry_ptr;\r
+}\r
+\r
static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
{\r
pEntry->Next = NULL;\r
EnterCriticalSection(&pChannel->Lock);\r
+ CompManagerQueue(pChannel->Manager, pEntry);\r
CompChannelInsertTail(pChannel, pEntry);\r
+ SetEvent(pChannel->Event);\r
LeaveCriticalSection(&pChannel->Lock);\r
}\r
\r
DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)\r
{\r
- COMP_MANAGER *mgr = pChannel->Manager;\r
- COMP_CHANNEL *chan;\r
- DWORD ret = 0;\r
- ULONG locked;\r
+ COMP_ENTRY *entry;\r
+ DWORD ret;\r
\r
EnterCriticalSection(&pChannel->Lock);\r
while (pChannel->Head == NULL) {\r
+ ResetEvent(pChannel->Event);\r
LeaveCriticalSection(&pChannel->Lock);\r
\r
- locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);\r
- if (locked == 0) {\r
- ResetEvent(mgr->Event);\r
- ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);\r
- InterlockedExchange(&mgr->Lock, 0);\r
- SetEvent(mgr->Event);\r
- } else {\r
- ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);\r
- }\r
+ ret = WaitForSingleObject(pChannel->Event, pChannel->Milliseconds);\r
if (ret) {\r
- goto out;\r
+ return ret;\r
}\r
\r
EnterCriticalSection(&pChannel->Lock);\r
}\r
- *ppEntry = CompChannelRemoveHead(pChannel);\r
+ entry = CompChannelRemoveHead(pChannel);\r
+ CompManagerRemoveEntry(pChannel->Manager, entry);\r
LeaveCriticalSection(&pChannel->Lock);\r
\r
-out:\r
+ InterlockedExchange(&entry->Busy, 0);\r
+ *ppEntry = entry;\r
+ ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0;\r
+\r
return ret;\r
}\r
\r
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+void CompChannelCancel(COMP_CHANNEL *pChannel)\r
{\r
- COMP_CHANNEL *chan;\r
- COMP_ENTRY **entry_ptr;\r
- DWORD ret;\r
-\r
- do {\r
- ret = CompManagerPoll(pChannel->Manager, 0, &chan);\r
- } while (!ret);\r
- SetEvent(pChannel->Manager->Event);\r
-\r
- EnterCriticalSection(&pChannel->Lock);\r
- entry_ptr = &pChannel->Head;\r
- while (*entry_ptr && *entry_ptr != pEntry) {\r
- entry_ptr = &(*entry_ptr)->Next;\r
- }\r
-\r
- if (*entry_ptr != NULL) {\r
- *entry_ptr = pEntry->Next;\r
- if (pChannel->TailPtr == &pEntry->Next) {\r
- pChannel->TailPtr = entry_ptr;\r
- }\r
+ if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0) {\r
+ PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0,\r
+ (ULONG_PTR) pChannel, &pChannel->Entry.Overlap);\r
}\r
- LeaveCriticalSection(&pChannel->Lock);\r
}\r
\r
+\r
+/*\r
+ * Completion entry\r
+ */\r
+\r
void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
{\r
+ RtlZeroMemory(pEntry, sizeof *pEntry);\r
pEntry->Channel = pChannel;\r
}\r
\r
DWORD CompEntryPost(COMP_ENTRY *pEntry)\r
{\r
- if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,\r
- &pEntry->Overlap)) {\r
- return 0;\r
- } else {\r
- return GetLastError();\r
+ if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) {\r
+ if (!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue,\r
+ 0, 0, &pEntry->Overlap)) {\r
+ InterlockedExchange(&pEntry->Busy, 0);\r
+ return GetLastError();\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+void CompEntryCancel(COMP_ENTRY *pEntry)\r
+{\r
+ while (pEntry->Busy) {\r
+ Sleep(0);\r
+ CompChannelFindRemove(pEntry->Channel, pEntry);\r
}\r
}\r