Quote:If you provide properly aligned and lockable buffers in the read, the kernel can read straight into your buffer, instead of going through a separate buffer, so a 0-byte read is by no means a guaranteed win IMO.
Ah, ok, that makes sense. Thanks for that explanation. I went ahead and switched to the regular way of passing the buffer and size to WSARecv. I also reread the topic on Scalable Server Architecture and see that I was misinterpreting some of the information there; I should also not touch the SO_SNDBUF/SO_RCVBUF options on a socket, since I will always have overlapped reads posted on a socket in this setup. I can see now that in other setups that might not be the case.
Quote:What kind of CPU and how much RAM are you using for this test server?
Here is the information from CPU-Z (I'm running 32-bit XP though, so only 3.5 GB is usable):
In terms of the test:
* Program is run in release mode
* Initial memory usage is 8.1 MB (1000 preallocated connections)
* 500 connections from one laptop via LAN
Server:
* Memory usage rises to 9.5 MB
* ~42 KB/s traffic in to the server (reported by NetNeter)
* ~28 KB/s traffic out of the server (not sure why though)
Clients:
* ~42 KB/s total traffic out
* ~28 KB/s traffic in to the clients (not sure why either)
* Each client takes up 2.3 MB on the laptop, with virtually no CPU time
* CPU utilization on the laptop is ~2-4%, from other system applications running
* Each client sends 32 bytes and then Sleeps for 1 second, for a consistent 32 bytes/s of output
Results:
* Up to the ~500-connection mark, there are no service times > 5s. As soon as 500 is hit, one or two fire occasionally. If I add 100 or so more, more connections begin to show longer service times.
I also ran Wireshark during the test, and I think I can see the real problem at hand. I ran a capture for just under 2 minutes to get the 500 clients connected (it takes about 70s on one computer) and then added 100 more from another computer until they started timing out.
I applied the filter "tcp.analysis.ack_rtt > 1 && tcp.dstport == 15779" to the traffic. Starting around the 50-second mark (at which point a little over 450 clients would be connected), the RTT to ACK for packets rises to 1.5s. Toward the 80-second mark, where a little over 600 clients would be connected, there is a whole batch of retransmissions (the black-and-red lines in Wireshark) with an RTT to ACK of 2s. Getting toward the 90-second mark, a couple of entries hit an RTT to ACK of 3s!
That would explain why the service time is gradually getting longer as more connections are being added and clients sporadically disconnect. I was running 500 clients per laptop, which it seems the network can't handle from one source.
If I split the test into 2 x 250 connections and watch Wireshark, I see far fewer retransmits and never get any notifications of delayed reads. If I run 400 clients per computer, then right around the 600-connection mark I start getting more retransmits and longer service-time delays in my program. As I approached 800 connections, retransmits were filling up Wireshark and most of the connections were failing in my program due to long service times.
I see now that my code seems to be more than capable of handling a lot more connections and traffic, but my network and current test setup are not. What would you suggest as the best way to test code like this in general, to avoid problems like this? I mean, when you are at an early stage and want to test the upper bounds of your code, but have nothing much to lure random testers in and you need lots of traffic, do you just have to wait?
Let's say I wanted to try to pull off a larger test: would the problem lie in my router? I.e., if I set up a dedicated server at home and connected, say, 10 or so older computers (you know, those P4 512 MB Dell Optiplex ones) via LAN, do you think the router still couldn't handle it, or are the computers themselves the problem?
Thanks for your continued help [smile]
Code-wise, I've cleaned things up a bit and removed a few things that were no longer necessary. This code is still far from usable for anything serious, but I'm adding it for anyone who stumbles upon the thread:
/*
    A lot of resources were consulted and used in this code. Major resources used include:
        MSDN
        http://win32.mvps.org/network/sockhim.html
        Network Programming for Microsoft Windows
        CodeProject's IOCP articles:
            http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
            http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx
            http://www.codeproject.com/KB/IP/iocp.aspx
    Larger blocks of comments are mostly from the second reference. I used comments
    from that project to help understand the particulars of IOCP.
*/
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <stdio.h> // for printf
#include <list>
#include <vector>
#include <algorithm>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

HANDLE hPacketProcessingThread = INVALID_HANDLE_VALUE;

// Logical states for the overlapped structure
const int HPS_CONNECTION_STATE_CLOSED = 0;
const int HPS_CONNECTION_STATE_ACCEPT = 1;
const int HPS_CONNECTION_STATE_READ = 2;

// Max bytes for the recv buffer
const int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 8192;

// Max bytes for the send buffer
const int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 8192;

// The size of the sockaddr_in parameter
const int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam);
DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam);

struct tHighPerformanceServerData;
struct tWorkerThreadData;

struct tWorkerThreadWrapperData
{
    tHighPerformanceServerData * serverData;
    tWorkerThreadData * threadData;
};

struct tConnectionLocalData
{
    DWORD dwUid;

    tConnectionLocalData() : dwUid(-1)
    {
    }

    ~tConnectionLocalData()
    {
    }
};

struct tConnectionGlobalData
{
    LPFN_ACCEPTEX lpfnAcceptEx;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
    SOCKET listenSocket;
    HANDLE hCompletionPort;
    DWORD dwNumberOfConcurrentThreads;
    DWORD dwReadTimeTimeout;
    DWORD dwAcceptTimeTimeout;
    int initialReceiveSize;

    tConnectionGlobalData() :
        lpfnAcceptEx(NULL),
        listenSocket(INVALID_SOCKET),
        hCompletionPort(INVALID_HANDLE_VALUE),
        dwNumberOfConcurrentThreads(0),
        lpfnGetAcceptExSockaddrs(NULL),
        dwReadTimeTimeout(-1),
        dwAcceptTimeTimeout(5000),
        initialReceiveSize(0)
    {
    }
};

struct tConnectionData
{
public:
    OVERLAPPED overlapped;
    SOCKET socket_;
    sockaddr_in address;
    WORD sendBufferSize;
    BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE];
    INT connectionState;
    DWORD dwLastReadTime;
    tConnectionGlobalData * globalDataPtr;
    tConnectionLocalData * localDataPtr;

public:
    tConnectionData(tConnectionGlobalData * gblData) :
        socket_(INVALID_SOCKET),
        connectionState(HPS_CONNECTION_STATE_CLOSED),
        sendBufferSize(0),
        dwLastReadTime(0),
        globalDataPtr(gblData),
        localDataPtr(0)
    {
        memset(&overlapped, 0, sizeof(overlapped));
        memset(&address, 0, sizeof(address));
        localDataPtr = new tConnectionLocalData;
    }

    ~tConnectionData()
    {
        delete localDataPtr;
    }

    bool Initialize()
    {
        connectionState = HPS_CONNECTION_STATE_CLOSED;
        if(socket_ != INVALID_SOCKET) // Prevent resource leaks
        {
            return Close(true, true);
        }
        socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if(socket_ == INVALID_SOCKET)
        {
            // TODO: Handle error
            return false;
        }
        // We still need to associate the newly connected socket to our IOCP:
        HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads);
        if(hResult != globalDataPtr->hCompletionPort)
        {
            // TODO: Handle error
            return false;
        }
        DWORD numberOfBytes = 0; // Not used in this mode
        if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE)
        {
            DWORD dwError = GetLastError();
            if(dwError != ERROR_IO_PENDING)
            {
                closesocket(socket_);
                socket_ = INVALID_SOCKET;
                // TODO: Handle error
                return false;
            }
        }
        // Update the state the connection is in
        connectionState = HPS_CONNECTION_STATE_ACCEPT;
        // Success
        return true;
    }

    bool Close(bool force, bool reuse)
    {
        if(socket_ != INVALID_SOCKET)
        {
            struct linger li = {0, 0};
            if(force == true) // Default: SO_DONTLINGER
            {
                li.l_onoff = 1; // SO_LINGER, timeout = 0
            }
            setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));
            closesocket(socket_);
            socket_ = INVALID_SOCKET;
        }
        connectionState = HPS_CONNECTION_STATE_CLOSED;
        if(reuse == true)
        {
            return Initialize();
        }
        return true;
    }

    void ProcessIO(DWORD numberOfBytes)
    {
        if(connectionState == HPS_CONNECTION_STATE_READ)
        {
            if(numberOfBytes == SOCKET_ERROR)
            {
                // TODO: Log error
                Close(true, true);
                return;
            }
            else if(numberOfBytes == 0) // connection closing?
            {
                // TODO: Log error
                Close(false, true);
                return;
            }
            dwLastReadTime = GetTickCount();
            //
            // TODO: Process data sent from the client here
            //
            PostRead();
        }
        else if(connectionState == HPS_CONNECTION_STATE_ACCEPT)
        {
            // On Windows XP and later, once the AcceptEx function completes and the SO_UPDATE_ACCEPT_CONTEXT option is set on the accepted socket,
            // the local address associated with the accepted socket can also be retrieved using the getsockname function. Likewise, the remote
            // address associated with the accepted socket can be retrieved using the getpeername function.
            setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket));
            dwLastReadTime = GetTickCount();
            if(globalDataPtr->initialReceiveSize != 0)
            {
                //
                // TODO: Process data sent from a ConnectEx call here
                //
            }
            // We are ready to start receiving from the client
            PostRead();
        }
    }

    // This function will post a read operation on the socket. This means that an IOCP event
    // notification will be fired when the socket has data available for reading to it.
    void PostRead()
    {
        connectionState = HPS_CONNECTION_STATE_READ;
        WSABUF recvBufferDescriptor = {HPS_OVERLAPPED_BUFFER_RECV_SIZE, (char *)recvBufferData};
        DWORD numberOfBytes = 0;
        DWORD recvFlags = 0;
        BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, NULL);
        if(result == SOCKET_ERROR)
        {
            if(GetLastError() != ERROR_IO_PENDING)
            {
                // TODO: Handle error
                Close(true, true);
            }
        }
    }
};

struct tWorkerThreadData
{
public:
    HANDLE hThread;
    DWORD dwThreadId;

public:
    tWorkerThreadData() : hThread(INVALID_HANDLE_VALUE), dwThreadId(0)
    {
    }

    ~tWorkerThreadData()
    {
    }
};

struct tHighPerformanceServerData
{
public:
    WORD wPort;
    int backLog;
    HANDLE hCompletionPort;
    DWORD dwNumberOfConcurrentThreads;
    DWORD dwNumberOfWorkerThreads;
    LONG lRunningWorkerThreadCount;
    SOCKET sListenSocket;
    SOCKADDR_IN saInternetAddr;
    GUID GuidAcceptEx;
    LPFN_ACCEPTEX lpfnAcceptEx;
    GUID GuidGetAcceptExSockaddrs;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
    CRITICAL_SECTION workerThreadCS;
    std::list<tWorkerThreadData *> workerThreads;
    DWORD dwInitialConnectionPoolCount;
    std::list<tConnectionData *> connectionPool;
    HANDLE hScavengerThread;
    DWORD dwScavengerThreadId;
    DWORD dwScavengerDelay;     // milliseconds between runs of the idle socket scavenger
    HANDLE hScavengerExitEvent; // tells scavenger thread when to die
    DWORD dwWorkerThreadScaleValue;
    tConnectionGlobalData globalData;

public:
    tHighPerformanceServerData() :
        hCompletionPort(INVALID_HANDLE_VALUE),
        dwNumberOfConcurrentThreads(0),
        dwNumberOfWorkerThreads(0),
        lRunningWorkerThreadCount(0),
        sListenSocket(INVALID_SOCKET),
        wPort(0),
        lpfnAcceptEx(NULL),
        lpfnGetAcceptExSockaddrs(NULL),
        dwInitialConnectionPoolCount(1000),
        dwScavengerDelay(1000),
        hScavengerExitEvent(NULL),
        hScavengerThread(INVALID_HANDLE_VALUE),
        dwScavengerThreadId(0),
        dwWorkerThreadScaleValue(1),
        backLog(SOMAXCONN)
    {
        GUID guidAcceptEx = WSAID_ACCEPTEX;
        memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx));
        GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
        memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs));
        InitializeCriticalSection(&workerThreadCS);
    }

    ~tHighPerformanceServerData()
    {
        DeleteCriticalSection(&workerThreadCS);
    }

    int WorkerThread()
    {
        BOOL result = 0;
        DWORD numberOfBytes = 0;
        ULONG key = 0;
        OVERLAPPED * lpOverlapped = 0;
        InterlockedIncrement(&lRunningWorkerThreadCount);
        while(true)
        {
            tConnectionData * connectionData = 0;
            InterlockedDecrement(&lRunningWorkerThreadCount);
            result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
            if(key == -1)
            {
                break; // Time to exit the worker thread
            }
            connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped);
            if(connectionData == 0)
            {
                // TODO: Handle error
                continue;
            }
            InterlockedIncrement(&lRunningWorkerThreadCount);
            if(result == TRUE)
            {
                // We have an I/O to process
                connectionData->ProcessIO(numberOfBytes);
            }
            else
            {
                // Close this socket and make space for a new one if we are still listening
                connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? false : true));
            }
        }
        return 0;
    }

    int ScavengerThread()
    {
        while(true)
        {
            int count[4] = {0};
            std::list<tConnectionData *>::iterator itr = connectionPool.begin();
            while(itr != connectionPool.end())
            {
                tConnectionData * connection = (*itr);
                count[connection->connectionState]++;
                // AcceptEx() called, but no completion yet
                if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT)
                {
                    // determine if the socket is connected
                    int seconds = 0;
                    int length = sizeof(seconds);
                    if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length))
                    {
                        if(seconds != -1)
                        {
                            seconds *= 1000;
                            if(seconds > (int)globalData.dwAcceptTimeTimeout)
                            {
                                printf("[%i][Accept] idle timeout after %i ms.\n", connection->socket_, seconds);
                                // closesocket() here causes an immediate IOCP notification with an error indication;
                                // that will cause our worker thread to call Close().
                                closesocket(connection->socket_);
                                connection->socket_ = INVALID_SOCKET;
                                connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
                            }
                        }
                        // No connection made on this socket yet
                        else if(seconds == -1)
                        {
                            // Nothing to do
                        }
                    }
                }
                // The client is in a read or write state, doesn't matter which. We want to make sure
                // activity still exists as desired.
                else
                {
                    bool doClose = false;
                    DWORD tick = GetTickCount();
                    DWORD dwLastTime = tick - connection->dwLastReadTime;
                    if(dwLastTime > globalData.dwReadTimeTimeout)
                    {
                        printf("[%i][Read] idle timeout after %i ms.\n", connection->socket_, dwLastTime);
                        doClose = true;
                    }
                    else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5))
                    {
                        printf("[%i][Read] %i ms\n", connection->socket_, dwLastTime);
                    }
                    if(doClose)
                    {
                        closesocket(connection->socket_);
                        connection->socket_ = INVALID_SOCKET;
                        connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
                    }
                }
                itr++;
            }
            printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]);
            // Pause until next run due
            DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay);
            if(result != WAIT_TIMEOUT)
            {
                break;
            }
        }
        return 0;
    }

    DWORD AddConnectionsToPool(long count)
    {
        // We cannot add more connections once the server has started
        if(hScavengerThread != INVALID_HANDLE_VALUE)
        {
            return 0;
        }
        DWORD total = 0;
        for(long index = 0; index < count; ++index)
        {
            tConnectionData * connection = new tConnectionData(&globalData);
            bool result = connection->Initialize();
            if(result == true)
            {
                connectionPool.push_back(connection);
                total++;
            }
            else
            {
                // TODO: Handle error
                delete connection;
            }
        }
        return total;
    }

    DWORD AddWorkerThreads(DWORD count)
    {
        DWORD total = 0;
        for(DWORD index = 0; index < count; ++index)
        {
            tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData;
            tWorkerThreadData * threadData = new tWorkerThreadData;
            threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId);
            if(threadData->hThread != NULL)
            {
                total++;
                EnterCriticalSection(&workerThreadCS);
                workerThreads.push_back(threadData);
                LeaveCriticalSection(&workerThreadCS);
                workerThreadData->serverData = this;
                workerThreadData->threadData = threadData;
                DWORD dwResult = ResumeThread(threadData->hThread);
                if(dwResult == (DWORD)-1)
                {
                    // TODO: Handle error
                    __asm nop
                }
            }
            else
            {
                delete workerThreadData;
                delete threadData;
            }
        }
        return total;
    }
};

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam)
{
    tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam;
    DWORD dwResult = data->serverData->WorkerThread();
    LPCRITICAL_SECTION pCS = &data->serverData->workerThreadCS;
    EnterCriticalSection(pCS);
    std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin();
    while(itr != data->serverData->workerThreads.end())
    {
        tWorkerThreadData * td = (*itr);
        if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread)
        {
            printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId);
            data->serverData->workerThreads.erase(itr);
            break;
        }
        itr++;
    }
    delete data->threadData;
    delete data;
    LeaveCriticalSection(pCS);
    return dwResult;
}

DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam)
{
    return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();
}

bool InitializeWinsock()
{
    WSADATA wd = { 0 };
    if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
    {
        // TODO: Handle error
        return false;
    }
    if(LOBYTE( wd.wVersion ) < 2)
    {
        WSACleanup();
        // TODO: Handle error
        return false;
    }
    return true;
}

void DeinitializeWinsock()
{
    WSACleanup();
}

// Our high performance server :)
class cHighPerformanceServer
{
private:
    tHighPerformanceServerData * internalData;

public:
    cHighPerformanceServer()
    {
        internalData = new tHighPerformanceServerData;
    }

    ~cHighPerformanceServer()
    {
        delete internalData;
    }

    bool Create(unsigned short port)
    {
        // Get the system information
        SYSTEM_INFO SystemInfo;
        GetSystemInfo(&SystemInfo);
        // Try to create an I/O completion port
        internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads);
        if(internalData->hCompletionPort == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        // Calculate how many worker threads we should create to process IOCP events
        DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads;
        if(internalData->dwNumberOfWorkerThreads == 0)
        {
            if(internalData->dwNumberOfConcurrentThreads == 0)
            {
                dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue;
            }
            else
            {
                dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue;
            }
        }
        // Create the worker threads!
        DWORD dwWorkerTotal = internalData->AddWorkerThreads(dwNumberOfWorkerThreads);
        if(dwWorkerTotal != dwNumberOfWorkerThreads)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if(internalData->sListenSocket == INVALID_SOCKET)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        // Bind the socket to the port
        internalData->wPort = port;
        internalData->saInternetAddr.sin_family = AF_INET;
        internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        internalData->saInternetAddr.sin_port = htons(internalData->wPort);
        int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr));
        if(bindResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        int listenResult = listen(internalData->sListenSocket, internalData->backLog);
        if(listenResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        DWORD dwBytes = 0;
        int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx, sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL);
        if(ioctlResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        dwBytes = 0;
        ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs, sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL);
        if(ioctlResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        // Assign the global data for our connections
        internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx;
        internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs;
        internalData->globalData.listenSocket = internalData->sListenSocket;
        internalData->globalData.hCompletionPort = internalData->hCompletionPort;
        internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads;
        internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable
        internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable
        internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx
        // If we wanted to accept data sent from ConnectEx via AcceptEx
        //internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2);
        DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount);
        if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        // Connect the listener socket to IOCP
        if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0);
        if(internalData->hScavengerExitEvent == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId);
        if(internalData->hScavengerThread == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }
        DWORD dwResult = ResumeThread(internalData->hScavengerThread);
        if(dwResult == (DWORD)-1)
        {
            // TODO: Log error
            Destroy();
            __asm nop
        }
        // Success!
        return true;
    }

    void Destroy()
    {
        if(internalData->hScavengerExitEvent != NULL)
        {
            SetEvent(internalData->hScavengerExitEvent);
            if(internalData->hScavengerThread != INVALID_HANDLE_VALUE)
            {
                int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2);
                if(result != WAIT_OBJECT_0)
                {
                    // TODO: Log error
                    __asm nop
                }
                CloseHandle(internalData->hScavengerThread);
                internalData->hScavengerThread = INVALID_HANDLE_VALUE;
            }
            CloseHandle(internalData->hScavengerExitEvent);
            internalData->hScavengerExitEvent = NULL;
        }
        if(internalData->sListenSocket != INVALID_SOCKET)
        {
            closesocket(internalData->sListenSocket);
            internalData->sListenSocket = INVALID_SOCKET;
        }
        std::vector<HANDLE> workerThreadHandles;
        std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin();
        while(itr != internalData->workerThreads.end())
        {
            workerThreadHandles.push_back((*itr)->hThread);
            itr++;
        }
        // Clean up the worker threads waiting on the IOCP
        if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
        {
            EnterCriticalSection(&internalData->workerThreadCS);
            size_t count = internalData->workerThreads.size();
            for(size_t x = 0; x < count; ++x)
            {
                PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
            }
            LeaveCriticalSection(&internalData->workerThreadCS);
        }
        // Wait for all worker threads to close
        for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS)
        {
            DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x);
            DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000);
            if(dwResult != WAIT_OBJECT_0)
            {
                // TODO: Log error
                __asm nop
            }
        }
        // Sanity check
        if(internalData->workerThreads.size())
        {
            // TODO: Log error
            printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size());
        }
        if(internalData->connectionPool.size())
        {
            std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin();
            while(itr != internalData->connectionPool.end())
            {
                closesocket((*itr)->socket_);
                delete (*itr);
                itr++;
            }
            internalData->connectionPool.clear();
        }
        if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
        {
            CloseHandle(internalData->hCompletionPort);
            internalData->hCompletionPort = INVALID_HANDLE_VALUE;
        }
    }
};

HANDLE exitEvent = 0;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
    switch (ConsoleEvent)
    {
        case CTRL_LOGOFF_EVENT:
        case CTRL_C_EVENT:
        case CTRL_BREAK_EVENT:
        case CTRL_CLOSE_EVENT:
        case CTRL_SHUTDOWN_EVENT:
        {
            if(exitEvent != 0)
            {
                SetEvent(exitEvent);
                return TRUE;
            }
        }
    }
    return FALSE;
}

int main(int argc, char * argv[])
{
    printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData));
    if(InitializeWinsock() == false)
        return 0;
    cHighPerformanceServer server;
    if(server.Create(15779) == false)
    {
        return 0;
    }
    exitEvent = CreateEvent(0, TRUE, FALSE, 0);
    SetConsoleCtrlHandler(ConsoleHandler, TRUE);
    WaitForSingleObject(exitEvent, INFINITE);
    SetConsoleCtrlHandler(ConsoleHandler, FALSE);
    server.Destroy();
    DeinitializeWinsock();
    CloseHandle(exitEvent);
    return 0;
}