Thanks a lot for the replies, Washu and Antheus.
Quote:First off, try and reduce the number of synchronizations to zero. Things like heap allocation management (new/delete) will quickly bog you down as each acquires and releases a lock.
This was one of the first mistakes I made at the time of the post. I had not only a critical section but also a map I was accessing in debug mode. I removed both of those, since they were not needed, as well as a second critical section I was using to manage the connection pool (list). I changed my design so that no more connections can be added to the pool once the server is running, which is fine for me. As of right now, I don't think I have any additional synchronizations in the code, and I'll keep that in mind for the future as I make improvements.
Quote:Also, try attaching a profiler and seeing where you're spending the majority of your time. One critical thing to understand is that Windows XP/Vista does not behave the same as say Windows Server 2003. A prime example is that of the thread quantum, which in windows XP is set to 2, and on windows server 2003 is at 12. Or in other words, a Windows XP thread gets around 20 - 30ms of processor time, while a Server 2003 thread gets around 120 - 180ms of processor time. Note that you can change your PC to use long quantums, but doing so will impact the general performance of foreground applications such as VS.
Thanks for pointing out those numbers for the two OSes. I was thinking a real server would have a better network card than I have, but I wasn't considering the software implications. I'll spend some time today checking out profilers, as I've not done any profiling yet, just looking at the results I'm generating. I'm not trying to optimize everything before profiling; I'm just trying to understand the results I'm getting, so I don't feel too bad about not having profiled yet. Those numbers do help a lot in understanding why my testing setup is a bit inadequate for this type of code.
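On the quantum point, for anyone who wants to poke at their own machine: my understanding is that the foreground/quantum behavior is exposed through the Win32PrioritySeparation registry value, so something like the sketch below should read it back. This is just a sketch based on my understanding; if I have the key or value name wrong, please correct me.

// Sketch: read the Win32PrioritySeparation value that (as I understand it)
// controls the foreground-boost / quantum behavior described above.
#include <windows.h>
#include <stdio.h>
#pragma comment(lib, "advapi32.lib")

int main()
{
    HKEY hKey = 0;
    if(RegOpenKeyExA(HKEY_LOCAL_MACHINE,
                     "SYSTEM\\CurrentControlSet\\Control\\PriorityControl",
                     0, KEY_READ, &hKey) == ERROR_SUCCESS)
    {
        DWORD value = 0;
        DWORD size = sizeof(value);
        if(RegQueryValueExA(hKey, "Win32PrioritySeparation", 0, 0,
                            (LPBYTE)&value, &size) == ERROR_SUCCESS)
        {
            printf("Win32PrioritySeparation = 0x%08lX\n", (unsigned long)value);
        }
        RegCloseKey(hKey);
    }
    return 0;
}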
Quote:Check memory usage too, if you're paging frequently then you'll find things will slow down significantly. There are ways to manage this, such as maintaining locality of data. Try and keep everything relevant to an event nearby, rather than scattered all over the place.
As connections were accepted by the server (I ran 700 total), the Page Fault Delta was between 1 and 9, with no real average other than staying on the low side most of the time. As soon as the connections were done accepting, the page fault delta remained at 0. Between server start and accepting the first connection it generated a few thousand page faults, but since that only happens at startup I assume that's 'ok'.
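For reference, here's a minimal sketch of how I could log that from inside the server instead of watching Process Explorer. The LogPageFaultDelta helper is made up for illustration; GetProcessMemoryInfo is, I believe, the API that surfaces the same counter.

// Minimal sketch: log the process page-fault delta between calls.
// LogPageFaultDelta is a made-up helper, not part of my server code.
#include <windows.h>
#include <psapi.h>
#include <stdio.h>
#pragma comment(lib, "psapi.lib")

void LogPageFaultDelta()
{
    static DWORD lastCount = 0;
    PROCESS_MEMORY_COUNTERS pmc = {0};
    pmc.cb = sizeof(pmc);
    if(GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc)))
    {
        printf("Page fault delta: %lu (total %lu)\n",
               (unsigned long)(pmc.PageFaultCount - lastCount),
               (unsigned long)pmc.PageFaultCount);
        lastCount = pmc.PageFaultCount;
    }
}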
I should also mention that the design I am using is based on preallocating the maximum number of connection resources at startup. Once the server is running, the only real resource usage comes from sockets being closed and recreated for reuse, plus the new IOCP association for each new socket. The network connection data is never deallocated and reallocated; it's simply reused.
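To make that concrete, here is a condensed sketch of the reuse path, using the structures from the full listing further down (error handling and the non-blocking setup stripped). Only the SOCKET is torn down and recreated; the connection object and its buffers live for the life of the server.

// Condensed sketch of the reuse path: the connection object and its buffers are
// preallocated once; only the socket is closed and recreated, then AcceptEx is
// re-posted so the slot can service a new client.
void ReuseConnectionSlot(tConnectionData & conn, tConnectionGlobalData & gd)
{
    if(conn.socket_ != INVALID_SOCKET)
    {
        closesocket(conn.socket_);
        conn.socket_ = INVALID_SOCKET;
    }

    memset(&conn.overlapped, 0, sizeof(conn.overlapped));
    conn.socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    DWORD bytes = 0;
    gd.lpfnAcceptEx(gd.listenSocket, conn.socket_,
                    conn.recvBufferData, 0,             // 0-byte initial receive
                    HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE,
                    &bytes, &conn.overlapped);           // ERROR_IO_PENDING expected

    conn.connectionState = HPS_CONNECTION_STATE_ACCEPT;
}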
Quote:See how you pass around messages and process them. Avoid large loops or other similar structures that take a while to run through. Something as simple as finding all players within a certain radius of a point can be quite time consuming unless you use a proper data structure to reduce the amount of searching you do.
I'll keep this in mind for when I add more game-related code. Originally I was using Boost's Circular Buffer, Singleton Pool, and Windows APCs to pass raw network data into a single network processing thread that did all the work (the hand-off is sketched below). The reason for that was to keep the worker threads doing only network event handling and not have them do any data processing themselves (which I had read is something to avoid).
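For context, this is roughly the shape that APC hand-off had (a simplified sketch, not my original code): the IOCP worker queues the raw buffer to the processing thread, which sits in an alertable wait so the APCs run in its context. tRawPacket and the helper names here are made up for illustration.

// Simplified sketch of the APC hand-off described above (not my actual code).
#include <windows.h>
#include <string.h>

struct tRawPacket
{
    DWORD byteCount;
    BYTE  data[4096];
};

// Runs on the processing thread when the APC is delivered.
void CALLBACK ProcessPacketApc(ULONG_PTR param)
{
    tRawPacket * packet = (tRawPacket *)param;
    // ... parse / dispatch the game message here ...
    delete packet; // or return it to a pool to avoid the allocation entirely
}

DWORD WINAPI ProcessingThread(LPVOID)
{
    for(;;)
    {
        SleepEx(INFINITE, TRUE); // alertable wait; wakes only to run queued APCs
    }
    return 0;
}

// Called from an IOCP worker thread after a completed read.
// hProcessingThread must have THREAD_SET_CONTEXT access.
void HandOffToProcessingThread(HANDLE hProcessingThread, const BYTE * data, DWORD byteCount)
{
    tRawPacket * packet = new tRawPacket;
    packet->byteCount = min(byteCount, (DWORD)sizeof(packet->data));
    memcpy(packet->data, data, packet->byteCount);
    QueueUserAPC(ProcessPacketApc, hProcessingThread, (ULONG_PTR)packet);
}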
Quote:32 bytes per second per client is not that realistic of a number unless you're looking at a mostly event driven RPG of some kind. Eve-Online can get away with that (or less) in many cases, but once combat begins, you can expect surges of packets as players (or macros) attempt to perform operations fairly rapidly. Also, chat text is generally longer than 32 bytes, and may appear at any time (generally).
Good point; I'll write a few more tests that send more data in shorter intervals to try and stress those conditions (a sketch of what I have in mind is below). I don't actually have a target game in mind, but I'd like to be able to say: you can expect acceptable results if you are handling X players and on average they are sending Y bytes every Z interval. I want to understand how to test such code and explain the results in a meaningful way, as opposed to how it's mostly done nowadays. For example, Torque3D claims up to 1000+ players per server, and I know that's more of a marketing gimmick since it comes with no context.
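Something like this is what I have in mind for the next round of test clients. The constants are placeholders for the Y/Z values above, not measured numbers, and it reuses the bExit flag and socket setup from the test client posted further down.

// Hypothetical sketch of a parameterized send loop: Y bytes every Z milliseconds,
// with an occasional burst to mimic combat. Numbers are placeholders.
const int   kBytesPerSend   = 64;    // Y
const DWORD kSendIntervalMs = 250;   // Z
const int   kBurstChance    = 10;    // percent chance to send a burst this tick

void SendLoop(SOCKET s, char * buffer) // buffer must hold at least kBytesPerSend bytes
{
    while(!bExit)
    {
        int sends = 1;
        if((rand() % 100) < kBurstChance)
            sends = 5; // simulate a flurry of actions
        for(int i = 0; i < sends; ++i)
        {
            for(int q = 0; q < kBytesPerSend; ++q)
                buffer[q] = rand() % 256; // randomize data, as in the real client
            if(send(s, buffer, kBytesPerSend, 0) == SOCKET_ERROR)
                return;
        }
        Sleep(kSendIntervalMs);
    }
}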
Quote:Avoid using sleep or other methods to surrender your time-slice. Design your system away from calls like that, as long lasting completion calls tend to bog down the system. Look at using lock free containers if you need something like tasklet message passing.
I'll definitely avoid Sleep; right now I do not have any Sleep calls in the server. The only functions that "wait" are the GetQueuedCompletionStatus in each worker thread and a WaitForSingleObject in the main thread that lets me run a console server. I had been looking into lock-free containers, but only briefly: I started with ApochPiQ's thread "Double-check my lock-free voodoo" and then ventured into Win32 Asynchronous Procedure Calls. For this testing, though, I went ahead and removed all of that and only did data churning.
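In case it helps anyone else reading, this is the direction I was looking at before settling on APCs: a minimal sketch using the Win32 SList API as a lock-free free list. This is not code from my server; note that it is a LIFO stack, so it suits recycling buffers rather than ordered message passing, and entries plus the header want MEMORY_ALLOCATION_ALIGNMENT alignment.

// Sketch: Win32 SList as a lock-free free list for message/buffer recycling.
#include <windows.h>

struct __declspec(align(MEMORY_ALLOCATION_ALIGNMENT)) tMessage
{
    SLIST_ENTRY entry;   // the lock-free link used by the SList functions
    DWORD       byteCount;
    BYTE        data[256];
};

__declspec(align(MEMORY_ALLOCATION_ALIGNMENT)) SLIST_HEADER g_freeList;

void InitFreeList()
{
    InitializeSListHead(&g_freeList);
}

void ReleaseMessage(tMessage * msg)
{
    InterlockedPushEntrySList(&g_freeList, &msg->entry);
}

tMessage * AcquireMessage()
{
    SLIST_ENTRY * entry = InterlockedPopEntrySList(&g_freeList);
    if(entry == 0)
        return 0; // pool exhausted; caller decides whether to allocate or back off
    return CONTAINING_RECORD(entry, tMessage, entry);
}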
Quote:Original post by Antheus
How are you throttling the sends? Did you try increasing the socket receive buffer?
I experienced that throttling, and then the cascading failure, as the clients obliterated the network with my traffic (trying to send 512 bytes every 100 ms). I was originally running a couple of exes that each created a whole bunch of connections, but that resulted in the bursting you mentioned. At the time I had also tried adding a random Sleep between sends, but with a high number of connections it just didn't seem to work like it should.
I switched over to simply creating one process per connection on two laptops via CreateProcess (to cut back on resources used compared to launching via a batch script) and that seemed to give me more of a "natural" testing case.
I do understand the problems of having 500 processes per PC all trying to do nothing but send network data continuously, but since it's all on the LAN, I was thinking it should work out well, as the data does not have to travel far. Here is the complete testing code for reference; it's pretty simple:
#include <winsock2.h>
#include <windows.h>
#include <conio.h>
#include <stdio.h>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

bool bExit = false;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
    switch (ConsoleEvent)
    {
        case CTRL_LOGOFF_EVENT:
        case CTRL_C_EVENT:
        case CTRL_BREAK_EVENT:
        case CTRL_CLOSE_EVENT:
        case CTRL_SHUTDOWN_EVENT:
        {
            bExit = true;
            return TRUE;
        }
    }
    return FALSE;
}

int main(int argc, char * argv[])
{
    srand(GetTickCount());

    int connectionCount = 1;
    if(argc > 1) // Limit connections to [1, 4096]
    {
        connectionCount = atoi(argv[1]);
        if(connectionCount < 0) connectionCount = 1;
        if(connectionCount > 4096) connectionCount = 4096;
    }

    sockaddr_in server;
    server.sin_addr.s_addr = inet_addr(""); // hard coded my internet IP so I can run on another network too
    server.sin_family = AF_INET;
    server.sin_port = htons((u_short)15779); // hard code port

    WSADATA wsadata = {0};
    WSAStartup(MAKEWORD(2,2), &wsadata);

    SetConsoleCtrlHandler(ConsoleHandler, TRUE);

    SOCKET * sockets = new SOCKET[connectionCount];
    for(int x = 0; x < connectionCount; ++x)
    {
        sockets[x] = INVALID_SOCKET;
    }

    for(int x = 0; x < connectionCount && !bExit; ++x)
    {
        sockets[x] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if(sockets[x] != INVALID_SOCKET)
        {
            if(connect(sockets[x], (struct sockaddr*)&server, sizeof(server)))
            {
                closesocket(sockets[x]);
                sockets[x] = INVALID_SOCKET;
                printf("Could not establish a connection on socket %i / %i\n", x + 1, connectionCount);
            }
        }
        else
        {
            printf("Could not create socket %i / %i\n", x + 1, connectionCount);
            continue;
        }
        printf("Adding connection %i / %i\n", x + 1, connectionCount);

        //Sleep(50); // Delay a little so we do not flood the AcceptEx calls on the server
                     // This was taken out to further stress test the server in processing many
                     // AcceptExs at once; it seems to be able to handle it just fine.
    }

    const int MAX_SEND_BYTES = 32;
    char buffer[MAX_SEND_BYTES] = {0};

    while(!bExit)
    {
        // Even though I've coded for multiple connections, I am only running
        // one connection per program
        for(int x = 0; x < connectionCount && !bExit; ++x)
        {
            if(sockets[x] != INVALID_SOCKET)
            {
                int sendBytes = MAX_SEND_BYTES; // Sometimes made this 'rand() % ' to simulate different actions

                for(int q = 0; q < sendBytes; ++q) // Randomize data to make it look more legitimate across network
                    buffer[q] = rand() % 256;

                int result = send(sockets[x], buffer, sendBytes, 0);
                if(result == -1)
                {
                    printf("Socket %i failed to send.\n", x);
                    bExit = true;
                }

                //Sleep(100 + rand() % 100); // Originally I was trying to slow down sends this way, but
                                             // that resulted in a serial behavior and not a concurrent send behavior.
            }
        }
        Sleep(1000); // This is the sleep I use now between
    }

    for(int x = 0; x < connectionCount; ++x)
    {
        if(sockets[x] != INVALID_SOCKET)
        {
            shutdown(sockets[x], SD_BOTH);
            closesocket(sockets[x]);
            sockets[x] = INVALID_SOCKET;
        }
        printf("Removing connection %i / %i\n", x + 1, connectionCount);
    }

    delete [] sockets;

    WSACleanup();
    SetConsoleCtrlHandler(ConsoleHandler, FALSE);

    return 0;
}
In my previous post I also mentioned a second testing program. Here is the one I use to launch the previous program to simulate load:
#include <windows.h>
#include <stdio.h>

int main(int argc, char * argv[])
{
    srand(GetTickCount());

    int count = 1;
    if(argc >= 2)
    {
        count = atoi(argv[1]);
        if(count < 1) count = 1;
    }

    char curDir[MAX_PATH + 1] = {0};
    GetCurrentDirectoryA(MAX_PATH, curDir);

    printf("Creating %i processes.\n", count);

    for(int x = 0; x < count; ++x)
    {
        STARTUPINFOA si = {0};
        PROCESS_INFORMATION pi = {0};
        si.cb = sizeof(STARTUPINFO);
        CreateProcessA(0, "IocpTestClient.exe", 0, NULL, FALSE, 0, NULL, curDir, &si, &pi);
        Sleep(rand() % 500);
    }

    return 0;
}
The second program lets me pump out new connections while the connections already launched are sending data to the server, so I get a test case of "the server just opened and the gamers are flooding in to play". I just run a batch file that passes the desired program count down the line (I'm only spawning one connection per program launched). Even on my dual-core laptops (one XP via Ethernet, one 64-bit Vista via wireless), they seem to handle the process load fine. I have run into some slowdowns from NOD32 and SpyBot scanning the data, so I try to turn those off at times, but sometimes I just leave them on.
As for the receive buffers on the server, I've not touched the internal TCP buffers; I just left them at their defaults. On my machine, getsockopt reports 8 KB for both (a sketch of how I check them is below). The actual receive buffer I pass to WSARecv is 4 KB. I didn't try making it 8 KB simply because I didn't think it would help, since I'm only sending 32 bytes/s now and 320 bytes/s in the older saturating test.
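For completeness, this is how I check the defaults, and how I'd raise them if it turned out to matter. The 64 KB number here is arbitrary, just an example, and the function assumes the server's existing winsock2.h / stdio.h includes.

// Query the default socket buffer sizes, then (optionally) bump the receive buffer.
int QuerySocketBuffers(SOCKET s)
{
    int rcvBuf = 0, sndBuf = 0;
    int len = sizeof(rcvBuf);
    getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&rcvBuf, &len);
    len = sizeof(sndBuf);
    getsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&sndBuf, &len);
    printf("SO_RCVBUF = %i bytes, SO_SNDBUF = %i bytes\n", rcvBuf, sndBuf);

    // Example only: raise the receive buffer to 64 KB.
    int newSize = 64 * 1024;
    return setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&newSize, sizeof(newSize));
}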
Based on what you see here, is my test setup "alright"? I know the results will be somewhat skewed since it's all on the local network, but I viewed that as a perfect test case of assuming everyone on the server had low latency to the server and was able to pump out a decent amount of data at once.
Quote:Original post by hplus0603
First: That's an awesome list of quotes! I'm flattered :-)
Saying "thanks" isn't enough for all the advice you've given people over the years, but thanks! Likewise to Washu and Antheus, whom have contributed greatly to this section with their real world experiences. I actually added all those quotes to have them in one thread to refer people to in the future when they ask me about specifics in this area. But anyways,
Quote:Getting a service time of 5 seconds for each client at 500 clients with 32 bytes per second sounds too long, unless your machine is an old single-core machine with too little memory. Or if you're running the test clients on the same machine as the server, that would also severely impact the numbers.
I thought the same thing. As mentioned above (I'm writing my replies in one post as I go, so I know you haven't had a chance to read that part yet), I was indeed running the test clients on the same machine as the server; bad me, I know. I moved them over to the laptops and noticed better performance. That is, after fixing some serious mistakes in my code that I had introduced before making the post and forgot to remove.
Quote:Just do the math: if you get one read every 5 seconds, for 500 clients, that means 100 clients per second, or 20 milliseconds per client if you're using a dual-core server. What on Earth are you doing for 20 milliseconds when you're processing 32 bytes of data!? That sounds like the time needed to serve several page faults. Are you running out of physical memory?
That's the funny thing about it, and I can't figure it out. After moving the test clients to the laptops, I can get to about 600 connections before the long service times kick in; at 500, processing stays under 5 s. I know because I have code that checks whether the time between reads is over 50% of the timeout (the check is sketched below), and it only triggers once I start approaching 600+ (spread across the 2 laptops, not all running on one). Memory usage stays at ~13 MB, which is how much space is needed to set up the server to handle 1500 connections. Each connection structure is pretty light, only 112 bytes, but each also has a 4 KB send and a 4 KB receive buffer, which runs up the total. The page fault delta, as mentioned in the reply to Washu, is also constantly 0 (as reported by Process Explorer in the Virtual Memory section). The only time page faults seem to happen is when I'm accepting connections.
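For context, the check I'm describing is essentially this: a condensed version of what the scavenger thread in the full listing does on each pass.

// Inside the scavenger's per-connection loop: warn when a connection has gone more
// than half the read timeout without completing a read, close it past the timeout.
DWORD now = GetTickCount();
DWORD sinceLastRead = now - connection->dwLastReadTime;

if(sinceLastRead > globalData.dwReadTimeTimeout)
{
    printf("[%i][Read] idle timeout after %i ms.\n", connection->dwUid, sinceLastRead);
    // close / recycle the connection here
}
else if(sinceLastRead > globalData.dwReadTimeTimeout / 2)
{
    printf("[%i][Read] %i ms since last read.\n", connection->dwUid, sinceLastRead);
}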
I don't think I'm over-stressing my network. I'm on an 8/2 Comcast cable plan and have a decent router that's yet to fail on me (the 60->600 one featured on Lifehacker). On average I can upload 300-350 KB/s, and downloads usually cap out around 2 MB/s depending on the server. I am still running all those clients across 2 dual-core laptops, so perhaps the problem is simply that I'm over-taxing them? I don't think I am, though: watching Task Manager, the CPUs never max out while creating all the processes, and they never get fully utilized once the clients are all in their send loops. In fact, CPU usage drops to 2-4% total when running 500 clients on that laptop, and service time stays under 5 s (I'm not sure of the exact values, as I've not made a visual client that reports the data, only console clients, which scale better).
This desktop computer is server grade; it's just running XP. It has a quad-core Xeon 3350 @ 2.66 GHz (it can run at 3.0 GHz stably) and 3.5 GB of RAM usable by Windows (8 GB actually installed). The RAM runs at 400 MHz with 5-5-5-15 timings, and the motherboard is an X38-DS4. I know the system is underutilized with XP, but due to other development reasons I had to stay on 32-bit Windows through the end of 2008. I will probably upgrade to 64-bit Windows 7 in the next couple of months when the RC comes out.
Quote:However, I think this thread is too sprawling. Let's focus on one thing per thread :-)
I'd be interested in seeing what Intel VTune or a similar tool says about the CPU usage on your server when you're getting 20 ms per client processing times. 20 ms on a 2 GHz machine is 40,000,000 instructions. Wooah! You can probably ray-trace a sphere-over-checkerboard scene in that amount of computation :-)
I agree; I stuffed too much into it. I'll edit the title to reflect the IOCP-related stuff and save the rest for later. I'll get VTune soon and post those results after getting it set up; Intel has a 30-day trial I'll check out.
In addition, I'll go ahead and just post the code, since I'm beginning to think my testing setup is the problem now rather than the code itself. I added the references used in the code; it is pretty much minimal and all self-contained in one file. I chose to go with the 0-byte posted recv to cater toward more connections, although after learning about the natural connection limit, I might switch to the other method discussed in Network Programming for Microsoft Windows.
I'm not asking anyone to waste their time checking my code, but if you are interested and see anything that just looks "wrong", I'd love to know. I've been following the advice presented in the Network Programming book, as well as consulting other resources and threads on how it should be done, and I didn't see anything that looked bad, but I am still new to all this.
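Since the full listing is long, here is the zero-byte receive pattern in isolation (condensed from the PostRead/ProcessIO code below), in case that's the part anyone wants to sanity-check.

// The zero-byte posted receive in isolation (condensed from the listing below).
// The socket is non-blocking; the posted WSARecv has a zero-length buffer, so no
// per-connection receive buffer is locked while the connection is idle. When the
// completion fires, plain non-overlapped reads drain whatever has arrived.
void PostZeroByteRead(SOCKET s, OVERLAPPED & ov)
{
    WSABUF emptyBuf = {0};      // len = 0, buf = 0
    DWORD bytes = 0, flags = 0;
    if(WSARecv(s, &emptyBuf, 1, &bytes, &flags, &ov, 0) == SOCKET_ERROR &&
       WSAGetLastError() != WSA_IO_PENDING)
    {
        // real error: close / recycle the connection
    }
}

// On completion, drain with non-blocking WSARecv calls into the connection's real
// buffer, accumulating until WSAEWOULDBLOCK, then post another zero-byte read.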
/* A lot of resources were consulted and used in this code. Major resources used include: MSDN http://win32.mvps.org/network/sockhim.html Network Programming for Microsoft Windows CodeProject's IOCP articles http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx http://www.codeproject.com/KB/IP/iocp.aspx Larger blocks of comments are mostly from the tbe second reference. I used comments from that project to help understand the particulars of IOCP.*/#include <winsock2.h>#include <mswsock.h>#include <windows.h>#include <list>#include <map>#include <vector>#include <algorithm>#include <iostream>#pragma comment(lib, "ws2_32.lib")// Logical states for the overlapped structureconst int HPS_CONNECTION_STATE_CLOSED = 0;const int HPS_CONNECTION_STATE_ACCEPT = 1;const int HPS_CONNECTION_STATE_READ = 2;const int HPS_CONNECTION_STATE_WRITE = 3;// Max bytes for the recv bufferconst int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 4096;// Max bytes for the send bufferconst int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 4096;// The size of the sockaddr_in parameterconst int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);struct tHighPerformanceServerData;struct tWorkerThreadData;struct tWorkerThreadWrapperData{ tHighPerformanceServerData * serverData; tWorkerThreadData * threadData;};struct tConnectionGlobalData{ LPFN_ACCEPTEX lpfnAcceptEx; LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs; SOCKET listenSocket; HANDLE hCompletionPort; DWORD dwNumberOfConcurrentThreads; DWORD dwReadTimeTimeout; DWORD dwWriteTimeTimeout; DWORD dwAcceptTimeTimeout; int initialReceiveSize; LONG * plUidBase; tConnectionGlobalData() : lpfnAcceptEx(NULL), listenSocket(INVALID_SOCKET), hCompletionPort(INVALID_HANDLE_VALUE), dwNumberOfConcurrentThreads(0), lpfnGetAcceptExSockaddrs(NULL), dwReadTimeTimeout(-1), dwWriteTimeTimeout(-1), dwAcceptTimeTimeout(5000), plUidBase(0), initialReceiveSize(0) { }};struct tConnectionData{public: OVERLAPPED overlapped; DWORD dwUid; SOCKET socket_; sockaddr_in address; WORD sendBufferSize; BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE]; BYTE sendBufferData[HPS_OVERLAPPED_BUFFER_SEND_SIZE]; INT connectionState; sockaddr_in localPeer; sockaddr_in remotePeer; DWORD dwLastReadTime; DWORD dwLastWriteTime; tConnectionGlobalData * globalDataPtr;public: tConnectionData() : socket_(INVALID_SOCKET), connectionState(HPS_CONNECTION_STATE_CLOSED), sendBufferSize(0), dwLastReadTime(0), dwLastWriteTime(0), dwUid(-1), globalDataPtr(0) { memset(&overlapped, 0, sizeof(overlapped)); memset(&address, 0, sizeof(address)); memset(&localPeer, 0, sizeof(localPeer)); memset(&remotePeer, 0, sizeof(remotePeer)); } ~tConnectionData() { } bool Initialize() { int result = 0; connectionState = HPS_CONNECTION_STATE_CLOSED; memset(&overlapped, 0, sizeof(overlapped)); memset(&localPeer, 0, sizeof(localPeer)); memset(&remotePeer, 0, sizeof(remotePeer)); socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if(socket_ == INVALID_SOCKET) { // TODO: Handle error return false; } // Set the socket to non-blocking so we can post 0 byte recvs and handle more connections u_long iMode = 1; ioctlsocket(socket_, FIONBIO, &iMode); DWORD numberOfBytes = 0; // Not used in this mode if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE) { DWORD dwError = GetLastError(); if(dwError != ERROR_IO_PENDING) { closesocket(socket_); socket_ = 
INVALID_SOCKET; // TODO: Handle error return false; } } // Update the state the connection is in connectionState = HPS_CONNECTION_STATE_ACCEPT; // Success return true; } void Close(bool force, bool reuse) { if(socket_ != INVALID_SOCKET) { struct linger li = {0, 0}; if(force == true) // Default: SO_DONTLINGER { li.l_onoff = 1; // SO_LINGER, timeout = 0 } setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li)); closesocket(socket_); socket_ = INVALID_SOCKET; } connectionState = HPS_CONNECTION_STATE_CLOSED; if(reuse == true) { if(Initialize() == false) { // TODO: Handle error __asm nop } } } void ProcessIO(DWORD numberOfBytes) { switch(connectionState) { case HPS_CONNECTION_STATE_CLOSED: { // We won't need to do anything since the scavenger thread // will handle this for us. __asm nop } break; case HPS_CONNECTION_STATE_ACCEPT: { sockaddr_in *plocal = 0; sockaddr_in *premote = 0; int locallen = 0; int remotelen = 0; globalDataPtr->lpfnGetAcceptExSockaddrs(recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, (sockaddr **)&plocal, &locallen, (sockaddr **)&premote, &remotelen); memcpy(&localPeer, plocal, sizeof(sockaddr_in)); memcpy(&remotePeer, premote, sizeof(sockaddr_in)); setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket)); // we still need to associate the newly connected socket to our IOCP: HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads); if(hResult != globalDataPtr->hCompletionPort) { // TODO: Handle error return; } //printf("Incoming Connection - [%s:%i] to [%s:%i]\n", inet_ntoa(remotePeer.sin_addr), ntohs(remotePeer.sin_port), inet_ntoa(localPeer.sin_addr), ntohs(localPeer.sin_port)); dwLastReadTime = dwLastWriteTime = GetTickCount(); if(globalDataPtr->initialReceiveSize == 0) { // if we get here, then the initial receive size must have been 0, therefore n must be 0, with nothing at all in our receive buffer. // we can therefore blithely allow SendReply() to overwrite n (and other things). dwUid = InterlockedIncrement(globalDataPtr->plUidBase); _snprintf_s((char*)sendBufferData, HPS_OVERLAPPED_BUFFER_SEND_SIZE, HPS_OVERLAPPED_BUFFER_SEND_SIZE - 1, "Hello, world.\r\n"); sendBufferSize = 1 + strlen((char*)sendBufferData); PostWrite(); } else { // We received something during AcceptEx() if(numberOfBytes != 0) { dwUid = InterlockedIncrement(globalDataPtr->plUidBase); dwLastReadTime = GetTickCount(); // // TODO: Process Data // PostRead(); return; } // we should never get here: if SEND_BANNER_FIRST was undefined, // then AcceptEx() was told to receive at least one byte, and it // would not have returned without that byte (unless the scavenger // force-closed the socket, in which case the AcceptEx() result // would have been an error, handled before ever calling DoIo(). 
// TODO: Handle error __asm nop } } break; case HPS_CONNECTION_STATE_READ: { int result = 0; DWORD byteCount = 0; DWORD recvFlags = 0; WSABUF recvBufferDescriptor = {0}; while(result != -1) // Empty our the recv buffers { recvFlags = 0; byteCount = 0; recvBufferDescriptor.len = HPS_OVERLAPPED_BUFFER_RECV_SIZE - numberOfBytes; recvBufferDescriptor.buf = (char*)(recvBufferData + numberOfBytes); result = WSARecv(socket_, &recvBufferDescriptor, 1, &byteCount, &recvFlags, 0, 0); if(byteCount == 0) break; // No more data to read numberOfBytes += byteCount; } if(WSAGetLastError() != WSAEWOULDBLOCK) { // TODO: Handle error __asm nop } dwLastReadTime = GetTickCount(); if(numberOfBytes == SOCKET_ERROR) { // TODO: Log error Close(true, true); return; } else if(numberOfBytes == 0) // connection closing? { // TODO: Log error Close(false, true); return; } // // TODO: Process Data // PostRead(); } break; case HPS_CONNECTION_STATE_WRITE: { dwLastWriteTime = GetTickCount(); if(numberOfBytes == SOCKET_ERROR) { // TODO: Log error Close(true, true); return; } else if(numberOfBytes == 0) // connection closing? { // TODO: Log error Close(false, true); return; } PostRead(); } break; } } void PostRead() { connectionState = HPS_CONNECTION_STATE_READ; WSABUF recvBufferDescriptor = {0}; DWORD numberOfBytes = 0; DWORD recvFlags = 0; // needed by WSARecv() BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, 0); if(result != SOCKET_ERROR) // everything is OK { return; } else { if(GetLastError() != ERROR_IO_PENDING) { // TODO: Handle error Close(true, true); } else { // (else branch intentionally empty) // if we get here, gle == ERROR_IO_PENDING, which is fine by me } } } void PostWrite() // TODO: Rework this logic { BOOL result; DWORD numberOfBytes = 0; WSABUF sendBufferDescriptor; connectionState = HPS_CONNECTION_STATE_WRITE; sendBufferDescriptor.len = sendBufferSize; sendBufferDescriptor.buf = (char*)sendBufferData; result = WSASend(socket_, &sendBufferDescriptor, 1, &numberOfBytes, 0, &overlapped, 0); result = (result != SOCKET_ERROR); // WSASend() uses inverted logic wrt/WriteFile() if(!result) { DWORD err = WSAGetLastError(); if(err != ERROR_IO_PENDING) { // TODO: Handle error Close(true, true); // the fall-through does nothing because the caller // shouldn't post another read -- the reinitialized // socket has an AcceptEx() pending } else { // (else branch intentionally empty) // if we get here, gle == ERROR_IO_PENDING; nothing // left to do but return. Caller loops back to GQCS(). 
} } else // WriteFile() { // the write completed immediately // this doesn't bother us -- we will still // get the completion packet } }};struct tWorkerThreadData{public: HANDLE hThread; DWORD dwThreadId;public: tWorkerThreadData() : hThread(INVALID_HANDLE_VALUE), dwThreadId(0) { } ~tWorkerThreadData() { }};struct tHighPerformanceServerData{public: WORD wPort; int backLog; HANDLE hCompletionPort; DWORD dwNumberOfConcurrentThreads; DWORD dwNumberOfWorkerThreads; LONG lRunningWorkerThreadCount; SOCKET sListenSocket; SOCKADDR_IN saInternetAddr; GUID GuidAcceptEx; LPFN_ACCEPTEX lpfnAcceptEx; GUID GuidGetAcceptExSockaddrs; LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs; CRITICAL_SECTION workerThreadCS; std::list<tWorkerThreadData *> workerThreads; DWORD dwInitialConnectionPoolCount; std::list<tConnectionData *> connectionPool; HANDLE hScavengerThread; DWORD dwScavengerThreadId; DWORD dwScavengerDelay; // milliseconds between runs of the idle socket scavenger HANDLE hScavengerExitEvent; // tells scavenger thread when to die DWORD dwWorkerThreadScaleValue; LONG lUidBase; tConnectionGlobalData globalData;public: tHighPerformanceServerData() : hCompletionPort(INVALID_HANDLE_VALUE), dwNumberOfConcurrentThreads(1), dwNumberOfWorkerThreads(2), lRunningWorkerThreadCount(0), sListenSocket(INVALID_SOCKET), wPort(0), lpfnAcceptEx(NULL), lpfnGetAcceptExSockaddrs(NULL), dwInitialConnectionPoolCount(1500), dwScavengerDelay(1000), hScavengerExitEvent(NULL), hScavengerThread(INVALID_HANDLE_VALUE), dwScavengerThreadId(0), dwWorkerThreadScaleValue(1), lUidBase(0), backLog(SOMAXCONN) { GUID guidAcceptEx = WSAID_ACCEPTEX; memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx)); GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS; memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs)); InitializeCriticalSection(&workerThreadCS); } ~tHighPerformanceServerData() { DeleteCriticalSection(&workerThreadCS); } int WorkerThread() { BOOL result = 0; DWORD numberOfBytes = 0; ULONG key = 0; OVERLAPPED * lpOverlapped = 0; InterlockedIncrement(&lRunningWorkerThreadCount); while(true) { tConnectionData * connectionData = 0; InterlockedDecrement(&lRunningWorkerThreadCount); result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE); if(key == -1) { break; // Time to exit the worker thread } connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped); if(connectionData == 0) { // TODO: Handle error continue; } InterlockedIncrement(&lRunningWorkerThreadCount); if(result == TRUE) { // We have an I/O to process connectionData->ProcessIO(numberOfBytes); } else { // Close this socket and make space for a new one if we are still listening connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? 
false : true)); } } return 0; } int ScavengerThread() { while(true) { int count[4] = {0}; std::list<tConnectionData *>::iterator itr = connectionPool.begin(); while(itr != connectionPool.end()) { if(WaitForSingleObject(hScavengerExitEvent, 0) != WAIT_TIMEOUT) { printf("ScavengerThread requested to quit during logic loop.\n"); break; } tConnectionData * connection = (*itr); count[connection->connectionState]++; // AcceptEx() called, but no completion yet if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT) { // determine if the socket is connected int seconds = 0; int length = sizeof(seconds); if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length)) { if(seconds != -1) { seconds *= 1000; if(seconds > (int)globalData.dwAcceptTimeTimeout) { printf("[%i][Accept] idle timeout after %i ms.\n", connection->dwUid, seconds); // closesocket() here causes an immediate IOCP notification with an error indication; // that will cause our worker thread to call Close(). closesocket(connection->socket_); connection->socket_ = INVALID_SOCKET; connection->connectionState = HPS_CONNECTION_STATE_CLOSED; } } // No connection made on this socket yet else if(seconds == -1) { // Nothing to do } } } // At times, a connection can fail and not be able to be reconnected due to connection // flooding. This check will reclaim our connections that are locked in the closed state // and never get a chance to recover. else if(connection->connectionState == HPS_CONNECTION_STATE_CLOSED) { connection->Close(true, true); } // The client is in a read or write state, doesn't matter which. We want to make sure // activity still exists as desired. else { bool doClose = false; DWORD tick = GetTickCount(); DWORD dwLastTime; dwLastTime = tick - connection->dwLastReadTime; if(dwLastTime > globalData.dwReadTimeTimeout) { printf("[%i][Read] idle timeout after %i ms.\n", connection->dwUid, dwLastTime); doClose = true; } else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5)) { printf("[%i][Read] %i ms\n", connection->dwUid, dwLastTime); } dwLastTime = tick - connection->dwLastWriteTime; if(dwLastTime > globalData.dwWriteTimeTimeout) { printf("[%X][Write] idle timeout after %i ms.\n", connection->dwUid, dwLastTime); doClose = true; } else if(dwLastTime > ((float)globalData.dwWriteTimeTimeout * .5)) { printf("[%i][Write] %i ms\n", connection->dwUid, dwLastTime); } if(doClose) { closesocket(connection->socket_); connection->socket_ = INVALID_SOCKET; connection->connectionState = HPS_CONNECTION_STATE_CLOSED; } } itr++; } printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]); // Pause until next run due DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay); if(result != WAIT_TIMEOUT) { break; } } return 0; } DWORD AddConnectionsToPool(long count) { // We cannot add more connections once the server has started if(hScavengerThread != INVALID_HANDLE_VALUE) { return 0; } DWORD total = 0; for(long index = 0; index < count; ++index) { tConnectionData * connection = new tConnectionData; connection->globalDataPtr = &globalData; bool result = connection->Initialize(); if(result == true) { connectionPool.push_back(connection); total++; } else { // TODO: Handle error delete connection; } } return total; }};DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam){ tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam; DWORD dwResult = data->serverData->WorkerThread(); LPCRITICAL_SECTION pCS = 
&data->serverData->workerThreadCS; EnterCriticalSection(pCS); std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin(); while(itr != data->serverData->workerThreads.end()) { tWorkerThreadData * td = (*itr); if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread) { printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId); data->serverData->workerThreads.erase(itr); break; } itr++; } delete data->threadData; delete data; LeaveCriticalSection(pCS); return dwResult;}DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam){ return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();}bool InitializeWinsock(){ WSADATA wd = { 0 }; if(WSAStartup(MAKEWORD(2, 2), &wd) != 0) { // TODO: Handle error return false; } if(LOBYTE( wd.wVersion ) < 2) { WSACleanup(); // TODO: Handle error return false; } return true;}void DeinitializeWinsock(){ WSACleanup();}// Our high performance server :)class cHighPerformanceServer{private: tHighPerformanceServerData * internalData;public: cHighPerformanceServer() { internalData = new tHighPerformanceServerData; } ~cHighPerformanceServer() { delete internalData; } bool Create(unsigned short port) { // Get the system information SYSTEM_INFO SystemInfo; GetSystemInfo(&SystemInfo); // Try to create an I/O completion port internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads); if(internalData->hCompletionPort == NULL) { // TODO: Log error Destroy(); return false; } // Calculate how many worker threads we should create to process IOCP events DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads; if(internalData->dwNumberOfWorkerThreads == 0) { if(internalData->dwNumberOfConcurrentThreads == 0) { dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue; } else { dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue; } } // Create the worker threads! 
DWORD dwWorkerTotal = AddWorkerThreads(dwNumberOfWorkerThreads); if(dwWorkerTotal != dwNumberOfWorkerThreads) { // TODO: Log error Destroy(); return false; } internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if(internalData->sListenSocket == INVALID_SOCKET) { // TODO: Log error Destroy(); return false; } // Bind the socket to the port internalData->wPort = port; internalData->saInternetAddr.sin_family = AF_INET; internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY); internalData->saInternetAddr.sin_port = htons(internalData->wPort); int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr)); if(bindResult == SOCKET_ERROR) { // TODO: Log error Destroy(); return false; } int listenResult = listen(internalData->sListenSocket, internalData->backLog); if(listenResult == SOCKET_ERROR) { // TODO: Log error Destroy(); return false; } DWORD dwBytes = 0; int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx, sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL); if(ioctlResult == SOCKET_ERROR) { // TODO: Log error Destroy(); return false; } dwBytes = 0; ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs, sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL); if(ioctlResult == SOCKET_ERROR) { // TODO: Log error Destroy(); return false; } // Assign the global data for our connections internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx; internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs; internalData->globalData.listenSocket = internalData->sListenSocket; internalData->globalData.hCompletionPort = internalData->hCompletionPort; internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads; internalData->globalData.plUidBase = &internalData->lUidBase; internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable internalData->globalData.dwWriteTimeTimeout = -1; // TODO: Variable internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx // If we wanted to accept data from AcceptEx //internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2); DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount); if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount) { // TODO: Log error Destroy(); return false; } // Connect the listener socket to IOCP if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0) { // TODO: Log error Destroy(); return false; } internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0); if(internalData->hScavengerExitEvent == NULL) { // TODO: Log error Destroy(); return false; } internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId); if(internalData->hScavengerThread == NULL) { // TODO: Log error Destroy(); return false; } DWORD dwResult = 
ResumeThread(internalData->hScavengerThread); if(dwResult == (DWORD)-1) { // TODO: Log error Destroy(); __asm nop } // Success! return true; } void Destroy() { if(internalData->hScavengerExitEvent != NULL) { SetEvent(internalData->hScavengerExitEvent); if(internalData->hScavengerThread != INVALID_HANDLE_VALUE) { int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2); if(result != WAIT_OBJECT_0) { // TODO: Log error __asm nop } CloseHandle(internalData->hScavengerThread); internalData->hScavengerThread = INVALID_HANDLE_VALUE; } CloseHandle(internalData->hScavengerExitEvent); internalData->hScavengerExitEvent = NULL; } if(internalData->sListenSocket != INVALID_SOCKET) { closesocket(internalData->sListenSocket); internalData->sListenSocket = INVALID_SOCKET; } std::vector<HANDLE> workerThreadHandles; std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin(); while(itr != internalData->workerThreads.end()) { workerThreadHandles.push_back((*itr)->hThread); itr++; } // if(internalData->hCompletionPort != INVALID_HANDLE_VALUE) { EnterCriticalSection(&internalData->workerThreadCS); size_t count = internalData->workerThreads.size(); for(size_t x = 0; x < count; ++x) { PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0); } LeaveCriticalSection(&internalData->workerThreadCS); } // Wait for all worker threads to close for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS) { DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x); DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000); if(dwResult != WAIT_OBJECT_0) { // TODO: Log error __asm nop } } if(internalData->workerThreads.size()) { // TODO: Log error printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size()); } if(internalData->connectionPool.size()) { std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin(); while(itr != internalData->connectionPool.end()) { closesocket((*itr)->socket_); delete (*itr); itr++; } internalData->connectionPool.clear(); } if(internalData->hCompletionPort != INVALID_HANDLE_VALUE) { CloseHandle(internalData->hCompletionPort); internalData->hCompletionPort = INVALID_HANDLE_VALUE; } } DWORD AddWorkerThreads(DWORD count) { DWORD total = 0; for(DWORD index = 0; index < count; ++index) { tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData; tWorkerThreadData * threadData = new tWorkerThreadData; threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId); if(threadData->hThread != NULL) { total++; EnterCriticalSection(&internalData->workerThreadCS); internalData->workerThreads.push_back(threadData); LeaveCriticalSection(&internalData->workerThreadCS); workerThreadData->serverData = internalData; workerThreadData->threadData = threadData; DWORD dwResult = ResumeThread(threadData->hThread); if(dwResult == (DWORD)-1) { // TODO: Handle error __asm nop } } else { delete workerThreadData; delete threadData; } } return total; } void RemoveWorkerThreads(DWORD count) { EnterCriticalSection(&internalData->workerThreadCS); count = min(count, internalData->workerThreads.size()); for(DWORD index = 0; index < count; ++index) { // Signal one worker to exit, whomever is unlucky enough to process this request dies PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0); } 
LeaveCriticalSection(&internalData->workerThreadCS); }};HANDLE exitEvent = 0;BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent){ switch (ConsoleEvent) { case CTRL_LOGOFF_EVENT: case CTRL_C_EVENT: case CTRL_BREAK_EVENT: case CTRL_CLOSE_EVENT: case CTRL_SHUTDOWN_EVENT: { if(exitEvent != 0) { SetEvent(exitEvent); return TRUE; } } } return FALSE;}int main(int argc, char * argv[]){ printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData)); if(InitializeWinsock() == false) return 0; cHighPerformanceServer server; if(server.Create(15779) == false) { return 0; } exitEvent = CreateEvent(0, TRUE, FALSE, 0); SetConsoleCtrlHandler(ConsoleHandler, TRUE); WaitForSingleObject(exitEvent, INFINITE); SetConsoleCtrlHandler(ConsoleHandler, FALSE); server.Destroy(); DeinitializeWinsock(); CloseHandle(exitEvent); return 0;}
Thanks again everyone [smile]