Stressing IOCP server

Hello, Drew.

Regarding the hand-made queue:

nSize is accessed by workers only (it must be locked per worker).
nReadIndex is accessed and modified by the engine (the main thread) only.

The point of doing it like this is that the engine thread never has to wait for workers.

So, the problem is "GetEventQueueSize" (called from the engine), which returns an arithmetic operation between two variables manipulated by different threads. What if GetEventQueueSize creates a local variable nLocalSize to store nSize and then returns nLocalSize - nReadIndex? (nReadIndex can only be changed by the engine, whose single thread is busy inside GetEventQueueSize at that moment.)
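Something like this sketch is what I have in mind (the function body is just my illustration of the idea):

DWORD GetEventQueueSize()
{
    // Snapshot the worker-written value into a local first...
    DWORD nLocalSize = nSize;
    // ...then subtract nReadIndex, which only this engine thread modifies.
    return nLocalSize - nReadIndex;
}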


About the possibility of the events array being "surpassed" (imagine MAX_EVENTS_ARRAY_SIZE = 10):
nSize = 6
nReadIndex = 0
---
nSize = 7 (7,8,9,0,1,2,3,4,5,6,7)
nReadIndex = 6

This way you would be losing a quantity of events equal to MAX_EVENTS_ARRAY_SIZE. However, in a situation like this, if the array were dynamic you would be constantly allocating memory, which would lead to a bad situation as well. So, what about thinking about it another way: OK, the array can be surpassed, but if that is happening, isn't it an indicator of something going very wrong? What about implementing a check like:

nSize++;
if (nSize == nReadIndex)
{
    // array surpassed, it's going wrong...
}

In some way, as said before, it would be a method to anticipate a "not enough memory" problem :)

(Please tell me I'm right) :D


About preventing more than one WSASend: the only method is to make the engine wait for workers processing a previous send request, but that costs making the engine wait; or just return a busy flag or something to the engine (which complicates the programming on the engine side).
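If I went the busy-flag route, I imagine something like this (lSendPending and SEND_BUSY are made-up names, just a sketch):

// Refuse a new send while a previous WSASend is still in flight.
// The worker thread would reset lSendPending to 0 when the send
// completion for this client is dequeued.
if (InterlockedExchange(&Client[ClientIndex].lSendPending, 1) != 0)
    return SEND_BUSY;
// ... otherwise post the WSASend here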

Thanks as always!!


The next thing coming to mind is the "packet reordering" problem, where packets posted to the completion port may be returned from it in a different order. Would I have to worry about it? Can this happen when posting only 1 concurrent WSARecv / 1 concurrent WSASend per socket?



It really sounds to me as if you're not yet ready for a full-blown IOCP based system. While the concept is actually quite simple, systems programming in an unmanaged language with threading involved is actually really hard to get right. You really should consider trying for a regular select() loop initially, and only try to do something fancier if and when you get enough players to warrant the effort. And, at that point, if you have a little more experience with distributed systems and multi-threading, that'd probably be good :-)

That being said: You're asking the right question here. If there is only ever one overlapped recv() outstanding on a particular socket, how could there be a re-ordering on that socket? Answer: there can't.
enum Bool { True, False, FileNotFound };
Hi, hplus0603.

Well, I have to say I'm not going for a full-blown IOCP. In fact, my first idea was to serve with just 1 worker thread for the recvs/sends and another for the accepts. As I said in my first post, I'm no network expert, but well, this solution may always be better than DirectPlay :D (at least for the number of concurrent connections). I have learnt a lot from this experience, starting with the most simple blocking sockets (the client part uses the WSAAsyncSelect model) and ending with the IOCP model, and currently it has been running stable for 24 hours and everything seems OK. (If anyone wants an exe to test, I can send it.)

Again, thanks for your considerations.

GetQueuedCompletionStatus Function (MSDN)

Attempts to dequeue an I/O completion packet from the specified I/O completion port. If there is no completion packet queued, the function waits for a pending I/O operation associated with the completion port to complete.
To dequeue multiple I/O completion packets at once, use the GetQueuedCompletionStatusEx function.

So, GetQueuedCompletionStatus dequeues (in an internal completion-port process) one packet at a time. Once a packet is dequeued, the function fills in the bytes-transferred, socket-context, and overlapped variables and releases a waiting thread. The basic question arises: if the function releases one thread, and before this first thread has finished it releases a second thread, are the bytes-transferred, socket-context, and overlapped variables overwritten?

Thanks.


So, GetQueuedCompletionStatus dequeues (in an internal completion-port process) one packet at a time. Once a packet is dequeued, the function fills in the bytes-transferred, socket-context, and overlapped variables and releases a waiting thread. The basic question arises: if the function releases one thread, and before this first thread has finished it releases a second thread, are the bytes-transferred, socket-context, and overlapped variables overwritten?


This should be obvious from the rules for scope and memory. If you're using the same value (say, globals) as pointers, or the same OVERLAPPED instance, then yes, those will get overwritten.
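A sketch of the broken pattern (nobody's real code, just the shape of the mistake):

// BROKEN: one global OVERLAPPED and one global byte count shared by all I/O.
OVERLAPPED g_overlapped;
DWORD g_bytesTransferred;
// Every WSARecv passes &g_overlapped and every worker reads g_bytesTransferred,
// so the second completion to arrive silently overwrites the first one's data.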

It sounds to me as if you should take a step back, and understand the rules for memory management and scoping in C/C++ before you write any more "real" code. Your question makes me believe that you're trying to work with things that are significantly beyond your current understanding of the tools you're trying to use. That, generally, is not a great way of learning how to do things right.
enum Bool { True, False, FileNotFound };

Once a packet is dequeued, the function fills in the bytes-transferred, socket-context, and overlapped variables and releases a waiting thread. The basic question arises: if the function releases one thread, and before this first thread has finished it releases a second thread, are the bytes-transferred, socket-context, and overlapped variables overwritten?


If you are using only one global set of variables, yes, they would. However, that approach is incorrect in the IOCP model, as each worker thread should have its own set of variables for that function. It's really just threading 101: each thread should use its own local data unless you have global state to share, in which case you must synchronize access to it.

So your worker thread should be set up like this:

void WorkerThread()
{
    BOOL result = 0;
    DWORD numberOfBytes = 0;
    ULONG_PTR key = 0;
    OVERLAPPED * lpOverlapped = 0;
    // ...

    while( do_not_exit_condition )
    {
        result = GetQueuedCompletionStatus( completionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE );
        // (INFINITE, or whatever timeout your system needs)
        // ...
    }

    // ...
}


This way, no matter how many worker threads you have, only each thread's local variables are affected, so everything works as expected. The biggest thing to note is that GetQueuedCompletionStatus returns a pointer to your OVERLAPPED object. This means you can use the CONTAINING_RECORD macro to get back your original object if you passed one originally. I.e., if you had a POD type:


struct UDPOverlappedEvent
{
    OVERLAPPED overlapped;
    sockaddr_in address;
    int addressLength;
    BYTE operation;
    UINT16 dataBufferSize;
    UINT8 dataBufferData[HPS_OVERLAPPED_BUFFER_SIZE];
};


And then did the following:

UDPOverlappedEvent * event = new UDPOverlappedEvent;
// code removed to setup the object
INT result = WSARecvFrom( ... removed ..., &event->overlapped, NULL);
// more code removed


Then when you get a completion packet in a worker thread:

//... code removed
OVERLAPPED * lpOverlapped = 0;
UDPOverlappedEvent * event = 0;
//... code removed
result = GetQueuedCompletionStatus(completionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
//... code removed
event = CONTAINING_RECORD(lpOverlapped, UDPOverlappedEvent, overlapped);
//... code removed
delete event;


You would then have the whole event object you originally passed, implicitly, by using the OVERLAPPED field. You still have to do error checking and the like, but assuming the pointer is valid and you consistently used that style (which the key parameter would help you verify if you didn't), then you are safe. Where inconsistency might creep in is if you manually call PostQueuedCompletionStatus, for which the overlapped field is optional!
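For example, a minimal guard might look like this (SHUTDOWN_KEY is just a hypothetical key value you'd post yourself on shutdown):

result = GetQueuedCompletionStatus( completionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE );
if( lpOverlapped == 0 )
{
    // Either the call failed without dequeuing a packet, or someone called
    // PostQueuedCompletionStatus with a NULL overlapped; there is nothing
    // safe to pass to CONTAINING_RECORD here.
    if( key == SHUTDOWN_KEY )
        return;
    continue;
}
event = CONTAINING_RECORD( lpOverlapped, UDPOverlappedEvent, overlapped );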

To see more of that code, I had another post using IOCP with UDP. Follow the discussion there, because some additional points are brought up; Antheus has some good points, as does hplus! Even though you are using TCP, the fundamental IOCP concepts are still about the same. It's been a while since I used that code, but IIRC I did a bit of testing with it against enet and it seemed to work OK, though I'd reevaluate it before trying to use it for anything. Also, I'd just use boost::asio with UDP rather than that code, since I'm already using boost::singleton_pool.

As for your previous question about the nSize stuff, you are asking me, "what's the best wrong way I can do this and still get away with it?", to which I'll reply, "it doesn't matter, since it's still the wrong way!". The problem with doing it the way you propose is that the compiler has certain freedoms to rearrange code and make optimizations as it sees fit, depending on settings. I don't know if you have ever compared debug/release generated code with different settings, but sometimes the final code can look quite different from what you actually wrote.

Let's say you try to prestore nSize, but the compiler decides it'd be best if it didn't store it and just did a lookup each time as before; then what? Trying to guess what the compiler will do is really bad, and even if you check it, you would have to recheck it each time you recompile. If you do want to try and hack your way to a solution, you'd need to understand all of these concepts, and then a lot more specific to the compiler you are using.
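A contrived illustration of the point (neither form is guaranteed, which is exactly the problem):

// What you write:
DWORD nLocalSize = nSize;          // hoping for a single snapshot of nSize
return nLocalSize - nReadIndex;

// What the optimizer may legally generate instead, since nothing marks
// nSize as shared between threads:
return nSize - nReadIndex;         // the local copy is simply optimized away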

There's certainly nothing wrong with using a fixed size array conceptually, but the issue I'm pointing out is that the class you are writing doesn't actually solve the problems you want it to. Using it as is will eventually come back to bite you, even if it seems to give you the correct results now!

The alternative you are looking for is to change the class mechanics to something like this:

#define MAX_EVENTS_ARRAY_SIZE 4096

struct TCP_EVENT
{
    std::string Msg;
    DWORD ClientIndex;
    int EventId;
};

class EVENT_QUEUE
{
private:
    TCP_EVENT EventsArray[ MAX_EVENTS_ARRAY_SIZE ];
    DWORD nReadIndex;
    DWORD nSize;
    volatile LONG nCount;

public:
    EVENT_QUEUE() : nReadIndex( 0 ), nSize( 0 ), nCount( 0 )
    {
    }

    void Push( DWORD ClientIndex, int EventId, std::string Message )
    {
        // Create and set a TCP_EVENT structure
        EventsArray[ nSize ].ClientIndex = ClientIndex;
        EventsArray[ nSize ].EventId = EventId;
        EventsArray[ nSize ].Msg = Message;

        // Increment nSize for the cyclic array
        nSize++;
        if( nSize >= MAX_EVENTS_ARRAY_SIZE )
            nSize = 0;

        if( InterlockedIncrement( &nCount ) == MAX_EVENTS_ARRAY_SIZE )
        {
            abort(); // Here is your event overflow
        }
    }

    TCP_EVENT & GetNextEvent()
    {
        return( EventsArray[ nReadIndex ] );
    }

    void Pop()
    {
        nReadIndex++;
        if( nReadIndex >= MAX_EVENTS_ARRAY_SIZE )
        {
            nReadIndex = 0;
        }

        InterlockedDecrement( &nCount );
    }

    bool HasEvent()
    {
        return ( InterlockedCompareExchange( &nCount, 0, 0 ) > 0 );
    }
} EventQueue;


Given the following conditions:
- HasEvent can be called from any thread at any time
- Pop can only be called from one thread, and is not thread safe
- GetNextEvent can only be called from one thread, and is not thread safe
- Push can only be called from one thread, and is not thread safe.
- Externally locking Push makes the function thread safe
- Externally locking Pop / GetNextEvent makes the function thread safe but only logically correct if HasEvent returns true
- Calling Pop when HasEvent does not return true messes up the state
- Calling GetNextEvent without calling Pop after HasEvent returns true, or mismatching the number of Pop calls with the number of times HasEvent returned true, results in invalid events being processed.
- And anything else I might have missed "logically" speaking

To use that code now, you'd have:

Worker Thread] lock / call Push / unlock (as normal)

Event Thread]

const int max_events_per_frame = 100;
for( int x = 0; x < max_events_per_frame; ++x )
{
    if( !EventQueue.HasEvent() )
        break;
    TCP_EVENT & e = EventQueue.GetNextEvent();
    // todo: use e
    EventQueue.Pop();
}


Now you don't have to worry about nSize, and it stays locked as it should. nCount is a generic counter modified using only atomic operations, so it's fine to use it across different threads the way you are. To prevent your event thread from processing events forever when producers produce faster than you consume, the max_events_per_frame variable has been added to ensure your other work still gets done. If the event thread simply churns through events, then you don't need that logic: just run until exit and only process when HasEvent returns true.

I personally don't like having code with such imposing restrictions on its use, because it leads to harder-to-find bugs down the line and you have to keep careful documentation of it. However, given your setup, I don't see it being a problem as long as you adhere to the conditions of use. Of course, once you start working on a multiprocessor system, there are other rules (one example) that come into play that you have to be aware of as well, so even that solution might break down!

So I'm going to have to second hplus's notion (I started writing this reply way before he posted his, but I just saw his on the preview as I was reading over mine) to take a few steps back before you continue on, so you don't code yourself into a deep dark hole you can't climb out of! No one likes being told this, but we are all here to help anyone willing to listen. The consensus from the people posting in this thread is that you would be better served going with something simpler first, and then looking into the DIY approach after learning more of the complexities of the topics involved, perhaps even trying another library that does this stuff for you.

Good luck!

ServerWorkerThread(LPVOID CompletionPortID)
{
    HANDLE CompletionPort = (HANDLE) CompletionPortID;
    LPOVERLAPPED Overlapped = NULL;
    LPPER_HANDLE_DATA PerHandleData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    DWORD BytesTransferred = 0;

    while( TRUE )
    {
        // Continually loop to service I/O completion packets.
        // The worker thread waits on the completion port.
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)
            ...

        // Append the Msg received to the client's sRECV_BUFFER.
        Client[PerHandleData->ClientIndex].sRECV_BUFFER.append(PerIoData->Buffer);

        //------------------------------------------------------
        // Reset PerIoData and post a new WSARecv
        ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));
        ...


So... since this function is entered once per thread and the vars are local, each thread gets its own copy and none overwrites another's (this was my doubt: whether, in concurrent situations at different points of the function, several threads might interact with the same-named variable).

In the code above, each thread would have its own pointer variable which GetQueuedCompletionStatus would set.
Now I see it clearly!


About HasEvent: if I'd rather have the number of events in the queue, would there be any problem with just returning nCount?
DWORD GetEventsQueueSize() { return nCount; }

Given the following conditions:
- HasEvent can be called from any thread at any time ---- (thread safe with return nCount), as long as the return itself is also thread safe
- Pop can only be called from one thread, and is not thread safe ---- yes, only called by the engine (single thread)
- GetNextEvent can only be called from one thread, and is not thread safe ---- yes, only called by the engine (single thread)
- Push can only be called from one thread, and is not thread safe ---- only called by workers, and is locked per worker on every operation
- Externally locking Push makes the function thread safe
- Externally locking Pop / GetNextEvent makes the function thread safe but only logically correct if HasEvent returns true ---- done this way (if GetEventsQueueSize returns > 0)
- Calling Pop when HasEvent does not return true messes up the state ---- Pop is only called by the engine and nCount becomes thread safe, so I see it as OK
- Calling GetNextEvent without calling Pop after HasEvent returns true, or mismatching the number of Pop calls with the number of times HasEvent returned true, results in invalid events being processed
- And anything else I might have missed "logically" speaking

Pop is always called after a GetNextEvent (always from the engine). Another thing is returning nCount: it could miss, but only by returning a lower value to the engine, which would not be a problem, since it is reevaluated each frame.

It has been many years since I last saw any C/C++ code, so I have to say it's hard for me, especially the multithreading aspect, and of course this would be a subject of study for months. I hope to get the free time soon to dedicate to it deeply, but work and family take most of it. However, I see the code now as very good and robust (currently about 2 weeks of debugging and reviewing, and I will keep pushing).

As a confession, I took a look at Asio before starting the project, but I saw the manual (>1000 pages), so I thought: let's do it directly, it will be faster. But, sincerely, I would take Asio now, mostly for the code guarantees.

Thanks again ;)
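P.S. If a plain return turns out to be risky, I guess I could mirror HasEvent and read nCount with the same interlocked trick (just a sketch):

DWORD GetEventsQueueSize()
{
    // Atomic no-op exchange, same as HasEvent uses to read nCount safely.
    return (DWORD)InterlockedCompareExchange( &nCount, 0, 0 );
}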

