Lockless FIFO Queue Implementation

I recently built a FIFO multi-writer single-reader queue for the Epoch programming language. The code is fairly simple; feel free to steal it [smile]

Lockless.h
//
// The Epoch Language Project
// FUGUE Virtual Machine
//
// Basic building blocks for creating lock-free algorithms
//

#pragma once

//
// Helper function for atomic compare-and-swap
// Returns true on success
//
template <class DataType>
bool CompareAndSwap(DataType* field, DataType oldvalue, DataType newvalue)
{
    DataType retval;
    _asm
    {
        mfence
        mov eax, oldvalue
        mov ecx, newvalue
        mov edx, dword ptr [field]
        lock cmpxchg dword ptr [edx], ecx
        mov retval, eax
        mfence
    }
    return (retval == oldvalue);
}
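Note that the _asm block is MSVC-specific and 32-bit only. For reference, a rough portable sketch of the same contract, assuming a C++11 compiler (which postdates this thread), might look like this; it is not part of the Epoch code:

#include <atomic>

// Sketch of a portable equivalent (assumes C++11).
template <class DataType>
bool CompareAndSwapPortable(std::atomic<DataType>& field, DataType oldvalue, DataType newvalue)
{
    // compare_exchange_strong writes newvalue only if field still holds
    // oldvalue, and returns true on success, matching the contract above.
    return field.compare_exchange_strong(oldvalue, newvalue);
}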



Mailbox.h
//
// The Epoch Language Project
// FUGUE Virtual Machine
//
// Implementation of a lock-free mailbox structure for message passing
//

#pragma once

// Dependencies
#include <deque>
#include "Utility/Threading/Lockless.h"
#include "Utility/Threading/Threads.h"
#include "Utility/Types/IDTypes.h"
#include "Utility/Memory/ThreadLocalAllocator.h"
#include "Configuration/RuntimeOptions.h"

//
// This class encapsulates a lockless mailbox algorithm for asynchronous message passing.
// Each thread is granted a mailbox when it is started up; this mailbox is used for all
// messages passed into that thread. Messages are dequeued in FIFO order.
//
// IMPORTANT: this algorithm is only safe for a many producer/single consumer scenario.
//            Critical assumptions made by the code are only valid if a single consumer
//            thread is used. However, any number of producer threads is permitted.
//
// The algorithm uses three distinct stacks: a read stack, a write stack, and a cache.
// Producers push items onto the write stack using a simple and well-known CAS method.
// When the consumer thread arrives to retrieve a message, it first checks the cache
// for any waiting messages. If the cache is empty, the consumer thread swaps the read
// and write stacks. At this point, the read stack is controlled entirely by the consumer
// thread, so we don't need to worry about contention for elements in the read stack.
// The entire read stack is then traversed and its contents pushed into the cache.
// Note that we achieve FIFO semantics only by using this extra cache stack.
//
// For additional performance, we use customized memory management internally to avoid
// expensive serialized heap allocations/frees. We cannot directly use new/delete in
// the implementation, because these incur serialized heap accesses, which defeats the
// purpose of the lock-free mailbox. Instead, we use a two-pronged approach to memory
// management for the mailbox. Mail messages are kept in a pre-allocated buffer, and
// memory is supplied by this buffer using a simple lock-free LIFO stack of free slots.
// We also use a special allocator class for the std::deque container, which uses a
// thread-local heap provided by the OS. Since the thread-local heap is never accessed
// by an outside thread, we can avoid the cost of serialized heap accesses to that
// particular memory. The combination of these two approaches allows for maximum speed
// in the mailbox container, while retaining lock-free semantics and thread safety.
//
template <class PayloadType>
class LocklessMailbox
{
// Construction and destruction
public:
    //
    // Construct and initialize the read and write stacks
    //
    LocklessMailbox(HANDLE owner)
        : Owner(owner)
    {
        WriteHead = OriginalWriteHead = new Node;
        ReadHead = OriginalReadHead = new Node;

        unsigned buffersize = Config::NumMessageSlots;
        NodeBuffer = new Node[buffersize];
        NodeFreeList = NULL;
        for(unsigned i = 0; i < buffersize; ++i)
        {
            NodeTracker* tracker = new NodeTracker(NodeFreeList, &NodeBuffer[i]);
            FreeListHead = NodeFreeList = tracker;
            NodeBuffer[i].Tracker = tracker;
        }
    }

    //
    // Clean up the read, write, and cache stacks, freeing any
    // remaining messages from each stack.
    //
    ~LocklessMailbox()
    {
        for(MessageStackType::iterator iter = PendingReads.begin(); iter != PendingReads.end(); ++iter)
            delete (*iter)->Payload;

        Node* n = WriteHead;
        while(n)
        {
            delete n->Payload;
            n = n->Next;
        }

        n = ReadHead;
        while(n)
        {
            delete n->Payload;
            n = n->Next;
        }

        for(unsigned i = 0; i < Config::NumMessageSlots; ++i)
            delete NodeBuffer[i].Tracker;

        delete OriginalReadHead;
        delete OriginalWriteHead;
        delete [] NodeBuffer;
    }

// Message passing interface
public:
    //
    // Register a message from a producer thread. Any number of threads
    // may call this function.
    //
    void AddMessage(PayloadType* info)
    {
        Node* msgnode = AllocateNode(info);
        while(true)
        {
            msgnode->Next = WriteHead;
            bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
            if(success)
                return;
        }
    }

    //
    // Retrieve a message from the mailbox.
    // IMPORTANT: only ONE consumer thread (per mailbox) should call this function
    //
    PayloadType* GetMessage()
    {
        if(!PendingReads.empty())
            return PopPendingRead();

        if(ReadHead->Next == NULL)
            SwapReadAndWrite();

        Node* n = ReadHead;
        while(ReadHead->Next)
        {
            PendingReads.push_back(n);
            n = n->Next;
            ReadHead = n;
        }

        if(PendingReads.empty())
            return NULL;

        return PopPendingRead();
    }

// Internal helpers
private:
    //
    // Pop a pending read from the cache stack
    //
    PayloadType* PopPendingRead()
    {
        Node* readnode = PendingReads.back();
        PayloadType* payload = readnode->Payload;
        FreeNode(readnode);
        PendingReads.pop_back();
        return payload;
    }

    //
    // Internal helper for swapping the read/write stacks
    // See class comment for details
    //
    void SwapReadAndWrite()
    {
        while(true)
        {
            Node* readhead = ReadHead;
            Node* oldwrite = WriteHead;
            bool success = CompareAndSwap(&WriteHead, oldwrite, readhead);
            if(success)
            {
                ReadHead = oldwrite;
                return;
            }
        }
    }

// Internal memory management
private:
    struct NodeTracker;

    struct Node
    {
        Node() : Next(NULL), Payload(NULL) { }
        explicit Node(PayloadType* p) : Next(NULL), Payload(p) { }

        Node* Next;
        PayloadType* Payload;
        NodeTracker* Tracker;
    };

    struct NodeTracker
    {
        NodeTracker() : Next(NULL), AttachedNode(NULL) { }
        explicit NodeTracker(Node* attached) : Next(NULL), AttachedNode(attached) { }
        NodeTracker(NodeTracker* next, Node* attached) : Next(next), AttachedNode(attached) { }

        NodeTracker* Next;
        Node* AttachedNode;
    };

    //
    // Allocate space for a message node, using the local reserved pool
    //
    // We trade off a limited amount of message space for the ability
    // to allocate and free node storage without locking on the heap.
    // Memory is managed with a simple free list that stores pointers
    // into the pre-allocated node buffer. The list is implemented as
    // a lock-free FILO stack; this allows multiple producers to send
    // messages without blocking on the allocation routines.
    //
    Node* AllocateNode(PayloadType* info)
    {
        NodeTracker* rettracker;

        if(!FreeListHead->Next)
        {
            ::TerminateThread(Owner, 0);
            throw Exception("Too many messages backlogged; make sure task is accepting the sent messages!");
        }

        // Pull node out of the free list
        NodeTracker* newhead;
        do
        {
            rettracker = FreeListHead;
            newhead = rettracker->Next;
        } while (!CompareAndSwap(&FreeListHead, rettracker, newhead));

        rettracker->AttachedNode->Payload = info;
        return rettracker->AttachedNode;
    }

    //
    // Return a node to the free list
    //
    void FreeNode(Node* node)
    {
        node->Payload = NULL;
        while(true)
        {
            node->Tracker->Next = FreeListHead;
            if(CompareAndSwap(&FreeListHead, node->Tracker->Next, node->Tracker))
                break;
        }
    }

// Internal tracking
private:
    Node* WriteHead, *OriginalWriteHead;
    Node* ReadHead, *OriginalReadHead;
    Node* NodeBuffer;
    NodeTracker* NodeFreeList;
    NodeTracker* FreeListHead;

    typedef std::deque<Node*, ThreadLocalAllocator<Node*> > MessageStackType;
    MessageStackType PendingReads;

    HANDLE Owner;
};


Config::NumMessageSlots is currently set to 64 in the Epoch project, but you may need to fine-tune that for your particular situation.
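For context, here's a hypothetical usage sketch; the Message type and the helper functions are purely illustrative, not Epoch code:

struct Message
{
    int Opcode;
};

// Producer side: any number of threads may call this concurrently.
void PostToMailbox(LocklessMailbox<Message>& mailbox, int opcode)
{
    Message* msg = new Message;
    msg->Opcode = opcode;
    mailbox.AddMessage(msg);
}

// Consumer side: only the single thread that owns the mailbox may call this.
void DrainMailbox(LocklessMailbox<Message>& mailbox)
{
    while(Message* msg = mailbox.GetMessage())
    {
        // ... dispatch on msg->Opcode ...
        delete msg;
    }
}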

This queue can pump something like 100,000 messages/second on my E6600 dual-core machine, so performance shouldn't be an issue [smile]

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Quote: Node* msgnode = AllocateNode(info);
while(true)
{
msgnode->Next = WriteHead;
bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
if(success)
return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?
enum Bool { True, False, FileNotFound };
Quote:Original post by hplus0603
Quote: Node* msgnode = AllocateNode(info);
while(true)
{
msgnode->Next = WriteHead;
bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
if(success)
return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?


Some type of spin attempt is probably needed, depending on what restrictions are put on the system.

There can either be a real spin-LOCK, such that some other thread may be unable to progress past a certain point until the current thread has run at least one more instruction. That variant looks like this:
lock_it:
    compare_exchange(lock, 1, 0)
    jump_on_failure lock_it
(do something)
lock = 0

If the thread that takes this lock is suspended by the OS inside (do something), then it is possible that some other thread running the locking code will be stuck waiting on a suspended thread. When that happens, the entire system ceases to progress for some time. When this is possible, the structure is not lock-free.

If we have a different algorithm such as the following:
do_it:
    load obj from addr
    compare_exchange(addr, next addr, obj)
    jump_on_failure do_it
(use obj)

Then it is also theoretically possible that this thread will spin forever without doing anything productive. But that can happen only very rarely, and only while other threads are continuously completing operations of their own, keeping the entire system progressing. Whenever the other threads stop requesting new objects to work on, this thread is guaranteed to progress, if scheduled to run. In that sense it is lock-free: it is impossible for the system as a whole to cease progressing.
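In C++ terms, that second pattern is roughly the following (a sketch using C++11 atomics, which postdate this thread; ABA issues are ignored for brevity):

#include <atomic>
#include <cstddef>

struct Node
{
    Node* Next;
};

Node* PopLockFree(std::atomic<Node*>& head)
{
    Node* obj;
    do
    {
        obj = head.load();
        if(obj == NULL)
            return NULL;    // nothing to pop
        // The CAS below fails only if another thread changed head,
        // i.e. only if some other thread made progress.
    } while(!head.compare_exchange_weak(obj, obj->Next));
    return obj;
}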

Assuming that an unbounded number of unidentified threads may use the structure, then as far as I know it is impossible to make any real provable guarantees about the progress of any particular thread.

More information: http://en.wikipedia.org/wiki/Non-blocking_synchronization.
Following links from that Wikipedia page, there seem to be algorithms that solve this perfectly, assuming the number of threads is known or under similar constraints.
Quote:Original post by hplus0603
Quote: Node* msgnode = AllocateNode(info);
while(true)
{
msgnode->Next = WriteHead;
bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
if(success)
return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?



Technically, a CAS instruction isn't lockless in the first place, because it introduces a bus-level lock on the CPU. So if you want to be picky, nothing is lockless [wink]

I'm not aware of any method to do a CAS that doesn't spin until the CAS succeeds. How else are you supposed to handle the case where the swap fails? You can't just CAS once and hope it works, because in the instance where the swap is not made, you've just lost (and leaked) an entry.
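To illustrate (a sketch reusing the Node/WriteHead/CompareAndSwap names from the code above):

// What AddMessage would look like WITHOUT the retry loop.
void BrokenAddMessage(Node* msgnode)
{
    msgnode->Next = WriteHead;
    CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
    // If another producer pushed between reading WriteHead and the CAS,
    // the CAS fails, msgnode is never linked in, and the message leaks.
}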


There is a minor fairness problem, which is that it is possible (although incredibly unlikely) that if one writer continually spams the queue, it can prevent another thread from ever succeeding with the CAS. I know of no way around that which doesn't introduce synchronization issues; I've noted similar problems with basically every lock-free structure I've ever seen.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Quote:
I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?


To me it seems that it should guarantee progress (assuming everything else is correct).

CAS will succeed unless the value has been modified. It can fail only if:
- One writer's CAS succeeded
- Reader's CAS succeeded

And since there is no pre-allocated storage, new would fail outside of the CAS block, and there is no case where the list might be full.

The reader might be trying to read an empty list, perhaps racing with a writer, but that would require an extra attempt at most.

While fairness isn't guaranteed, progress at least should occur. Fairness to the reader might be more of a problem: since it can dequeue only a single item at a time while there can be many writers, the queue might not empty fast enough.
Concurrency is pretty much where it's at today.
A few searches over at M$ or Dr. Dobb's should provide plenty of information.

For a good queue implementation, take a look here:
Measuring Parallel Performance: Optimizing a Concurrent Queue
Quote:I'm not aware of any method to do a CAS that doesn't spin until the CAS succeeds.


The x86 has a native CMPXCHG instruction. It is implemented in multi-CPU systems to do arbitration of the bus; I believe it is actually guaranteed to be fair at the hardware level, and thus will have an upper bound on how long it will take to execute. The implementation in hardware typically goes something like:

1) Tell chipset/memory controller to wait for an exclusive lock for a specific cache line.
2) Wait until chipset provides the cache line.
3) Do the compare/exchange.
4) Release the cache line.

As long as the chipset does some kind of round-robin or queuing on who gets a given contested cache line next, it will be fair and guarantee progress. By contrast, typical spinlocks do not guarantee progress, because it could always be the case that one particular thread keeps winning a particular spinlock while the others starve. And while that's supposed to be "unlikely" in simple analysis, various crashes and catastrophes have been caused by exactly that kind of bug in the past...

The PPC, however, does not have bus-atomic operations; instead, it puts the cache line logic into the CPU. You load a cache line with reservation. While there is a reserved cache line, the CPU will snoop the bus, and any traffic to that same address will invalidate the reservation. You make the changes you want to make in cache, and then store conditional with reservation check. The store will fail if you lost the reservation, and you can test for this and loop back. This is, by the way, not fair, and not generally guaranteed to make progress, so real-time OS implementers on top of PPC might have a harder job. I'm just saying: when you have learned that concurrency is hard, you find that concurrency is harder than you think :-)
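For concreteness, the reservation pattern described above looks roughly like this in PowerPC assembly (my sketch of an atomic increment; register choices are arbitrary):

retry:
    lwarx   r4, 0, r3      # load word at [r3] and take a reservation
    addi    r4, r4, 1      # make the change in a register
    stwcx.  r4, 0, r3      # store conditional; fails if the reservation was lost
    bne-    retry          # loop back and try again on failure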
enum Bool { True, False, FileNotFound };
In the code I posted for Lockless.h, you can find the implementation of the CompareAndSwap function, which makes use of the x86 cmpxchg instruction.

Even though cmpxchg is atomic at the CPU level, you still need to check for a failed swap - hence the loop.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Quote:Original post by hplus0603
Quote: Node* msgnode = AllocateNode(info);
while(true)
{
msgnode->Next = WriteHead;
bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
if(success)
return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?
Yeah, there's no fairness; PPC especially can have one thread starving the others if they're bottlenecked on CAS and the like.

A CAS or reservation/store-conditional bottleneck is "lock-free" though, in that if any one thread is suspended the other threads can still keep working.
The PPC's version of this primitive is especially lax about pausing other threads to do the atomic operation.
Each thread has a finite amount of work to do in a frame though, so if a thread is stuck in a spin-lock it only affects the progress of that thread and doesn't affect the whole system (besides reducing parallelism!).

So each spin-lock is a 'gate' that only one thread can pass through at a time - a very lightweight synchroniser. If only one thread tries to do the operation at a time then it's fine, but if more than one thread has to do the same operation then only one can proceed at a time.

Calling some lock-free functions from multiple threads, while thread-safe, can cause this synchronisation. So it's still a good idea to minimise the use of these data-structures/algorithms.
For example, instead of pushing individual items into a queue, you could collect an array of items locally in a thread and then push the whole array into the queue with one operation.
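A minimal sketch of that batching idea, reusing the LocklessMailbox above (the Item and Batch types are purely illustrative):

#include <vector>

struct Item
{
    int Value;
};
typedef std::vector<Item> Batch;

void ProducerLoop(LocklessMailbox<Batch>& mailbox)
{
    // Build the batch in memory local to this thread: zero contention here.
    Batch* batch = new Batch;
    for(int i = 0; i < 100; ++i)
    {
        Item item = { i };
        batch->push_back(item);
    }

    // A single CAS publishes all 100 items at once.
    mailbox.AddMessage(batch);
}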

Better yet, if you can group work into 'jobs' and then schedule the jobs with dependencies, then you can guarantee that two jobs won't be running at the same time, and thus those two jobs can access a data structure without threading issues (besides OOO CPU details...). This is the step up from lock-free, called wait-free. If you've got a bunch of these algorithms, then when one is doing a serial task, others can be doing parallel tasks, keeping a thread pool busy.
e.g. Two threads are used to run the update function, but only thread 1 runs the start/end frame functions; thread 2 is paused during that time:
thread 1: start frame -> update 1 -> end frame
thread 2:              -> update 2 ->
The update function can write data into two arrays, no syncing of threads required. When both threads are done, a serial task groups the results together and does something with the data.
class Work
{
  Thread* pThread = new Thread( StartWorkerThread );
  std::vector<Result*> results[NUM_THREADS];

  void Start()
  {
    pThread->Run();
    Update( 0 );
  }

  void StartWorkerThread()
  {
    Update( 1 );
  }

  void Update( int threadIndex )
  {
    ReadBarrier();
    for( int i=0; i<100; ++i )
    {
      Result* calculation = 0; // Do Stuff
      results[threadIndex].push_back( calculation );
    }
    WriteBarrier();
    if( threadIndex != 0 )
      pThread->GoToSleep();
    else
    {
      WaitToFinish( pThread );
      Finish();
    }
  }

  void Finish()
  {
    ReadBarrier();
    for( int thread=0; thread<NUM_THREADS; ++thread )
    {
      for( int i=0; i<100; ++i )
      {
        results[thread][i]->UseResultSomehow();
      }
      results[thread].clear();
    }
    WriteBarrier();
    Start();
  }
};
Actually, I believe research now shows that the currently best way to do work queuing is to keep one queue per thread. As long as all threads are busy, there is no contention at all. If a thread runs out of work, then it can steal work from another thread -- typically from the *back* of that thread's queue, although it's also possible to do it from the front. One trick in that case is to CAS the element you will dequeue to NULL; if you fail doing that, you fail the dequeue, and loop back. You can do a safe one-writer many-readers queue that way quite simply.

If the rule is that only the main thread adds jobs to the queues, and/or the rule is that only a thread adds jobs to its own queue, then a single-writer multiple-reader FIFO is exactly what you need!
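A minimal sketch of that steal-by-CAS trick (my illustration, assuming C++11 atomics and a simplified fixed-slot queue; wraparound and ABA concerns are ignored):

#include <atomic>
#include <cstddef>

struct Job
{
    void (*Run)();
};

struct WorkQueue
{
    static const int Capacity = 256;
    std::atomic<Job*> Slots[Capacity];

    // Thief side: claim the job in a slot by CASing it to NULL. If the
    // CAS fails, another thread got there first; the caller fails this
    // dequeue and loops back to try another slot.
    Job* TrySteal(int index)
    {
        Job* job = Slots[index].load();
        if(job != NULL && Slots[index].compare_exchange_strong(job, NULL))
            return job;
        return NULL;
    }
};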
enum Bool { True, False, FileNotFound };
