C++ Lockless Queue for a Threadpool

11 comments, last by Hodgman 13 years, 8 months ago
Hey guys,

I need a bit of help. I'm looking into optimizing my threadpool with a lockless queue for its tasks. I need a single-producer (the application's main thread), multi-consumer (all of the threadpool's worker threads) queue. I've been reading articles and whitepapers on different implementations using the Boost libraries, C++0x, and all kinds of other stuff. I'm looking for a simple yet effective method for a Win32 environment.

Anyone have any pointers, tips, experience or reference material they can recommend? I'm trying to get a feel for this as well, so I don't want anyone to just post source code with no explanation of what it does :-P

Thanks, guys!
Keith M. Programming - My Game Dev Blog.
Tutorials. Games. Code Snippets. Bad Jokes. I got 'em all.

Follow me on Twitter. [twitter]KeithMaggio[/twitter]
Listen to me yap about programming and games and junk.
There's an excellent article on this in the book "Game Programming Gems 6". It's the very first section: "Lock-free algorithms".
Darn, I only have Game Programming Gems 2. Oh well, still a good book and I'll look that article up! Thanks! Keep the ideas comin'!
How exactly are you planning on using this queue? What sort of data will it hold? How often will producers access it? How often will consumers access it? Have you profiled a lock-based solution and determined that it is insufficiently fast for your needs?

Lockless programming is a bitch to get right, and the benefits are very rarely substantial enough to justify the difficulty. It pays to be very, very sure that you need such a beast before deploying it. For instance, a standard reader/writer lock would accomplish what you need, and is trivially easy to implement on Win32 (I have code and documentation explaining how to do it if you're interested).

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

I'd be very interested to read it!

Pretty much, it's implementing the threadpool design: threads are created at the program's start and exit at the end. This queue will contain tasks for the threads to complete. If a thread has no task, it will access the task queue and pop off the next available task.
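
For reference, the design described above can be sketched roughly as follows. This is a minimal lock-based sketch, assuming C++11's std::thread, std::mutex and std::condition_variable rather than raw Win32 primitives. SimpleThreadPool, Submit and CountWithPool are hypothetical names chosen for illustration, not code from this thread.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical name; a minimal lock-based pool for illustration only.
class SimpleThreadPool {
public:
    // Workers are created up front, as in the design described above.
    explicit SimpleThreadPool(std::size_t thread_count) {
        assert(thread_count > 0);
        for (std::size_t i = 0; i < thread_count; ++i) {
            workers_.emplace_back([this] { WorkerLoop(); });
        }
    }

    // Lets the workers drain any remaining tasks, then joins them.
    ~SimpleThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (std::thread& worker : workers_) {
            worker.join();
        }
    }

    SimpleThreadPool(const SimpleThreadPool&) = delete;
    SimpleThreadPool& operator=(const SimpleThreadPool&) = delete;

    // Called by the producer (the main thread in this design).
    void Submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        wake_.notify_one();
    }

private:
    void WorkerLoop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) {
                    return;  // stopping_ is set and nothing is left to run
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can dequeue
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Example: run `task_count` increments on the pool and return the total.
int CountWithPool(int task_count, std::size_t thread_count) {
    std::atomic<int> counter{0};
    {
        SimpleThreadPool pool(thread_count);
        for (int i = 0; i < task_count; ++i) {
            pool.Submit([&counter] { counter.fetch_add(1); });
        }
    }  // the destructor joins the workers, so every task has finished here
    return counter.load();
}
```

The lock is held only while a task is pushed or popped, never while it runs, so contention depends mostly on how small the tasks are.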

The tasks will range from AI to object updating, audio, physics, math, Kevin Bacon -- anything I need them to be. I'm trying to make it as extensible as possible so I can use it again in future projects and make a ton of demos and tutorials on it =) It's a research project for myself.

I realize they're a bitch to get right, but that's all the more reason for me to pursue it! If I didn't want a challenge, then I guess I wouldn't be in game development =D
A standard thread worker pool can work off of locking without too much overhead; the only big concern you'll run into is the need for relatively large work chunks to minimize the amount of context switching and contention for the work pool. However, even a lockless method won't really alleviate that; so in my mind the best route to take would be to start with a lock-based approach and only mess with lockless stuff if you can prove that you really need it - and I'll bet money here and now that you won't need it [wink]

That said... here's my standard implementation of a reader/writer lock on Win32 (may need some minor tweaks for Win64 but nothing serious). It's a great starting point for a task pool; we use it for something very similar in our current engine.

First, the header, a couple of utility RAII lock classes, and some general notes:

    //
    // Multiple readers/single writer synchronization counter
    //
    // This counter allows multiple threads to read from a shared resource,
    // and permits a maximum of one thread at a time to write to that same
    // resource.
    //
    class SyncCounter
    {
    // Construction and destruction
    public:
        SyncCounter();
        ~SyncCounter();

    // Reading interface
    public:
        void IncrementReadCount();
        void DecrementReadCount();

    // Writing interface
    public:
        void LockForWriting();
        bool TryLockForWriting();
        void UnlockFromWriting();

    // Internal state
    private:
        // Note that these two members will lie on the same cache line,
        // which can lead to false sharing and over-locking on most CPU
        // architectures. The only real solution would be to ensure the
        // memory lies on separate cache lines which requires knowledge
        // of each target platform's cache line size, as well as a huge
        // amount of wasted memory. All things considered, a little bit
        // of false sharing is a fair price to pay for most purposes.
        volatile LONG ReadCounter;
        volatile LONG WriteCounter;

        // We use two Windows events as controls for locking and unlocking
        // the resource. The first event is signaled whenever there is not
        // an active write lock, allowing us to block reader threads until
        // writing is completed. When a read lock is in place, we unsignal
        // the second event, which prevents any writer threads from taking
        // out a write lock. The combination ensures that multiple threads
        // can read while only one can write, and that readers and writers
        // cannot operate simultaneously.
        HANDLE NotBusyFlagEvent;
        HANDLE WriteMutexEvent;

        // We use some thread-local storage to ensure that individual
        // threads can recursively take out locks without deadlocking
        // themselves or losing track of the total read/write counts.
        // These members hold the Windows TLS table indices.
        DWORD TLSReadLockCount;
        DWORD TLSWriteLockCount;
    };

    //
    // RAII wrapper class for automatically holding a read lock on a SyncCounter
    //
    class AutoReadLock
    {
    // Construction/destruction
    public:
        AutoReadLock(SyncCounter& syncobject)
         : SyncObject(syncobject)
        {
            SyncObject.IncrementReadCount();
        }

        ~AutoReadLock()
        {
            SyncObject.DecrementReadCount();
        }

    // Internal tracking
    private:
        SyncCounter& SyncObject;
    };

    //
    // RAII wrapper class for automatically holding a write lock on a SyncCounter
    //
    class AutoWriteLock
    {
    // Construction/destruction
    public:
        AutoWriteLock(SyncCounter& syncobject)
         : SyncObject(syncobject)
        {
            SyncObject.LockForWriting();
        }

        ~AutoWriteLock()
        {
            SyncObject.UnlockFromWriting();
        }

    // Internal tracking
    private:
        SyncCounter& SyncObject;
    };

This should fairly clearly explain the purpose of the classes and their general operation. Now for the implementation details:

    SyncCounter::SyncCounter() :
        ReadCounter(-1),
        WriteCounter(-1)
    {
        NotBusyFlagEvent = ::CreateEvent(NULL, TRUE, TRUE, NULL);
        WriteMutexEvent = ::CreateEvent(NULL, FALSE, TRUE, NULL);

        TLSReadLockCount = ::TlsAlloc();
        TLSWriteLockCount = ::TlsAlloc();

        ::TlsSetValue(TLSReadLockCount, 0);
        ::TlsSetValue(TLSWriteLockCount, 0);
    }

    SyncCounter::~SyncCounter()
    {
        ::CloseHandle(NotBusyFlagEvent);
        ::CloseHandle(WriteMutexEvent);

        ::TlsFree(TLSReadLockCount);
        ::TlsFree(TLSWriteLockCount);
    }

    void SyncCounter::IncrementReadCount()
    {
        // We don't need a read-lock if we already have write control
        DWORD writelockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSWriteLockCount));
        if (writelockcount > 0) {
            return;
        }

        // Make sure we don't deadlock by recursively taking out locks
        DWORD readlockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSReadLockCount));
        ::TlsSetValue(TLSReadLockCount, reinterpret_cast<LPVOID>(readlockcount + 1));
        if (readlockcount > 0) {
            return;
        }

        ::WaitForSingleObject(NotBusyFlagEvent, INFINITE);
        if (::InterlockedIncrement(&ReadCounter) == 0) {
            ::WaitForSingleObject(WriteMutexEvent, INFINITE);
        }
    }

    void SyncCounter::DecrementReadCount()
    {
        // We don't need a read-lock if we already have write control
        DWORD writelockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSWriteLockCount));
        if (writelockcount > 0) {
            return;
        }

        // Make sure we don't deadlock by recursively taking out locks
        DWORD readlockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSReadLockCount));
        ::TlsSetValue(TLSReadLockCount, reinterpret_cast<LPVOID>(--readlockcount));
        if (readlockcount > 0) {
            return;
        }

        if (::InterlockedDecrement(&ReadCounter) < 0) {
            ::SetEvent(WriteMutexEvent);
        }
    }

    void SyncCounter::LockForWriting()
    {
        // Make sure we don't deadlock by recursively taking out locks
        DWORD writelockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSWriteLockCount));
        ::TlsSetValue(TLSWriteLockCount, reinterpret_cast<LPVOID>(writelockcount + 1));
        if (writelockcount > 0) {
            return;
        }

        ::WaitForSingleObject(WriteMutexEvent, INFINITE);
        if (::InterlockedIncrement(&WriteCounter) == 0) {
            ::ResetEvent(NotBusyFlagEvent);
        }
    }

    bool SyncCounter::TryLockForWriting()
    {
        // Make sure we don't deadlock by recursively taking out locks
        DWORD writelockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSWriteLockCount));
        ::TlsSetValue(TLSWriteLockCount, reinterpret_cast<LPVOID>(writelockcount + 1));
        if (writelockcount > 0) {
            return true;
        }

        if(::WaitForSingleObject(WriteMutexEvent, 0) == WAIT_OBJECT_0) {
            if (::InterlockedIncrement(&WriteCounter) == 0) {
                ::ResetEvent(NotBusyFlagEvent);
            }
            return true;
        }

        ::TlsSetValue(TLSWriteLockCount, reinterpret_cast<LPVOID>(writelockcount));
        return false;
    }

    void SyncCounter::UnlockFromWriting()
    {
        // Make sure we don't deadlock by recursively taking out locks
        DWORD writelockcount = reinterpret_cast<DWORD>(::TlsGetValue(TLSWriteLockCount));
        ::TlsSetValue(TLSWriteLockCount, reinterpret_cast<LPVOID>(--writelockcount));
        if (writelockcount > 0) {
            return;
        }

        if (::InterlockedDecrement(&WriteCounter) < 0) {
            ::SetEvent(NotBusyFlagEvent);
        }
        ::SetEvent(WriteMutexEvent);
    }

I see what you're doing with the lock - very nice changeover from the CRITICAL_SECTION in the Win32 API. Like I said, I wanted to try the queue for a challenge; otherwise I could have simply used std::queue, which I believe has locking set up inside already (not sure, have to check).

But I think I will take the simple route to do it first, then cross over to make it lockless.

But if anyone still has input as to how to get Lockless up and running I'm all ears :-)
The C++ standard library containers are not, generally speaking, thread safe.
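
In practice this means a std::queue shared between threads needs external locking. A minimal sketch, assuming C++11's std::mutex (a Win32 CRITICAL_SECTION would fill the same role); LockedQueue, Push, TryPop and PushFromThreads are hypothetical names used only for illustration:

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical wrapper: every access to the std::queue goes through one mutex.
template <typename T>
class LockedQueue {
public:
    void Push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push(std::move(value));
    }

    // Returns false when the queue is empty; otherwise moves the front
    // element into `out`. The emptiness check and the pop happen under the
    // same lock, so two consumers can never pop the same element.
    bool TryPop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::queue<T> items_;
};

// Example: several threads push concurrently, then the caller drains the
// queue and returns how many items arrived.
int PushFromThreads(int thread_count, int items_per_thread) {
    assert(thread_count > 0 && items_per_thread >= 0);
    LockedQueue<int> queue;
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&queue, items_per_thread] {
            for (int i = 0; i < items_per_thread; ++i) {
                queue.Push(i);
            }
        });
    }
    for (std::thread& thread : threads) {
        thread.join();
    }
    int popped = 0;
    int value = 0;
    while (queue.TryPop(value)) {
        ++popped;
    }
    return popped;
}
```

Calling empty(), front() and pop() separately from outside the lock would not be safe, which is why the wrapper exposes a single TryPop instead.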

OK Good so there is still a challenge to be had.

So any tips/references on implementing LIGHTLY-locking queues? And any other comments or tips on implementing a good threadpool?
Quote: Original post by UtMan
OK Good so there is still a challenge to be had.

So any tips/references on implementing LIGHTLY-locking queues? And any other comments or tips on implementing a good threadpool?

I'm not sure what you mean by LIGHTLY-locking, but without getting really intricate, the best you can probably do is to have a lock at each end. Here's a link to a paper and some pseudo-code. The paper has both a blocking and a non-blocking queue; I'm referring to the blocking one here. The trick is to keep a dummy node around so that enqueuing and dequeuing don't interfere with one another.
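
To make the lock-at-each-end idea concrete, here is a rough sketch of a linked-list queue with a dummy node. It is not taken from the linked paper; it assumes C++11 (std::mutex, std::atomic) instead of Win32 primitives, requires T to be default-constructible for the dummy node, and TwoLockQueue, Enqueue, TryDequeue and ProduceAndConsume are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch: one lock per end, with a dummy node at the head.
template <typename T>
class TwoLockQueue {
public:
    TwoLockQueue() : head_(new Node()), tail_(head_) {}

    ~TwoLockQueue() {
        // Assumes no other thread is still using the queue.
        while (head_ != nullptr) {
            Node* next = head_->next.load(std::memory_order_relaxed);
            delete head_;
            head_ = next;
        }
    }

    TwoLockQueue(const TwoLockQueue&) = delete;
    TwoLockQueue& operator=(const TwoLockQueue&) = delete;

    // Producers only touch the tail end.
    void Enqueue(T value) {
        Node* node = new Node(std::move(value));
        std::lock_guard<std::mutex> lock(tail_mutex_);
        // The link is atomic because a consumer holding only the head lock
        // may read it at the same time.
        tail_->next.store(node, std::memory_order_release);
        tail_ = node;
    }

    // Consumers only touch the head end. Returns false if the queue is empty.
    bool TryDequeue(T& out) {
        Node* old_dummy = nullptr;
        {
            std::lock_guard<std::mutex> lock(head_mutex_);
            Node* first = head_->next.load(std::memory_order_acquire);
            if (first == nullptr) {
                return false;  // only the dummy node is left
            }
            out = std::move(first->value);
            old_dummy = head_;
            head_ = first;  // the dequeued node becomes the new dummy
        }
        delete old_dummy;
        return true;
    }

private:
    struct Node {
        Node() : value(), next(nullptr) {}
        explicit Node(T v) : value(std::move(v)), next(nullptr) {}
        T value;
        std::atomic<Node*> next;
    };

    Node* head_;
    std::mutex head_mutex_;
    Node* tail_;
    std::mutex tail_mutex_;
};

// Example: one producer and several consumers; returns how many items the
// consumers dequeued in total.
int ProduceAndConsume(int item_count, int consumer_count) {
    assert(item_count >= 0 && consumer_count > 0);
    TwoLockQueue<int> queue;
    std::atomic<int> consumed{0};
    std::vector<std::thread> consumers;
    for (int c = 0; c < consumer_count; ++c) {
        consumers.emplace_back([&queue, &consumed, item_count] {
            int value = 0;
            while (consumed.load() < item_count) {
                if (queue.TryDequeue(value)) {
                    consumed.fetch_add(1);
                } else {
                    std::this_thread::yield();
                }
            }
        });
    }
    for (int i = 0; i < item_count; ++i) {
        queue.Enqueue(i);
    }
    for (std::thread& consumer : consumers) {
        consumer.join();
    }
    return consumed.load();
}
```

Because the head always points at a dummy node, the producer never needs the head lock and consumers never need the tail lock; the two ends only meet through the atomic next pointer. Consumers still contend with each other on the head lock, so this reduces contention rather than removing it.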

