Inter-thread communication


tl;dr: this is a question of how best to implement my own version of PostThreadMessage().

More precisely, my target problem right now is communication between the input and render threads in the context of a GUI. I'd like this same logic to be extensible to communication between the input and game-logic threads, and between the load and render threads. I've kind of settled on a fairly simple counter-based method as I've gone along, as it just works.

My assumptions are:

- I don't want to use a locking mechanism

- instead, I either poll in the render thread if running in real-time mode, or signal via something like a waitable object to force a refresh if not running in real-time mode. This bit I'm somewhat worried about, as I'm not yet sure how portable it is.

- I'd like the communication scheme to be extensible to any many-to-one thread model where many threads can send messages to one target thread

With this in mind, the best (and so far only, really) way I've managed to think of is a FIFO queue, with each sender thread having its own dispatch queue and a write marker that points into a circular buffer (i.e. the queue's storage). The receiving thread then reads everything from its respective read marker up to the write marker at each iteration (or when notified). There is no synchronization between the two threads other than a possible SetEvent() called by the sender if the render thread doesn't already poll in a tight loop.
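For illustration, here is a minimal C++11 sketch of the scheme just described: one fixed-size ring buffer per sender, a write marker owned by the producer and a read marker owned by the consumer. Names and the power-of-two capacity are illustrative, and, as the replies below explain, the markers do need to be atomic, so the sketch already uses release/acquire ordering:

#include <atomic>
#include <cstddef>

template <typename T, size_t N>	// N = capacity, must be a power of two
class SpscRing {
	T slots[N];
	std::atomic<size_t> writeMarker{0};	// stored only by the producer
	std::atomic<size_t> readMarker{0};	// stored only by the consumer
public:
	bool Push(const T& msg)	// producer thread only
	{
		size_t w = writeMarker.load(std::memory_order_relaxed);
		size_t r = readMarker.load(std::memory_order_acquire);
		if (w - r == N)
			return false;	// full: the caller decides what to do
		slots[w % N] = msg;	// write the payload first...
		writeMarker.store(w + 1, std::memory_order_release);	// ...then publish
		return true;
	}

	bool Pop(T& out)	// consumer thread only
	{
		size_t r = readMarker.load(std::memory_order_relaxed);
		size_t w = writeMarker.load(std::memory_order_acquire);
		if (r == w)
			return false;	// empty
		out = slots[r % N];	// copy the payload out first...
		readMarker.store(r + 1, std::memory_order_release);	// ...then free the slot
		return true;
	}
};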

So far this has been a fairly straightforward and foolproof approach. The primary problem I see here is guesstimating the size of the message buffer and, in particular, what to do when an overflow occurs. Before going about reallocating memory to fit more messages I figured it'd make sense to actually see if there might be more trickery involved in the way Windows does it (yeah, that was a question).

Also, in a way it feels a bit like a hack, even though it works :)

What you've implemented is a "lock-free queue", which is a structure that's notoriously difficult to implement. If you have no synchronization code in there, then I can assure you that you likely have a bug ;P
For it to be portable, you need to insert memory fences at specific points to ensure the order of reads and writes between threads is consistent. The difficult part here is that *most* x86 CPUs will seem to work fine even without the fences, making it hard to tell if your code is correct or not...
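To make that hazard concrete, here is the same publish step written both ways (a sketch with stand-in names, not code from this thread; wrap-around is omitted):

#include <atomic>

struct Msg { int v; };
enum { SIZE = 256 };
Msg queue[SIZE];

unsigned iWrite = 0;			// broken: plain variable
std::atomic<unsigned> iWriteSafe(0);	// fixed: atomic write marker

void PublishBroken(const Msg& m)
{
	queue[iWrite] = m;	// (1) write the payload
	iWrite = iWrite + 1;	// (2) publish the marker: the compiler or CPU may
				//     make (2) visible before (1), so the consumer
				//     sees the marker move and reads garbage
}

void PublishSafe(const Msg& m)
{
	unsigned w = iWriteSafe.load(std::memory_order_relaxed);
	queue[w] = m;						// (1)
	iWriteSafe.store(w + 1, std::memory_order_release);	// (2) may not pass (1)
}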
I had a unit-test for my previous lock-free multiple-producer/multiple-consumer FIFO set to run every time I compiled my code. After passing continuously for 6 months, it finally failed once due to a tiny race condition... ;(

Resizable lock-free structures are even harder to design, so if this is for a game, I would advocate just "knowing your data" and setting a suitable upper limit (and asserting/crashing on overflow).

If this is for Windows only, there's a lock-free singly linked list somewhere in the Win32 API (I forget the name... Slist?), which is probably used by their own messaging stuff.

Some links:
http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
https://code.google.com/p/eight/source/browse/include/eight/core/thread/fifo_mpmc.h
https://code.google.com/p/eight/source/browse/include/eight/core/thread/fifo_spsc.h

The last two links are into my code-base. The read/write cursors are 'atomics', which means they insert memory fences when they are used.
The MPMC version (multiple producers/consumers) has an atomic flag inside each data item, so that the write cursor can be incremented, then the data written, then the data-flag set to true - so that multiple writer-threads can use it at once. Reader threads can then wait on these flags to be true before incrementing the read cursor.
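For reference, here is a condensed sketch of that per-slot-flag design, essentially the bounded MPMC queue from the 1024cores link above, using C++11 atomics. It is an illustration, not the linked code-base:

#include <atomic>
#include <cstddef>
#include <cstdint>

template <typename T, size_t N>	// N must be a power of two
class MpmcRing {
	struct Cell {
		std::atomic<size_t> seq;	// the per-slot flag/sequence number
		T data;
	};
	Cell cells[N];
	std::atomic<size_t> enqueuePos;
	std::atomic<size_t> dequeuePos;
public:
	MpmcRing() : enqueuePos(0), dequeuePos(0)
	{
		for (size_t i = 0; i != N; ++i)
			cells[i].seq.store(i, std::memory_order_relaxed);
	}

	bool Push(const T& v)
	{
		size_t pos = enqueuePos.load(std::memory_order_relaxed);
		for (;;) {
			Cell& c = cells[pos & (N - 1)];
			size_t seq = c.seq.load(std::memory_order_acquire);
			intptr_t dif = (intptr_t)seq - (intptr_t)pos;
			if (dif == 0) {	// slot free for this ticket: claim it
				if (enqueuePos.compare_exchange_weak(pos, pos + 1,
						std::memory_order_relaxed)) {
					c.data = v;	// write the payload...
					c.seq.store(pos + 1, std::memory_order_release);	// ...then flag "ready"
					return true;
				}	// lost the race; pos was reloaded, retry
			} else if (dif < 0) {
				return false;	// queue full
			} else {
				pos = enqueuePos.load(std::memory_order_relaxed);
			}
		}
	}

	bool Pop(T& out)
	{
		size_t pos = dequeuePos.load(std::memory_order_relaxed);
		for (;;) {
			Cell& c = cells[pos & (N - 1)];
			size_t seq = c.seq.load(std::memory_order_acquire);
			intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
			if (dif == 0) {	// data ready in this slot: claim it
				if (dequeuePos.compare_exchange_weak(pos, pos + 1,
						std::memory_order_relaxed)) {
					out = c.data;	// read the payload...
					c.seq.store(pos + N, std::memory_order_release);	// ...then recycle the slot
					return true;
				}
			} else if (dif < 0) {
				return false;	// queue empty
			} else {
				pos = dequeuePos.load(std::memory_order_relaxed);
			}
		}
	}
};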

In some cases though, it might actually be better to use an array of SPSC queues, because this way there will be no contention between writer-threads!

If this is for Windows only, there's a lock-free singly linked list somewhere in the Win32 API (I forget the name... Slist?), which is probably used by their own messaging stuff.

Yes, SList is correct.
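For anyone curious, a minimal usage sketch (the API names are the real Win32 ones; the Msg struct is illustrative). Note that an SList is LIFO, a stack rather than a queue, so messaging code typically grabs the whole list with InterlockedFlushSList() and reverses it:

#include <windows.h>
#include <malloc.h>	// _aligned_malloc / _aligned_free

struct Msg {
	SLIST_ENTRY entry;	// must be the first member; the OS links through it
	int payload;
};

static SLIST_HEADER g_head;	// SLIST_HEADER is declared pre-aligned in winnt.h

void Init()
{
	InitializeSListHead(&g_head);
}

void Produce(int value)
{
	// entries must be MEMORY_ALLOCATION_ALIGNMENT-aligned (16 bytes on x64)
	Msg* m = (Msg*)_aligned_malloc(sizeof(Msg), MEMORY_ALLOCATION_ALIGNMENT);
	m->payload = value;
	InterlockedPushEntrySList(&g_head, &m->entry);
}

void Consume()
{
	PSLIST_ENTRY e;
	while ((e = InterlockedPopEntrySList(&g_head)) != NULL) {
		Msg* m = CONTAINING_RECORD(e, Msg, entry);
		// ...handle m->payload (remember: newest first)...
		_aligned_free(m);
	}
}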

Additionally, Boost has some lock-free containers in it, and if I recall correctly Intel released a whole library of lock-free data structures.
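For the record, a quick sketch of what using those looks like; this assumes Boost 1.53+'s Boost.Lockfree (the Intel library is presumably Threading Building Blocks and its tbb::concurrent_queue). The Message struct and capacities are illustrative:

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/queue.hpp>

struct Message { int type; void* data; };	// must stay trivially copyable

// wait-free single-producer/single-consumer ring buffer:
boost::lockfree::spsc_queue<Message, boost::lockfree::capacity<1024> > g_guiQueue;

// lock-free multi-producer/multi-consumer queue:
boost::lockfree::queue<Message> g_workQueue(1024);

void Producer()
{
	Message m = { 1, 0 };
	if (!g_guiQueue.push(m)) { /* full: apply your overflow policy */ }
}

void Consumer()
{
	Message m;
	while (g_guiQueue.pop(m)) { /* handle m */ }
}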

In time the project grows, the ignorance of its devs it shows, with many a convoluted function, it plunges into deep compunction, the price of failure is high, Washu's mirth is nigh.

This is totally off the top of my head, so correct me if I'm wrong...

initialise atomic to zero, then:

void ReadWriteFromQueue( bool read )
{
	while( atomic_increment != 1 ) // another thread has access already
	{
		atomic_decrement;
	}

	if( read == true )
	{
		// read from queue code here
	}
	else
	{
		// write to queue here
		// if queue size is too small, resize (possibly during development
		// to best-guess the max queue size)
	}

	atomic_decrement;
}

This is totally off the top of my head, so correct me if I'm wrong...


This is not lock-free. You've just implemented a (very poor) spinlock-based critical section. That spinlock is going to burn through CPU since it has no back-off, it allows thread starvation, and the while loop will cause a lot of cache-line bouncing. There are lock-free circular-buffer-based bounded concurrent queues and lock-free linked-list unbounded concurrent queues, and even more advanced wait-free concurrent queues.
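For contrast, a sketch of what back-off looks like, using C++11 atomics and the x86 pause instruction. It is still a lock, not lock-free, but it shows the test-and-test-and-set pattern that avoids hammering the cache line:

#include <atomic>
#include <immintrin.h>	// _mm_pause (x86; elsewhere use std::this_thread::yield)

class Spinlock {
	std::atomic<bool> locked{false};
public:
	void lock()
	{
		for (;;) {
			// try the expensive atomic exchange only when the lock looks free
			if (!locked.exchange(true, std::memory_order_acquire))
				return;
			// otherwise spin on a plain read, which stays in this core's
			// cache instead of bouncing the line between cores
			while (locked.load(std::memory_order_relaxed))
				_mm_pause();	// back off, be polite to the sibling hyperthread
		}
	}
	void unlock() { locked.store(false, std::memory_order_release); }
};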

With this in mind, the best (and so far only, really) way I've managed to think of is a FIFO queue


This is roughly what you should be doing. Some other languages call these "channels" or "ports" or just "concurrent queues." Everything that Hodgman said is true, though; these are difficult data structures to write and yours is almost certainly broken without you realizing it, so you should take one of the existing well-tested rock-solid implementations instead of trying to make your own.

Do be aware of a potential deadlock you can get into with this design. If your render thread is ever stuck waiting for the main thread while the main thread is stuck waiting for the render thread, your game will deadlock. You do end up needing some synchronization like this due to the peculiarities of what specifically is allowed to live on a render thread and which parts of rendering have to live on the main thread.

These kinds of deadlocks can happen even more frequently with bounded concurrent queues, since you can't push a new event into the queue when it's full, but the other thread might _also_ be blocked trying to push into its own queue, so you have to be smart about what you do when you need to push a value into a bounded queue. Unbounded queues don't necessarily solve this problem either; they just make it harder to detect and the results more serious. You have to think a lot about the synchronization between threads (or processes, or services) even in "lock-free" contexts.
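One way to stay safe is to make every push non-blocking and give it an explicit fallback policy; a sketch against the SpscRing example earlier in the thread (GuiMsg and the policy list are illustrative):

struct GuiMsg { int controlId; int what; };

bool PostRefresh(SpscRing<GuiMsg, 1024>& q, const GuiMsg& m)
{
	if (q.Push(m))
		return true;
	// The queue is full. Pick a policy the game can tolerate:
	//   - coalesce: skip the push if an equivalent message is already queued
	//   - drop it and set a "resend next frame" flag on the producer side
	//   - in development builds, assert so the real cause gets found
	// Never spin or block here: the consumer may itself be blocked pushing
	// toward this thread, which is exactly the deadlock described above.
	return false;
}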

This is one of the reasons I prefer fork-join parallelism. You don't have a dedicated render thread but rather a pool of worker threads that can be tasked to do physics work, AI work, rendering work, etc, while the main thread continues to run serially over the game loop (the actual OS "main thread," e.g. the "input thread," may still need to be a separate thread from the game/main thread and you still need to deal with all the above problems, but at least it's localized to just one pair of threads and not for each subsystem).
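A minimal sketch of that fork-join flavor, with std::async standing in for a real job system; every name here is a hypothetical stand-in, and the forked subsystems are assumed to touch disjoint parts of the world:

#include <future>
#include <functional>

struct World { /* game state */ };

void UpdatePhysics(World&) { /* ... */ }
void UpdateAI(World&) { /* ... */ }
void BuildRenderCommands(World&) { /* ... */ }
void SubmitRenderCommands(World&) { /* ... */ }

void GameLoopIteration(World& world)
{
	// fork: physics and AI run on pool threads
	auto physics = std::async(std::launch::async, UpdatePhysics, std::ref(world));
	auto ai = std::async(std::launch::async, UpdateAI, std::ref(world));

	BuildRenderCommands(world);	// the main thread takes a slice of work too

	physics.get();	// join: block until the forked work completes
	ai.get();

	SubmitRenderCommands(world);	// serial step once everything has joined
}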

Sean Middleditch – Game Systems Engineer – Join my team!


If you have no synchronization code in there, then I can assure you that you likely have a bug ;P


Resizable lock-free structures are even harder to design, so if this is for a game, I would advocate just "knowing your data" and setting a suitable upper limit (and asserting/crashing on overflow).

I haven't gone over the links you provided yet, but with the SPSC approach I'm using I don't think I need atomics - even for the read/write markers, as it should be safe to assume these can never conflict. Right?

The toughest part of writing a multi-threaded application is comprehending the logic of the execution flow, and I have indeed hit a wall with strange effects showing up every now and then for apparently no reason. This is why I'm finally setting up a messaging system and am not really afraid of taking my time with it. Thanks for going to the trouble of posting some reading!

With respect to handling overflow, I'm actually not all that concerned with it. It's just that my initial brute-force addition of GUI refresh messages for in-game windows somewhat surprised me: even though I only had 74 controls (hidden and visible) being created at application start, without filtering my dispatcher received almost 2000 refresh commands before the first frame was even drawn. I figured I'd done a fairly thorough job of removing redundancy from my code, but cascading the dirty flag can really add up quickly. With simple filtering it wasn't hard to bring the number of messages back down, and it helped me identify a simple missing check in the rest of the code, but the cold truth (and a lesson) is that my conservative queue-length estimate of 512 messages overflowed quite quickly. And that made me worried.
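The filtering can be as simple as a producer-side set that collapses a cascade of dirty flags into at most one message per control per frame; a sketch with a hypothetical ControlId type:

#include <unordered_set>

typedef unsigned int ControlId;	// hypothetical control handle

struct RefreshFilter {
	std::unordered_set<ControlId> pending;	// producer-side only, no sharing

	// true only the first time a control is dirtied this frame
	bool ShouldSend(ControlId id)
	{
		return pending.insert(id).second;
	}

	// call once the consumer has drained this frame's messages
	void NewFrame()
	{
		pending.clear();
	}
};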

I suppose the dispatch queue can in a way be a fairly efficient way of also identifying redundancy that can otherwise be somewhat hard to spot.


In some cases though, it might actually be better to use an array of SPSC queues, because this way there will be no contention between writer-threads!

That's actually what I'm doing right now: each producer thread has its own queue and the consumer iterates through each of them. I don't really see a reason to use an MPSC/MPMC queue in a game as there aren't too many threads to start with. I mentioned this in my own words in the OP :)

I guess if my target was a new operating system with an unbounded thread count or some thread-heavy distributed system, I would need a less bloated approach to minimize memory redundancy and centralize messaging, but right now (especially once I've profiled my code a bit and know how many messages are actually flying around) I don't really have a problem with allocating relatively copious amounts of space to accommodate large enough queues. The way I'm thinking right now is along the lines of "simpler is better": a megabyte per thread-to-thread interaction can hold a fair number of messages (at, say, 64 bytes per message, over 16,000 of them) and costs almost nothing.


These kinds of deadlocks can happen even more frequently with bounded concurrent queues, since you can't push a new event into the queue when it's full, but the other thread might _also_ be blocked trying to push into its own queue, so you have to be smart about what you do when you need to push a value into a bounded queue. Unbounded queues don't necessarily solve this problem either; they just make it harder to detect and the results more serious. You have to think a lot about the synchronization between threads (or processes, or services) even in "lock-free" contexts.

I do appreciate the potential complexity that can arise when the number of threads increases. Since I'm still dabbling with thread-safe messaging I think I'll do what Hodgman said and set up a hard limit to the queue length. Once I hit that during development I'll just analyze the reason and either work backwards to reduce the number of messages or increase the queue size as needed.


Do be aware of a potential deadlock you can get into with this design. If your render thread is ever stuck waiting for the main thread while the main thread is stuck waiting for the render thread, your game will deadlock.

With an SPSC queue I frankly don't see how a deadlock can occur, especially since neither of the threads waits on the other. I'm going for truly asynchronous communication here, where the render thread just takes pre-calculated input at the start of each frame and applies it all at once to avoid mid-frame updates.

with the SPSC approach I'm using I don't think I need atomics - even for the read/write markers, as it should be safe to assume these can never conflict. Right?

Only if you can assume that your code executes in the order that you've written it in, which you can't. So - Not right :D

First up, the compiler is totally allowed to reorder your code, as long as the reordered version results in the same visible behavior from a single-threaded point of view.
In a single-threaded program, it doesn't matter whether you write new data and then update the cursor, or whether you do those two writes in the other order... The outcome is the same, so the compiler is free to play with that ordering.
In a multi-threaded program, though, the wrong ordering can cause the consumer to try to read new data before it has even been written!
So, unless you're using lovingly hand-crafted ASM, you at the very least require a compile-time memory barrier to force the compiler to emit the instructions in the correct order.

Second, CPUs are damn fancy these days - they basically have an optimizer built in!
The CPU is totally allowed to reorder your code, as long as the reordered version results in the same visible behavior from a single-threaded point of view! It can reorder the instruction stream, and can also buffer up and reorder reads and writes to memory!
That means the same situation can occur, where data/cursors are read/written out of order and you end up with corrupt results.
Now, x86 is pretty nice and although it does a lot of reordering, it does create some kinds of implicit fences around certain memory accesses to avoid this problem, but there are many CPUs that don't. To write solid, portable lock-free code, you either have to read the manuals for your target CPUs so you know what assumptions are valid, or you need to insert the appropriate runtime memory fence instructions, which will tell the CPU not to reorder (if necessary).
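In C++11 both barriers come bundled with the atomic operations themselves; a sketch of the pairing, with the pre-C++11 spellings noted for reference:

#include <atomic>

int data;	// plain payload
std::atomic<bool> ready(false);

void Producer()
{
	data = 42;
	// a release store is both barriers at once: the compiler may not sink
	// the data write below it, and the CPU must make the data visible first
	ready.store(true, std::memory_order_release);
}

void Consumer()
{
	if (ready.load(std::memory_order_acquire))	// pairs with the release
	{
		// guaranteed to observe data == 42 here
	}
}

// Pre-C++11 spellings (compiler barrier / CPU fence respectively):
//   MSVC: _ReadWriteBarrier();            MemoryBarrier();
//   GCC:  asm volatile("" ::: "memory");  __sync_synchronize();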


Point taken, understood and appreciated :).

I'm in a bit of a hurry, but just for kicks, here's my Q'n'D implementation. It seems to be working at first glance, but it's open for peer review. Note that I don't need to send generic messages (i.e. the range of message types is restricted to simple pointer/variable updates contained in a structure specific to each consumer thread), thus avoiding unnecessary casting.

Producer thread:

	msg = disp.QueryFreeMessage();
	if(msg)
		{ fill in and call disp.Dispatch(); }

Consumer thread:

	while(msg = disp.GetMessage()) { HandleMessage(msg); }


#define MAX_PRODUCER_THREADS	64
#define MAX_PRODUCER_MESSAGES	65536

//LINUX:	__sync_add_and_fetch()
//OSX:		OSAtomicAdd32
//WINDOWS:	InterlockedIncrement()
UINT MyAtomicIncrement(	IN volatile UINT * ptr);
void MyAtomicSet(		IN volatile UINT * ptr, IN UINT value);

template<class T>
struct IThreadDispatchQueueSPSC	{
	UINT iThreadID;

	volatile UINT iWriteMarker;
	volatile UINT iReadMarker;

	T queue[MAX_PRODUCER_MESSAGES];

	IThreadDispatchQueueSPSC()
		{
		iWriteMarker = 0;
		iReadMarker = 0;
		}

	T* QueryFreeMessage()
		{ return &queue[iWriteMarker]; }

	T* GetMessage()
		{
		if(iReadMarker == iWriteMarker)
			return 0;

		T* res = &queue[iReadMarker];

		UINT iNext = iReadMarker + 1;

		if(iNext >= MAX_PRODUCER_MESSAGES - 1)
			{ iNext = 0; }

		MyAtomicSet(&iReadMarker, iNext);

		return res;
		}

	void Dispatch()
		{
		UINT iNext = iWriteMarker + 1;

		if(iNext >= MAX_PRODUCER_MESSAGES - 1)
			{ iNext = 0; }

		if(iNext == iReadMarker)
			{ lout << "Warning: dispatch queue full ('" << TStackTrace::getThreadName(iThreadID) << "')" << endl; return; }

		MyAtomicSet(&iWriteMarker, iNext);
		}
	};

template <class T>
struct IThreadMessageQueue_MPSCArray	{
	UINT iConsumerThreadID;

	IThreadDispatchQueueSPSC<T> producerThreads[MAX_PRODUCER_THREADS];

	volatile UINT iNumProducerThreads;

	IThreadMessageQueue_MPSCArray()
		{
		iNumProducerThreads = 0;
		iConsumerThreadID = 0;

		ZeroMemory(producerThreads, sizeof(producerThreads));
		}

	bool RegisterProducerThread(UINT iThreadID)
		{
		if(iNumProducerThreads >= MAX_PRODUCER_THREADS)
			{
			lout << "Error: too many producer threads: " << iNumProducerThreads << endl;
			//THROW("Too many producer threads");
			return false;
			}

		int iIndex = MyAtomicIncrement(&iNumProducerThreads) - 1;
 		lout << "new producer thread: " << iIndex << " " << iThreadID << endl;
		producerThreads[iIndex].iThreadID = iThreadID;

		return true;
		}

	//returns a pointer to the next free message, NULL if the stack overflows.
	T* QueryFreeMessage()
		{
		UINT iThreadID = GetCurrentThreadId();

		for(int i = 0; i < (int)iNumProducerThreads; i++)
			{
			IThreadDispatchQueueSPSC<T>* t = &producerThreads[i];

			if(iThreadID == t->iThreadID)
				{ return t->QueryFreeMessage(); }
			}

		//add this thread
		if(!RegisterProducerThread(iThreadID))
			return NULL;

		//try again
		return QueryFreeMessage();
		}

	//produce a message into the queue
	void Dispatch()
		{
		UINT iThreadID = GetCurrentThreadId();

		for(int i = 0; i < (int)iNumProducerThreads; i++)
			{
			IThreadDispatchQueueSPSC<T>* t = &producerThreads[i];
			if(t->iThreadID == iThreadID)
				{ t->Dispatch(); break; }
			}
		}

	//iterates through unhandled messages sent to this thread. Should only be called by the consumer thread.
	T* GetMessage()
		{
		for(int i = 0; i < (int)iNumProducerThreads; i++)
			{
			IThreadDispatchQueueSPSC<T>* t = &producerThreads[i];

			if(t->iReadMarker != t->iWriteMarker)
				{ return t->GetMessage(); }
			}

		//out of messages
		return NULL;
		}
};

msg = disp.QueryFreeMessage();
if(msg)
....
T* QueryFreeMessage() { return &queue[iWriteMarker]; }

if(msg) will always be true.

Dispatch can fail, but the caller doesn't know :(
Also, in this case, the caller has already used QueryFreeMessage to get a pointer to a queue slot and has written data into that slot even though the queue is full (overwriting not-yet-consumed data).
You probably want QueryFreeMessage to actually return NULL to solve this, and change the error inside Dispatch into an assertion failure, because it shouldn't ever happen if the client is using the class correctly.

GetMessage increments the read cursor, which lets the other thread know that it's safe to overwrite that slot... but if the writer thread does reuse that slot before HandleMessage is called, then you'll have data that's leaked (never actually consumed) and other data that gets consumed twice. To solve that, you'd have to only increment the read cursor after the data has been consumed.
Or, instead of returning T*'s to the user, return a T by value, which has been copied before the cursor is incremented.
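Applying those fixes to the posted queue might look like the sketch below (same style as the original; the wrap test is also loosened to >= MAX_PRODUCER_MESSAGES so the last slot isn't wasted, and Dispatch would shrink to advancing the marker plus an assert):

	//producer side: NULL when the queue is full, *before* anything is written
	T* QueryFreeMessage()
		{
		UINT iNext = iWriteMarker + 1;
		if(iNext >= MAX_PRODUCER_MESSAGES)
			{ iNext = 0; }

		if(iNext == iReadMarker)
			return NULL;	//full: caller must handle this

		return &queue[iWriteMarker];
		}

	//consumer side: copy the message out by value, then release the slot
	bool GetMessage(T* out)
		{
		if(iReadMarker == iWriteMarker)
			return false;	//empty

		*out = queue[iReadMarker];	//copy BEFORE advancing the marker...

		UINT iNext = iReadMarker + 1;
		if(iNext >= MAX_PRODUCER_MESSAGES)
			{ iNext = 0; }

		MyAtomicSet(&iReadMarker, iNext);	//...so the slot is never reused early

		return true;
		}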


Additionally, Boost has some lock-free containers in it, and if I recall correctly Intel released a whole library of lock-free data structures.

Cool, last time I looked at that, it was just at the idea / submission-for-review stage, not actually accepted into Boost yet.


Yeah, it's nice that they finally got it up and going... as for the Intel library I was thinking of... here's a link

In time the project grows, the ignorance of its devs it shows, with many a convoluted function, it plunges into deep compunction, the price of failure is high, Washu's mirth is nigh.

