Trying to get my IOCP code on par with expected performance

Quote:If you provide properly aligned and lockable buffers in the read, the kernel can read straight into your buffer, instead of going through a separate buffer, so a 0-byte read is by no means a guaranteed win IMO.


Ah, ok, that makes sense. Thanks for that explanation. I went ahead and just switched to the regular way of passing in the buffer and size to WSARecv. I also reread the topic on Scalable Server Architecture and see I was misinterpreting some of the information there; I should also not touch the SO_SNDBUF/SO_RCVBUF options on a socket since I will always have overlapped reads posted on a socket in this setup. I can see now in other setups, that might not be the case.
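Just to be concrete, the receive call I ended up with is shaped roughly like this (a stripped-down sketch, not the exact code posted below; the zero-byte variant is mentioned in the comment for contrast):

#include <winsock2.h>

// Sketch: posts a normal overlapped read so the kernel can complete straight
// into our buffer. For the zero-byte "data ready" style you would instead pass
// a WSABUF of {0, NULL} and do a second recv once the completion fires.
bool PostOverlappedRecv(SOCKET sock, OVERLAPPED * ov, char * buffer, u_long len)
{
    WSABUF wsaBuf = { len, buffer };
    DWORD bytes = 0;
    DWORD flags = 0;
    if(WSARecv(sock, &wsaBuf, 1, &bytes, &flags, ov, NULL) == SOCKET_ERROR)
    {
        return (WSAGetLastError() == WSA_IO_PENDING); // pending is the normal case
    }
    return true; // completed immediately; a completion packet is still queued
}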

Quote:What kind of CPU and how much RAM are you using for this test server?


Here is the information from CPU-Z (I'm running XP 32-bit though, so 3.5gb is usable):



In terms of the test:
* Program is run in release mode.
* Initial memory usage is 8.1mb (1000 preallocated connections)
* 500 connections from one laptop via LAN

Server:
* Memory usage rises to 9.5mb
* ~42kb/s traffic into the server (reported by NetMeter)
* ~28kb/s traffic out of the server (not sure why though)

Clients:
* ~42kb/s total traffic out
* ~28kb/s traffic into the clients (not sure why either)
* Each client takes up 2.3mb on the laptop, virtually no CPU time
* CPU utilization on the laptop is ~2-4%, from other system applications running
* Each client sends 32 bytes and Sleeps 1 second, 32 bytes/s output consistently

Results:
* Up to the ~500-connection mark, no service time exceeds 5s. As soon as 500 is hit, one or two of those warnings fire occasionally. If I try adding 100 more or so, more connections begin to have longer service times.

I also ran Wireshark during the test and I think I might see the real problem at hand. I ran a capture for just under 2 minutes to get the 500 clients connected (takes about 70s on one computer) and then added 100 more on another computer until they started timing out.

I applied a filter to the traffic, "tcp.analysis.ack_rtt > 1 && tcp.dstport == 15779", and starting around 50 seconds (by which point a little over 450 clients would be connected) the RTT to ACK for the packets rises to 1.5s. Towards the 80s mark, where a little over 600 clients would be connected, there is a whole bunch of "retransmissions" (lines that are black and red in Wireshark) and their RTT to ACK is 2s. Getting towards the 90s mark, there are a couple of entries that hit an RTT to ACK of 3s!

That would explain why the service time is gradually getting longer as more connections are being added and clients sporadically disconnect. I was running 500 clients per laptop, which it seems the network can't handle from one source.

If I split the test up into 2 x 250 connection parts and watch Wireshark, I see far fewer retransmits and never get any notifications of the delayed reads. If I try running 400 clients per computer, then right towards the 600 mark, I start getting more retransmits and longer service time delays in my program. As I hit almost 800 connections, the retransmits were filling up Wireshark and most of the connections were failing in my program due to longer service times.

I see now that my code seems to be more than suitable for handling a lot more connections and traffic, but my network and my current test setup are not. What would you suggest is the best way to go about testing code like this in general to avoid problems like this? I mean, when you are in an early stage and want to test the upper bounds of your code, but have nothing much to lure random testers in and you need lots of traffic, do you just have to wait?

Let's say I wanted to try and pull off a larger test; would the problem lie in my router? I.e., if I set up a dedicated server to run on at home and got, let's say, 10 or so older computers (you know, those P4 512mb Dell Optiplex ones) connected via LAN, do you think the router still couldn't handle it, or are the computers themselves the problem?

Thanks for your continued help [smile]

Code-wise, I've cleaned the code up a bit and removed a few things that were no longer necessary. This code is still far from usable for anything serious, but I'm adding it for anyone who stumbles upon the thread:
/*
    A lot of resources were consulted and used in this code. Major resources used include:
        MSDN
        http://win32.mvps.org/network/sockhim.html
        Network Programming for Microsoft Windows
        CodeProject's IOCP articles
            http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
            http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx
            http://www.codeproject.com/KB/IP/iocp.aspx
    Larger blocks of comments are mostly from the second reference.
    I used comments from that project to help understand the particulars of IOCP.
*/
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <list>
#include <vector>
#include <algorithm>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

HANDLE hPacketProcessingThread = INVALID_HANDLE_VALUE;

// Logical states for the overlapped structure
const int HPS_CONNECTION_STATE_CLOSED = 0;
const int HPS_CONNECTION_STATE_ACCEPT = 1;
const int HPS_CONNECTION_STATE_READ = 2;

// Max bytes for the recv buffer
const int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 8192;

// Max bytes for the send buffer
const int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 8192;

// The size of the sockaddr_in parameter
const int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam);
DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam);

struct tHighPerformanceServerData;
struct tWorkerThreadData;

struct tWorkerThreadWrapperData
{
    tHighPerformanceServerData * serverData;
    tWorkerThreadData * threadData;
};

struct tConnectionLocalData
{
    DWORD dwUid;

    tConnectionLocalData() :
        dwUid(-1)
    {
    }

    ~tConnectionLocalData()
    {
    }
};

struct tConnectionGlobalData
{
    LPFN_ACCEPTEX lpfnAcceptEx;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
    SOCKET listenSocket;
    HANDLE hCompletionPort;
    DWORD dwNumberOfConcurrentThreads;
    DWORD dwReadTimeTimeout;
    DWORD dwAcceptTimeTimeout;
    int initialReceiveSize;

    tConnectionGlobalData() :
        lpfnAcceptEx(NULL),
        listenSocket(INVALID_SOCKET),
        hCompletionPort(INVALID_HANDLE_VALUE),
        dwNumberOfConcurrentThreads(0),
        lpfnGetAcceptExSockaddrs(NULL),
        dwReadTimeTimeout(-1),
        dwAcceptTimeTimeout(5000),
        initialReceiveSize(0)
    {
    }
};

struct tConnectionData
{
public:
    OVERLAPPED overlapped;
    SOCKET socket_;
    sockaddr_in address;
    WORD sendBufferSize;
    BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE];
    INT connectionState;
    DWORD dwLastReadTime;
    tConnectionGlobalData * globalDataPtr;
    tConnectionLocalData * localDataPtr;

public:
    tConnectionData(tConnectionGlobalData * gblData) :
        socket_(INVALID_SOCKET),
        connectionState(HPS_CONNECTION_STATE_CLOSED),
        sendBufferSize(0),
        dwLastReadTime(0),
        globalDataPtr(gblData),
        localDataPtr(0)
    {
        memset(&overlapped, 0, sizeof(overlapped));
        memset(&address, 0, sizeof(address));
        localDataPtr = new tConnectionLocalData;
    }

    ~tConnectionData()
    {
        delete localDataPtr;
    }

    bool Initialize()
    {
        connectionState = HPS_CONNECTION_STATE_CLOSED;
        if(socket_ != INVALID_SOCKET) // Prevent resource leaks
        {
            return Close(true, true);
        }
        socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if(socket_ == INVALID_SOCKET)
        {
            // TODO: Handle error
            return false;
        }

        // We still need to associate the newly connected socket to our IOCP:
        HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads);
        if(hResult != globalDataPtr->hCompletionPort)
        {
            // TODO: Handle error
            return false;
        }

        DWORD numberOfBytes = 0; // Not used in this mode
        if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE)
        {
            DWORD dwError = GetLastError();
            if(dwError != ERROR_IO_PENDING)
            {
                closesocket(socket_);
                socket_ = INVALID_SOCKET;
                // TODO: Handle error
                return false;
            }
        }

        // Update the state the connection is in
        connectionState = HPS_CONNECTION_STATE_ACCEPT;

        // Success
        return true;
    }

    bool Close(bool force, bool reuse)
    {
        if(socket_ != INVALID_SOCKET)
        {
            struct linger li = {0, 0};
            if(force == true) // Default: SO_DONTLINGER
            {
                li.l_onoff = 1; // SO_LINGER, timeout = 0
            }
            setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));
            closesocket(socket_);
            socket_ = INVALID_SOCKET;
        }
        connectionState = HPS_CONNECTION_STATE_CLOSED;
        if(reuse == true)
        {
            return Initialize();
        }
        return true;
    }

    void ProcessIO(DWORD numberOfBytes)
    {
        if(connectionState == HPS_CONNECTION_STATE_READ)
        {
            if(numberOfBytes == SOCKET_ERROR)
            {
                // TODO: Log error
                Close(true, true);
                return;
            }
            else if(numberOfBytes == 0) // connection closing?
            {
                // TODO: Log error
                Close(false, true);
                return;
            }
            dwLastReadTime = GetTickCount();

            //
            // TODO: Process data sent from the client here
            //

            PostRead();
        }
        else if(connectionState == HPS_CONNECTION_STATE_ACCEPT)
        {
            // On Windows XP and later, once the AcceptEx function completes and the SO_UPDATE_ACCEPT_CONTEXT option is set on the accepted socket,
            // the local address associated with the accepted socket can also be retrieved using the getsockname function. Likewise, the remote
            // address associated with the accepted socket can be retrieved using the getpeername function.
            setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket));

            dwLastReadTime = GetTickCount();

            if(globalDataPtr->initialReceiveSize != 0)
            {
                //
                // TODO: Process data sent from a ConnectEx call here
                //
            }

            // We are ready to start receiving from the client
            PostRead();
        }
    }

    // This function will post a read operation on the socket. This means that an IOCP event
    // notification will be fired when the socket has data available for reading to it.
    void PostRead()
    {
        connectionState = HPS_CONNECTION_STATE_READ;
        WSABUF recvBufferDescriptor = {HPS_OVERLAPPED_BUFFER_RECV_SIZE, (char *)recvBufferData};
        DWORD numberOfBytes = 0;
        DWORD recvFlags = 0;
        BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, NULL);
        if(result == SOCKET_ERROR)
        {
            if(GetLastError() != ERROR_IO_PENDING)
            {
                // TODO: Handle error
                Close(true, true);
            }
        }
    }
};

struct tWorkerThreadData
{
public:
    HANDLE hThread;
    DWORD dwThreadId;

public:
    tWorkerThreadData() :
        hThread(INVALID_HANDLE_VALUE),
        dwThreadId(0)
    {
    }

    ~tWorkerThreadData()
    {
    }
};

struct tHighPerformanceServerData
{
public:
    WORD wPort;
    int backLog;

    HANDLE hCompletionPort;
    DWORD dwNumberOfConcurrentThreads;
    DWORD dwNumberOfWorkerThreads;
    LONG lRunningWorkerThreadCount;

    SOCKET sListenSocket;
    SOCKADDR_IN saInternetAddr;

    GUID GuidAcceptEx;
    LPFN_ACCEPTEX lpfnAcceptEx;

    GUID GuidGetAcceptExSockaddrs;
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

    CRITICAL_SECTION workerThreadCS;
    std::list<tWorkerThreadData *> workerThreads;

    DWORD dwInitialConnectionPoolCount;
    std::list<tConnectionData *> connectionPool;

    HANDLE hScavengerThread;
    DWORD dwScavengerThreadId;
    DWORD dwScavengerDelay; // milliseconds between runs of the idle socket scavenger
    HANDLE hScavengerExitEvent; // tells scavenger thread when to die

    DWORD dwWorkerThreadScaleValue;

    tConnectionGlobalData globalData;

public:
    tHighPerformanceServerData() :
        hCompletionPort(INVALID_HANDLE_VALUE),
        dwNumberOfConcurrentThreads(0),
        dwNumberOfWorkerThreads(0),
        lRunningWorkerThreadCount(0),
        sListenSocket(INVALID_SOCKET),
        wPort(0),
        lpfnAcceptEx(NULL),
        lpfnGetAcceptExSockaddrs(NULL),
        dwInitialConnectionPoolCount(1000),
        dwScavengerDelay(1000),
        hScavengerExitEvent(NULL),
        hScavengerThread(INVALID_HANDLE_VALUE),
        dwScavengerThreadId(0),
        dwWorkerThreadScaleValue(1),
        backLog(SOMAXCONN)
    {
        GUID guidAcceptEx = WSAID_ACCEPTEX;
        memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx));
        GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
        memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs));
        InitializeCriticalSection(&workerThreadCS);
    }

    ~tHighPerformanceServerData()
    {
        DeleteCriticalSection(&workerThreadCS);
    }

    int WorkerThread()
    {
        BOOL result = 0;
        DWORD numberOfBytes = 0;
        ULONG key = 0;
        OVERLAPPED * lpOverlapped = 0;
        InterlockedIncrement(&lRunningWorkerThreadCount);
        while(true)
        {
            tConnectionData * connectionData = 0;
            InterlockedDecrement(&lRunningWorkerThreadCount);
            result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
            if(key == -1)
            {
                break; // Time to exit the worker thread
            }
            connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped);
            if(connectionData == 0)
            {
                // TODO: Handle error
                continue;
            }
            InterlockedIncrement(&lRunningWorkerThreadCount);
            if(result == TRUE)
            {
                // We have an I/O to process
                connectionData->ProcessIO(numberOfBytes);
            }
            else
            {
                // Close this socket and make space for a new one if we are still listening
                connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? false : true));
            }
        }
        return 0;
    }

    int ScavengerThread()
    {
        while(true)
        {
            int count[4] = {0};
            std::list<tConnectionData *>::iterator itr = connectionPool.begin();
            while(itr != connectionPool.end())
            {
                tConnectionData * connection = (*itr);
                count[connection->connectionState]++;
                // AcceptEx() called, but no completion yet
                if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT)
                {
                    // determine if the socket is connected
                    int seconds = 0;
                    int length = sizeof(seconds);
                    if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length))
                    {
                        if(seconds != -1)
                        {
                            seconds *= 1000;
                            if(seconds > (int)globalData.dwAcceptTimeTimeout)
                            {
                                printf("[%i][Accept] idle timeout after %i ms.\n", connection->socket_, seconds);
                                // closesocket() here causes an immediate IOCP notification with an error indication;
                                // that will cause our worker thread to call Close().
                                closesocket(connection->socket_);
                                connection->socket_ = INVALID_SOCKET;
                                connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
                            }
                        }
                        // No connection made on this socket yet
                        else if(seconds == -1)
                        {
                            // Nothing to do
                        }
                    }
                }
                // The client is in a read or write state, doesn't matter which. We want to make sure
                // activity still exists as desired.
                else
                {
                    bool doClose = false;
                    DWORD tick = GetTickCount();
                    DWORD dwLastTime = tick - connection->dwLastReadTime;
                    if(dwLastTime > globalData.dwReadTimeTimeout)
                    {
                        printf("[%i][Read] idle timeout after %i ms.\n", connection->socket_, dwLastTime);
                        doClose = true;
                    }
                    else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5))
                    {
                        printf("[%i][Read] %i ms\n", connection->socket_, dwLastTime);
                    }
                    if(doClose)
                    {
                        closesocket(connection->socket_);
                        connection->socket_ = INVALID_SOCKET;
                        connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
                    }
                }
                itr++;
            }
            printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]);

            // Pause until next run due
            DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay);
            if(result != WAIT_TIMEOUT)
            {
                break;
            }
        }
        return 0;
    }

    DWORD AddConnectionsToPool(long count)
    {
        // We cannot add more connections once the server has started
        if(hScavengerThread != INVALID_HANDLE_VALUE)
        {
            return 0;
        }
        DWORD total = 0;
        for(long index = 0; index < count; ++index)
        {
            tConnectionData * connection = new tConnectionData(&globalData);
            bool result = connection->Initialize();
            if(result == true)
            {
                connectionPool.push_back(connection);
                total++;
            }
            else
            {
                // TODO: Handle error
                delete connection;
            }
        }
        return total;
    }

    DWORD AddWorkerThreads(DWORD count)
    {
        DWORD total = 0;
        for(DWORD index = 0; index < count; ++index)
        {
            tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData;
            tWorkerThreadData * threadData = new tWorkerThreadData;
            threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId);
            if(threadData->hThread != NULL)
            {
                total++;
                EnterCriticalSection(&workerThreadCS);
                workerThreads.push_back(threadData);
                LeaveCriticalSection(&workerThreadCS);
                workerThreadData->serverData = this;
                workerThreadData->threadData = threadData;
                DWORD dwResult = ResumeThread(threadData->hThread);
                if(dwResult == (DWORD)-1)
                {
                    // TODO: Handle error
                    __asm nop
                }
            }
            else
            {
                delete workerThreadData;
                delete threadData;
            }
        }
        return total;
    }
};

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam)
{
    tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam;
    DWORD dwResult = data->serverData->WorkerThread();
    LPCRITICAL_SECTION pCS = &data->serverData->workerThreadCS;
    EnterCriticalSection(pCS);
    std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin();
    while(itr != data->serverData->workerThreads.end())
    {
        tWorkerThreadData * td = (*itr);
        if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread)
        {
            printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId);
            data->serverData->workerThreads.erase(itr);
            break;
        }
        itr++;
    }
    delete data->threadData;
    delete data;
    LeaveCriticalSection(pCS);
    return dwResult;
}

DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam)
{
    return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();
}

bool InitializeWinsock()
{
    WSADATA wd = { 0 };
    if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
    {
        // TODO: Handle error
        return false;
    }
    if(LOBYTE( wd.wVersion ) < 2)
    {
        WSACleanup();
        // TODO: Handle error
        return false;
    }
    return true;
}

void DeinitializeWinsock()
{
    WSACleanup();
}

// Our high performance server :)
class cHighPerformanceServer
{
private:
    tHighPerformanceServerData * internalData;

public:
    cHighPerformanceServer()
    {
        internalData = new tHighPerformanceServerData;
    }

    ~cHighPerformanceServer()
    {
        delete internalData;
    }

    bool Create(unsigned short port)
    {
        // Get the system information
        SYSTEM_INFO SystemInfo;
        GetSystemInfo(&SystemInfo);

        // Try to create an I/O completion port
        internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads);
        if(internalData->hCompletionPort == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        // Calculate how many worker threads we should create to process IOCP events
        DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads;
        if(internalData->dwNumberOfWorkerThreads == 0)
        {
            if(internalData->dwNumberOfConcurrentThreads == 0)
            {
                dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue;
            }
            else
            {
                dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue;
            }
        }

        // Create the worker threads!
        DWORD dwWorkerTotal = internalData->AddWorkerThreads(dwNumberOfWorkerThreads);
        if(dwWorkerTotal != dwNumberOfWorkerThreads)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if(internalData->sListenSocket == INVALID_SOCKET)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        // Bind the socket to the port
        internalData->wPort = port;
        internalData->saInternetAddr.sin_family = AF_INET;
        internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        internalData->saInternetAddr.sin_port = htons(internalData->wPort);
        int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr));
        if(bindResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        int listenResult = listen(internalData->sListenSocket, internalData->backLog);
        if(listenResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        DWORD dwBytes = 0;
        int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
            &internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx,
            sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL);
        if(ioctlResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        dwBytes = 0;
        ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
            &internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs,
            sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL);
        if(ioctlResult == SOCKET_ERROR)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        // Assign the global data for our connections
        internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx;
        internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs;
        internalData->globalData.listenSocket = internalData->sListenSocket;
        internalData->globalData.hCompletionPort = internalData->hCompletionPort;
        internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads;
        internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable
        internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable
        internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx
        // If we wanted to accept data sent from ConnectEx via AcceptEx
        //internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2);

        DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount);
        if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        // Connect the listener socket to IOCP
        if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0);
        if(internalData->hScavengerExitEvent == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId);
        if(internalData->hScavengerThread == NULL)
        {
            // TODO: Log error
            Destroy();
            return false;
        }

        DWORD dwResult = ResumeThread(internalData->hScavengerThread);
        if(dwResult == (DWORD)-1)
        {
            // TODO: Log error
            Destroy();
            __asm nop
        }

        // Success!
        return true;
    }

    void Destroy()
    {
        if(internalData->hScavengerExitEvent != NULL)
        {
            SetEvent(internalData->hScavengerExitEvent);
            if(internalData->hScavengerThread != INVALID_HANDLE_VALUE)
            {
                int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2);
                if(result != WAIT_OBJECT_0)
                {
                    // TODO: Log error
                    __asm nop
                }
                CloseHandle(internalData->hScavengerThread);
                internalData->hScavengerThread = INVALID_HANDLE_VALUE;
            }
            CloseHandle(internalData->hScavengerExitEvent);
            internalData->hScavengerExitEvent = NULL;
        }

        if(internalData->sListenSocket != INVALID_SOCKET)
        {
            closesocket(internalData->sListenSocket);
            internalData->sListenSocket = INVALID_SOCKET;
        }

        std::vector<HANDLE> workerThreadHandles;
        std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin();
        while(itr != internalData->workerThreads.end())
        {
            workerThreadHandles.push_back((*itr)->hThread);
            itr++;
        }

        // Clean up the worker threads waiting on the IOCP
        if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
        {
            EnterCriticalSection(&internalData->workerThreadCS);
            size_t count = internalData->workerThreads.size();
            for(size_t x = 0; x < count; ++x)
            {
                PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
            }
            LeaveCriticalSection(&internalData->workerThreadCS);
        }

        // Wait for all worker threads to close
        for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS)
        {
            DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x);
            DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000);
            if(dwResult != WAIT_OBJECT_0)
            {
                // TODO: Log error
                __asm nop
            }
        }

        // Sanity check
        if(internalData->workerThreads.size())
        {
            // TODO: Log error
            printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size());
        }

        if(internalData->connectionPool.size())
        {
            std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin();
            while(itr != internalData->connectionPool.end())
            {
                closesocket((*itr)->socket_);
                delete (*itr);
                itr++;
            }
            internalData->connectionPool.clear();
        }

        if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
        {
            CloseHandle(internalData->hCompletionPort);
            internalData->hCompletionPort = INVALID_HANDLE_VALUE;
        }
    }
};

HANDLE exitEvent = 0;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
    switch (ConsoleEvent)
    {
        case CTRL_LOGOFF_EVENT:
        case CTRL_C_EVENT:
        case CTRL_BREAK_EVENT:
        case CTRL_CLOSE_EVENT:
        case CTRL_SHUTDOWN_EVENT:
        {
            if(exitEvent != 0)
            {
                SetEvent(exitEvent);
                return TRUE;
            }
        }
    }
    return FALSE;
}

int main(int argc, char * argv[])
{
    printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData));
    if(InitializeWinsock() == false)
        return 0;
    cHighPerformanceServer server;
    if(server.Create(15779) == false)
    {
        return 0;
    }
    exitEvent = CreateEvent(0, TRUE, FALSE, 0);
    SetConsoleCtrlHandler(ConsoleHandler, TRUE);
    WaitForSingleObject(exitEvent, INFINITE);
    SetConsoleCtrlHandler(ConsoleHandler, FALSE);
    server.Destroy();
    DeinitializeWinsock();
    CloseHandle(exitEvent);
    return 0;
}
Quote:Original post by Drew_Benton
That would explain why the service time is gradually getting longer as more connections are being added and clients sporadically disconnect. I was running 500 clients per laptop, which it seems the network can't handle from one source.

The network can handle it fine, the desktop cannot. 500 connections coming from one machine is quite a large workload, especially if it then has to queue up and send data on those connections. Spreading out your connections across many machines should improve your test case, and allow you to implement other tests more easily as well.

Also, and I can't find a reference for this so take it with a grain of salt, I recall reading somewhere that the Windows XP networking stack has certain limitations that the server OS ones do not.
Quote:
If I split the test up into 2 x 250 connection parts and watch Wireshark, I see far fewer retransmits and never get any notifications of the delayed reads. If I try running 400 clients per computer, then right towards the 600 mark, I start getting more retransmits and longer service time delays in my program. As I hit almost 800 connections, the retransmits were filling up Wireshark and most of the connections were failing in my program due to longer service times.

I see now that my code seems to be more than suitable for handling a lot more connections and traffic, but my network and my current test setup are not. What would you suggest is the best way to go about testing code like this in general to avoid problems like this? I mean, when you are in an early stage and want to test the upper bounds of your code, but have nothing much to lure random testers in and you need lots of traffic, do you just have to wait?

Let's say I wanted to try and pull off a larger test; would the problem lie in my router? I.e., if I set up a dedicated server to run on at home and got, let's say, 10 or so older computers (you know, those P4 512mb Dell Optiplex ones) connected via LAN, do you think the router still couldn't handle it, or are the computers themselves the problem?

I doubt your router is the problem. While most consumer routers are fairly limited in their internet bandwidth abilities, their internal switches can generally operate fairly well (and you should be hitting only the switch if you're on the local network). Spreading out the connections to be coming from multiple clients will create a more realistic test scenario. Configuring your host OS to be a server class one (even a 180 day trial of Server 2008 will do) would help as well.

In time the project grows, the ignorance of its devs it shows, with many a convoluted function, it plunges into deep compunction, the price of failure is high, Washu's mirth is nigh.

Thanks Washu, that's great information to know. I think I'll go that route of testing on a server OS with a few more computers across the LAN. I'll post some more results in a couple of days or so after I get everything set up and test again. I might also invest some time in making the code 64-bit compatible (or at least make sure it works in 64-bit mode) so I can get a 2-for-1 deal out of it.

I might as well get this type of work done now, because I'm going to have to pick up some more testing hardware anyway to work through the distributed computing and to make a simple program that can work across multiple computers at once. I won't go crazy or anything, just get some cheap basics that should work out fine.

Thanks again everyone, I'll keep you updated. [smile]
Although you're measuring latency while sending small amounts of data, I don't see any TCP_NODELAY option.
Quote:Original post by Antheus
Although you're measuring latency while sending small amounts of data, I don't see any TCP_NODELAY option.


Thanks, I forgot about that flag in the test client. Setting it seemed to help a little, but it really emphasizes the problem with the testing hardware/OS combination on the server.
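For reference, this is roughly all it took in the test client (a sketch; clientSocket stands in for the already-connected test socket):

// Disable Nagle's algorithm so the 32-byte payloads go out immediately instead
// of being coalesced while waiting for outstanding ACKs.
BOOL noDelay = TRUE;
if(setsockopt(clientSocket, IPPROTO_TCP, TCP_NODELAY,
              (const char *)&noDelay, sizeof(noDelay)) == SOCKET_ERROR)
{
    // TODO: Handle error (WSAGetLastError())
}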

I ran another large test trying to get 750 clients from one computer to make sure nothing had changed, and sure enough, the higher the client count got, the more TCP retransmissions flooded the screen with longer RTTs to ACK. In addition, I was getting a flood of duplicate ACKs, which to me looks like there is too much activity going on for the hardware on the test clients; they're not churning through the data fast enough. Similarly, another test with 400 clients on each computer resulted in the same behavior, so I'm certain now about the OS theory.

I don't expect to be able to get more testing hardware or a server setup until some time next week, so for now I'll keep working on the small-case examples with the IOCP code, using what I have. I still have to solve some more problems dealing with concurrency and stream processing. That's ok, since this is really all for fun and learning; I'm not in a rush, nor do I have deadlines looming. I've also already started my initial research and development on making my small distributed/cluster code to tackle the scalability issues I originally talked about. So I have plenty to do until I can run a larger, more effective test. When I do, I'll make sure to write up the results and the hardware used for reference.

Thanks everyone, this thread has been a great benefit to me!
That's an interesting topic, I'm interested to see if/how you'll solve your problem.

I don't know if it's too complex or not for you to test, but maybe you could try to change your network system to use a single UDP socket on the server to which the clients can send/receive data. The main point here would be to test if you're hitting a TCP socket count overhead/limit somewhere. With UDP your packets won't be reliable anymore, but since it's just for a test, it shouldn't matter..

Y.
Quote:Original post by Ysaneya
That's an interesting topic, I'm interested to see if/how you'll solve your problem.


Me too! I'm hoping a trial run on a server will turn out positive, but I'm still deciding how to go about that test. I need my own server setup, which I've had planned for a while, so I was just going to set one up this upcoming week. Before making that kind of plunge, though, I've had to spend a lot of time researching options and making future considerations:
A) Buy a refurbished 1U server unit from geeks.com, load up a server OS, and test.
B) Lease a dedicated server for a month and test on that, so it's more "real world" testing.
C) Make a future investment and simply build my own via Newegg.

As much as I want to set up my own on my local network, I don't know enough about the process to feel like I'd be making an educated purchase, so I'm leaning towards a 30-day lease of a dedicated server for the purpose of testing this stuff. The other ideas I had about the distributed computing and shards/MMO architecture I think I can get away with on my local network using my existing desktop and laptops, so no worries there, as those goals aren't based on being able to act as real servers. This IOCP stuff, though, I want to get right.

Quote:I don't know if it's too complex or not for you to test, but maybe you could try to change your network system to use a single UDP socket on the server to which the clients can send/receive data. The main point here would be to test if you're hitting a TCP socket count overhead/limit somewhere. With UDP your packets won't be reliable anymore, but since it's just for a test, it shouldn't matter..


I've started the process of making my IOCP server UDP based, but I'm getting a little stuck on one specific issue. In TCP, each connection has a WSARecv posted on it, and it simply sits pending until there is something going on.

In my current UDP setup, however, when I post an overlapped WSARecvFrom on the main listen socket, it returns immediately with an error code of ERROR_IO_PENDING. This is bad because, since it does not block, the server will run at 100% CPU usage as it's polling the recv buffers rather than waiting for a completion event.

I've found that if I instead just post a blocking WSARecvFrom, and then use PostQueuedCompletionStatus to notify the worker threads, then I can utilize the system as it should be, but that's defeating the purpose, as it's bottlenecked at the non-overlapped WSARecvFrom call.

Going back to the overlapped WSARecvFrom call, each call does post a recv operation for the IOCP code to use, but I don't really know how to make it so that it's not flooding the connection with overlapped recv requests. I.e., running it in a loop will post an overlapped operation each iteration, so calling it 1000 times a second means 1000 queued overlapped operations waiting to be handled. Running like that for 60 seconds is 60,000 overlapped operations waiting to be handled, or in UDP terms, 60,000 packets a minute.

I'm not really sure at the moment how to go about coding limitations on that process in an efficient manner that would allow me to do the same caliber of TCP testing using UDP. My only initial thought on the matter is to have some value that stores how many posted overlapped operations there are and, when that number hits some limit, Sleep the recv thread a little to see if those events have drained. In the worker threads, I'd decrement the value on each completed operation. I was thinking I could build that system using the Interlocked family of functions, but that doesn't really address what to do when I have too many posted, since Sleeping seems a bit of a hack.

What I think I should be doing is having the worker threads decrement the overlapped counter as mentioned before, but have a detection in place to set a global event that the UDP thread waits on while the current overlapped counter value is greater than some safe value. When that event is triggered, it will loop through the WSARecvFrom calls again, posting requests and updating the counter until it hits the max quota, and then wait for the event again. That seems better than Sleep, and more effective and scalable overall.
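Here's roughly what I'm picturing, as a sketch rather than working code; the names, buffer size, and thresholds below are all made up for illustration:

// Hedged sketch of the "keep N receives pending" idea. Everything here
// (names, limits, g_* globals) is illustrative, not from the actual server.
#include <winsock2.h>
#include <windows.h>

const LONG MAX_PENDING_RECVS = 2500;   // high-water mark
const LONG LOW_WATER_MARK    = 1250;   // repost once we fall to half

volatile LONG g_pendingRecvs = 0;      // how many WSARecvFroms are outstanding
HANDLE g_replenishEvent = NULL;        // auto-reset event: CreateEvent(NULL, FALSE, FALSE, NULL)

struct tUdpRecvContext
{
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buffer[512];
    sockaddr_in from;
    int fromLen;
};

bool PostUdpRecv(SOCKET udpSocket, tUdpRecvContext * ctx)
{
    memset(&ctx->overlapped, 0, sizeof(ctx->overlapped));
    ctx->wsaBuf.buf = ctx->buffer;
    ctx->wsaBuf.len = sizeof(ctx->buffer);
    ctx->fromLen = sizeof(ctx->from);
    DWORD bytes = 0, flags = 0;
    int result = WSARecvFrom(udpSocket, &ctx->wsaBuf, 1, &bytes, &flags,
                             (sockaddr *)&ctx->from, &ctx->fromLen,
                             &ctx->overlapped, NULL);
    if(result == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        return false; // TODO: Handle error
    }
    InterlockedIncrement(&g_pendingRecvs);
    return true;
}

// Called by a worker thread after it processes one completed WSARecvFrom.
void OnUdpRecvCompleted()
{
    if(InterlockedDecrement(&g_pendingRecvs) <= LOW_WATER_MARK)
    {
        SetEvent(g_replenishEvent); // wake the posting thread instead of Sleep()ing
    }
}

// Posting thread: block on the event, then top the pool back up to the maximum.
void ReplenishLoop(SOCKET udpSocket)
{
    while(WaitForSingleObject(g_replenishEvent, INFINITE) == WAIT_OBJECT_0)
    {
        while(g_pendingRecvs < MAX_PENDING_RECVS)
        {
            PostUdpRecv(udpSocket, new tUdpRecvContext); // freed by the worker thread
        }
    }
}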

I should be able to finish this code and test the idea in the next couple of days. I actually have a simple UDP setup I'm working with for the distributed system component I'm making. More to come soon! [smile]
Hi Drew. Just FYI:

The server was a VM based on a dual-core Centrino with 4gb RAM, and it hosted 6 different servers.
The router was a Fortigate 200 (not a home router). Clients: 25 different computers belonging to different subnetworks, each of them running several clients. Each packet was about 600 bytes. The result was 800 CCU without any significant exhaustion of server resources (I guess it would have been very easy to increase this amount, but we were limited by the client side...)
Thanks for those results aissp! I am still looking forward to testing the TCP version on a server myself and checking it out. I think I will run out of client resources before you did though [lol]

I've been working on the UDP version and after testing it, the results seem unreal. I mean, I know I am testing on localhost and all, but it's amazing how it's turning out.

Just some preliminary numbers on the current results. I am still thinking something has to be wrong, but so far it looks like I'm doing it right. I will spend some more time thinking it through, though. I used TeamViewer to RDC into my laptops, so there was always a small amount of UDP network traffic (~4kb/s) going on.

Server: UDP IOCP with 2500 overlapped receives kept pending at a time (see my Journal for why 2500!). As soon as this value hit 1/2, more requests were posted to bring it back to the max; this logic only triggers on 1/2 empty. The server also has a hard-coded limitation of tracking 8k "connections" (arbitrary). Since this is early-stage work, I have to limit the server to one IOCP thread and one worker thread to avoid any synchronization issues with storing timing data. Unlike the TCP server, this server has to make use of boost::singleton_pool for the massive number of allocations and deallocations of the connection objects posted on each overlapped event. That wouldn't make performance "better" though.
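For the pooling, the idea is just a tagged singleton_pool fronting the per-receive context allocations; a rough sketch (reusing the tUdpRecvContext struct from the sketch above, names illustrative):

// Hedged sketch of how boost::singleton_pool can back the per-receive contexts.
#include <boost/pool/singleton_pool.hpp>
#include <new>

struct UdpContextPoolTag { };
typedef boost::singleton_pool<UdpContextPoolTag, sizeof(tUdpRecvContext)> UdpContextPool;

tUdpRecvContext * AllocContext()
{
    void * p = UdpContextPool::malloc();          // fixed-size block from the pool
    return p ? new (p) tUdpRecvContext() : NULL;  // placement-new into pool memory
}

void FreeContext(tUdpRecvContext * ctx)
{
    ctx->~tUdpRecvContext();                      // run the destructor manually
    UdpContextPool::free(ctx);                    // return the block to the pool
}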

Client: Creates a UDP socket and sends 32 bytes each second. Each client uses __rdtsc to generate its own UID and sends that to the server. The server stores client data in a stdext::hash_map keyed by the 64-bit number sent. A simple client, just like the TCP one was.
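The client loop is nothing fancy; a rough sketch of it (the payload layout and localhost address here are illustrative, not copied from the real client):

// Hedged sketch of the UDP test client: one socket, a 64-bit UID from __rdtsc,
// and a 32-byte payload sent once per second.
#include <winsock2.h>
#include <windows.h>
#include <intrin.h>

int main()
{
    WSADATA wd;
    WSAStartup(MAKEWORD(2, 2), &wd);

    SOCKET s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);

    sockaddr_in server = { 0 };
    server.sin_family = AF_INET;
    server.sin_port = htons(15779);                   // same port as the TCP test
    server.sin_addr.s_addr = inet_addr("127.0.0.1");  // localhost for this sketch

    unsigned __int64 uid = __rdtsc();                 // "unique" id for this client
    char payload[32] = { 0 };
    memcpy(payload, &uid, sizeof(uid));               // first 8 bytes = client uid

    while(true)
    {
        sendto(s, payload, sizeof(payload), 0, (sockaddr *)&server, sizeof(server));
        Sleep(1000);
    }
    // not reached in this sketch: closesocket(s); WSACleanup();
    return 0;
}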

I ran 1000 processes of the Client on each of two laptops, 2000 total clients. Initial memory usage was 29mb (high for many reasons; it's modified TCP code) and stayed in that range. CPU usage was most often 1%, sometimes 2% or 0%, throughout the first 5 minutes of the test. Each minute that passed added about 1 second of CPU time. About 150kb/s of data was being processed by the application according to NetMeter.

The test ran for just a little over 5 minutes, with informal timing. Out of the 2000 connections, 1033 had an average service time just over 1 second, but still within a millisecond or two of it (< 1002 ms); 967 had an average service time just under 1 second, again within a millisecond or two (> 998 ms). Given that half of the clients have their data handled faster than they are sending it, that makes me a little concerned. I think I need to add sequence numbers to the packets to try to figure out whether duplicates are throwing off the values or it's just the lack of precision of GetTickCount. I should move to a higher-precision timer for a more accurate test.
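Something along these lines is probably what I'll switch to for the timing (a sketch; GetTickCount's coarse ~10-16 ms resolution is what I suspect is smearing the numbers):

// Hedged sketch: millisecond timestamps from the high-resolution counter
// instead of GetTickCount(), for sub-millisecond service-time measurements.
#include <windows.h>

double GetPreciseMilliseconds()
{
    static LARGE_INTEGER frequency = { 0 };
    if(frequency.QuadPart == 0)
    {
        QueryPerformanceFrequency(&frequency); // counts per second, fixed at boot
    }
    LARGE_INTEGER now;
    QueryPerformanceCounter(&now);
    return (double)now.QuadPart * 1000.0 / (double)frequency.QuadPart;
}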

For a second test, I ran 1500 processes of the Client on each laptop, 3000 total clients for the server. For this test, about 220kb/s of data was being processed by the applications according to NetMeter. CPU usage was more often 1-2% rather than the 0% of the previous test. As a result, almost 2 seconds of CPU time were spent per minute on this test.

The results of this test were more reflective of the amount of traffic, but not by much: 325 clients were still running with an average service time of around 999ms, 2204 were somewhere around 1000ms plus fractions, and 471 had averages above 1s, between 1-5ms over.

Download: Results 1 | Results 2. The format is: [Client Index] => Average Service Time

Of course, this testing is by no means scientific or "official", but the obvious difference between TCP and UDP is being seen; I just didn't know it would be this big. I'm sure my tests are far from optimal, but I'll keep at it this weekend to get better, replicable tests for when I test on a server. I will also try to make an all-in-one UDP/TCP client, as that would be more useful than maintaining two projects.

These posts are getting longer and longer each time, so I'll stop here, but I did want to mention I also made a one-process test client that spawned more UDP connections, and that testing likewise went well. Each laptop tried to run 4,000 UDP sockets and traffic was ~675kb/s. Occasional dips did take place where it dropped down to 1/3 of that, which should be reflected in the service time latencies. I ran a quick test for a couple of minutes (7950 total clients actually made it) and the results were as follows: Results 3. Yes, the first few at ~500ms are puzzling, but everything else looks normal and scaled way better than the TCP testing did!
Quote:which it seems the network can't handle from one source.


Actually, I think the problem is the testing client. When there are 500 separate processes, each taking 2 megabytes, and each doing their own I/O, the client-side kernel will have trouble scheduling all that. If you wrote one uber-client that used IOCP to multiplex a large number of client connections, I believe that client could sustain many more connections. From the point of view of the server, there would be no difference, because each connection is still a separate client-side port.
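A rough sketch of what I mean, with error handling and the send/receive logic omitted and the names made up -- one process driving many outbound connections through a single completion port via ConnectEx:

// Hedged sketch of an IOCP-multiplexed test client: N outbound sockets driven
// from one completion port. ConnectEx requires the socket to be bound first.
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>

bool StartClientConnections(HANDLE iocp, const sockaddr_in & server, int count)
{
    for(int i = 0; i < count; ++i)
    {
        SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

        // ConnectEx is fetched per-socket here for brevity; once is enough in practice.
        LPFN_CONNECTEX lpfnConnectEx = NULL;
        GUID guid = WSAID_CONNECTEX;
        DWORD bytes = 0;
        WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
                 &lpfnConnectEx, sizeof(lpfnConnectEx), &bytes, NULL, NULL);

        // ConnectEx requires a bound socket; bind to any local address/port.
        sockaddr_in local = { 0 };
        local.sin_family = AF_INET;
        bind(s, (sockaddr *)&local, sizeof(local));

        CreateIoCompletionPort((HANDLE)s, iocp, (ULONG_PTR)s, 0);

        OVERLAPPED * ov = new OVERLAPPED;
        memset(ov, 0, sizeof(*ov));
        if(lpfnConnectEx(s, (const sockaddr *)&server, sizeof(server),
                         NULL, 0, NULL, ov) == FALSE &&
           WSAGetLastError() != ERROR_IO_PENDING)
        {
            // TODO: Handle error for this socket
            closesocket(s);
            delete ov;
        }
        // When each connect completion is dequeued, set SO_UPDATE_CONNECT_CONTEXT on the
        // socket and then post reads/writes from the same worker-thread loop pattern
        // used in the server code above.
    }
    return true;
}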

The reason you get about half as much data back as in is that TCP overhead is at least 40 bytes per packet (20 for IP and 20 for TCP without options). Thus, your 32 bytes of payload are actually less than the overhead: each data packet is at least 72 bytes on the wire, while the bare ACK coming back is only about 40.

Also, when posting measurements, "kB" is usually 1000 bytes, "kb" is usually 1000 bits, "KB" is usually 1024 bytes, and "Kb" is usually 1024 bits. Thus, when you post "48 kb" I read that as bits, meaning you're doing I/O that would fit on a modem. My guess is you meant bytes. A lot of people don't use the same conventions, though (and there's "kibi" and "kby" and others, too) so it's best to write out the unit: "kilobytes/s."
enum Bool { True, False, FileNotFound };

