Lockless FIFO Queue Implementation

Started by Akhilleus; 26 comments, last by Hodgman 14 years, 10 months ago
Hey! My longer explanation is outdated, so I'll just say this: I'm working on a lockless queue implementation to use for cross-thread notification of events in a server; the code is below. This queue is designed to work only in the case of one reader and one writer, i.e., one and only one thread calling pop(), and one and only one thread calling push().

I *think* I have the problems that were in it worked out, although for now please ignore:

1. The lack of a memory barrier in the position specified in push(), and a way to handle creation/deletion of nodes other than the default operators -- I'll implement these when I'm sure the logic of the algorithm is right.
2. The possibility of first and last being written to the same cache line and being unreadable at the same time.
3. The current lack of any sort of size() method, and the fact that it only stores pointers right now (I didn't want to junk up the code with error handling for calls to pop() on an empty queue until I got it working at all).
4. The lack of optimization, although of course tips are nice :).

Thanks!
-Akhilleus
template<typename T>
class Queue
{
public:
	Queue() : last(new QNode(0, true, SAFE, 0)), first(new QNode(0, false, SKIP_NEXT, last))
	{}
	void push(T *data)
	{
		/////////////////////////////////////////////////////////////////////////////////////////////////
		//shared variables read:
		//	(N/A)
		//shared variables written:
		//	prev->tail (if read during write, value read is guaranteed to be meaningful and equal to the
		//				correct or the previous value, as only one bit is actually changed on any write,
		//				because tail is only ever 0 or 1)
		//notes:
		//	1.  the pointer last is never read or written by pop()
		//	2.  the object *last is only accessed by pop() when reading its tail and/or data values
		//	3.  the original object *last is released to full access by pop() by the operation
		//		prev->tail = 0, after which it is never accessed by push() again
		/////////////////////////////////////////////////////////////////////////////////////////////////

		last->next = new QNode(data);
		QNode* prev = last;
		last = const_cast<QNode*>(last->next);
		//memory barrier needed here so that the line below runs last
		prev->tail = 0;
	}
	//unnecessary in the current code, as pop() returns a pointer (NULL if the queue is empty)
	//bool pop_safe()
	//{
	//	return(first->status != SKIP_NEXT || first->next->tail != 1);
	//}
	T *pop()
	{
		/////////////////////////////////////////////////////////////////////////////////////////////////
		//shared variables read:
		//	last->tail (always in the form first->next->tail)
		//shared variables written:
		//	(N/A)
		//notes:
		//	1.  the pointer last is never read or written by pop()
		//	2.  the object *last is only accessed by pop() when reading its value status (atomic)
		//	3.  the original object *last is released to full access by pop() by the atomic operation
		//		prev->status = SAFE, after which it is never accessed by push() again
		/////////////////////////////////////////////////////////////////////////////////////////////////
		while(true)
		{
			if(first->next->tail)
			{
				switch(first->status)
				{
					case(SAFE) :
						//return first's data without popping
						first->status = ALREADY_READ;
						return(first->data);
					case(ALREADY_READ) :
						//return last's data without popping
						first->status = SKIP_NEXT;
						return(first->next->data);
					default: //SKIP_NEXT (first->status never equals TAIL)
						//return nothing, as all values have been popped and the queue is empty
						return(0);
				}
			}
			else //next node is not the tail
			{
				T* to_return;
				QNode* prev;
				switch(first->status)
				{
					case(SAFE) :
						//pop first from the queue
						to_return = first->data;
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
						return(to_return);
					case(ALREADY_READ) :
						//move on to the next node
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
						break;
					default: //SKIP_NEXT
						//set next node to ALREADY_READ and move on to it
						first->next->status = ALREADY_READ;
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
				}
			} //end if
		}
	}
private:
	//classes & constants
	static const char SAFE = 0;
	static const char ALREADY_READ = 1;
	static const char SKIP_NEXT = 2;
	struct QNode
	{
		QNode(T *d = 0, bool is_tail = true, char node_status = SAFE, QNode* next_node = 0)
			: data(d), tail(is_tail ? 1 : 0), status(node_status), next(next_node)
		{}
		T *data;
		volatile char tail;
		char status;
		QNode volatile *next;
	};
	//local data
	QNode *last;	//WARNING: do not switch declaration order with first (see init. list)
	QNode *first;
};
template<typename T> const char Queue<T>::SAFE;
template<typename T> const char Queue<T>::ALREADY_READ;
template<typename T> const char Queue<T>::SKIP_NEXT;
[Edited by - Akhilleus on June 12, 2009 5:13:45 PM]
The short, sweet, and nice answer is your code is not thread safe, it's not lock-free, and it's definitely not even a queue implementation (what happens if you call push more than once?). [wink]

Accomplishing a lockless data structure is quite tricky and takes quite a bit of low level research and work, especially if you want it to work on both x64 and x86 computer architectures. Material on the topic itself is quite limited, so I don't have any links, but I would suggest do some more extensive research on what lockless/lockfree algorithms are.

The best article to start with that I know of would be: Lockless Programming Considerations for Xbox 360 and Microsoft Windows. You might also be interested in this thread: Double-check my lock-free voodoo.

Good luck!
Quote:*EDIT: I don't have any code to prevent optimization reordering some of my statements, but I'm hoping that's trivial to implement, so please overlook it for now.
The volatile keyword should stop the compiler from breaking your code with optimisations, but stopping the CPU from doing so is a lot harder (x86 CPUs re-order instructions internally!!!). You need to learn about memory barriers (aka memory fences, aka interlocked operations) to make this safe.
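To make that concrete, here is a minimal sketch of such a barrier using C++11's std::atomic (which postdates this thread, so treat it as an illustration rather than the posted code; the names Payload, producer, and consumer are mine). A release store on the writing side paired with an acquire load on the reading side stops both the compiler and the CPU from reordering the data write past the publication point:

```cpp
#include <atomic>

// Sketch of a release/acquire pairing. The release store keeps the
// payload write from being reordered after the pointer publication,
// by both the compiler and the CPU.
struct Payload { int value; };

std::atomic<Payload*> published{nullptr};

void producer(Payload* p)
{
	p->value = 42;                                 // write the data first...
	published.store(p, std::memory_order_release); // ...then publish the pointer
}

Payload* consumer()
{
	// pairs with the release above: if we see the pointer, we see the data
	return published.load(std::memory_order_acquire);
}
```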


Also, the default implementation of 'new' will probably be grabbing a mutex internally, which defeats the entire purpose of the lock-free queue :/
Try and do all your memory allocation up-front if possible, and then use placement-new if constructors are required.
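For example, a minimal sketch of up-front allocation combined with placement-new might look like this (NodePool and its interface are hypothetical, not from the thread; a real pool would also recycle freed slots):

```cpp
#include <new>
#include <cassert>
#include <cstddef>

// Hypothetical fixed-capacity pool: all storage is reserved up-front,
// so allocating a node never calls the global operator new and never
// touches any mutex hidden inside it.
template<typename T, std::size_t N>
class NodePool
{
public:
	T* construct(const T& value)
	{
		assert(next < N); // a real pool would recycle freed slots instead
		void* slot = &storage[(next++) * sizeof(T)];
		return new (slot) T(value); // placement new: construct in place
	}
private:
	alignas(T) unsigned char storage[N * sizeof(T)];
	std::size_t next = 0;
};
```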


This series of articles by Herb Sutter should help:
http://www.ddj.com/cpp/210600279
http://www.ddj.com/hpc-high-performance-computing/210604448
http://www.ddj.com/cpp/211601363
http://www.ddj.com/hpc-high-performance-computing/212201163
Quote:The short, sweet, and nice answer is your code is not thread safe, it's not lock-free, and it's definitely not even a queue implementation (what happens if you call push more than once?).


Oops! push() does work more than once, I just seem to have accidentally erased the line resetting the last pointer. Fixed.

But for the thread-safety, it isn't designed to be thread-safe in general; it's only designed to work with threads if exactly one uses push() and only push(), and exactly one uses pop() and only pop(). If it isn't safe for that, could you let me know where it will encounter problems (if it's completely botched, at least the first vulnerability or two, so I can see where it went wrong)?

Quote:The volatile keyword should stop the compiler from breaking your code with optimisations, but stopping the CPU from doing so is a lot harder (x86 CPUs re-order instructions internally!!!). You need to learn about memory barriers (aka memory fences, aka interlocked operations) to make this safe.


Also, the default implementation of 'new' will probably be grabbing a mutex internally, which defeats the entire purpose of the lock-free queue :/
Try and do all your memory allocation up-front if possible, and then use placement-new if constructors are required.


Ahh, well I'll definitely do some research on memory barriers then. As for new, I'm aware of the possible problem, but I was planning on defining my own version to pull objects from a preallocated pool, if the Queue code itself is viable.
Quote:Original post by Akhilleus
But for the thread-safety, it isn't designed to be thread-safe in general; it's only designed to work with threads if exactly one uses push() and only push(), and exactly one uses pop() and only pop(). If it isn't safe for that, could you let me know where it will encounter problems (if it's completely botched, at least the first vulnerability or two, so I can see where it went wrong)?
The following code is fairly representative of why lock-free queues are very hard to write:
if(first->next == last) //this might be true at the time
{
	//but the value of last may have changed here
	if(last->status != ALREADY_READ) //so this line is now wrong


The compare-and-swap (CAS) instruction is the backbone of a lot of these algorithms. It's basically an atomic (thread-safe) version of "set if equals".
It's implemented in the CPU to be fast and safe, but in high-level code it would look like:
bool CAS( int* target, int oldValue, int newValue )
{
  bool success = false;
  lock a mutex so target can't change unexpectedly
  if( *target == oldValue )
  {
    *target = newValue;
    success = true;
  }
  unlock the mutex
  return success;
}
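In portable modern C++, the same operation is exposed as std::atomic's compare_exchange_strong; a minimal sketch (the wrapper name cas is illustrative):

```cpp
#include <atomic>

// compare_exchange_strong performs the whole "compare, then maybe
// swap" sequence as one atomic operation, like the mutex-based
// pseudocode above, but implemented by the CPU without a lock.
bool cas(std::atomic<int>& target, int oldValue, int newValue)
{
	// returns true and writes newValue only if target still held oldValue
	return target.compare_exchange_strong(oldValue, newValue);
}
```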

Whenever working on data that can be accessed by more than one thread, then CAS comes into play.


The above example of thread unsafe code however can't be fixed with CAS, so it might be better to re-design the algorithm.
One way to fix this part of the current algorithm might be like this (disclaimer: I'm not a lock-free expert, so this is probably buggy too...)
bool firstNextIsLast = false;
QNode* copyOfLast = CopyWithBarrier( last );
if( first->next == copyOfLast )
{
  firstNextIsLast = true;
  //make some changes to first->next
}
if( firstNextIsLast )
{
  QNode* copyOfLast2 = CopyWithBarrier( last );
  if( copyOfLast != copyOfLast2 ) //uh oh, 'first->next' does not equal 'last' anymore! We edited the wrong node!
  {
    undo the changes done to first->next
    firstNextIsLast = false;
  }
}


I'd highly recommend reading Herb's articles on lock-free queues instead of trying to fix your existing one though ;)

To quote an MS guy whose name I've forgotten: "Lock-free coding is hard... The kind of 'hard' where 6 months after you think you've finally understood it, you realize your code is completely wrong."
I believe it has been proven that you cannot implement a lockless, fair, linked list with only single-word atomic bus operations (which includes load-reservation store-conditional from PPC as well as your typical CAS instructions).

If you want to be lockless, you end up with what amounts to a spinlock, which is not fair, and isn't really lockless in the strict sense (it's not guaranteed that every user will always make progress). This is why people with lockless needs typically end up using lockless FIFOs. The single-reader, single-writer version (with a "head" and "tail" counter) is the simplest to implement: just an atomic increment for a single read or single write, as long as there will only be one thread on each side (in fact, I used that kind of FIFO extensively in BeOS over 10 years ago -- how time flies :-)
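A minimal sketch of such a single-reader, single-writer FIFO with head and tail counters, written with C++11 atomics (which postdate this thread; SpscFifo and its interface are illustrative, not from the post):

```cpp
#include <atomic>
#include <cstddef>

// Single-producer single-consumer ring buffer: the producer only ever
// writes 'tail' and the consumer only ever writes 'head', so each
// counter has exactly one writer and no CAS is needed.
template<typename T, std::size_t N>
class SpscFifo
{
public:
	bool push(const T& v)
	{
		std::size_t t = tail.load(std::memory_order_relaxed);
		if (t - head.load(std::memory_order_acquire) == N)
			return false;                              // full
		buf[t % N] = v;
		tail.store(t + 1, std::memory_order_release);  // publish the element
		return true;
	}
	bool pop(T& out)
	{
		std::size_t h = head.load(std::memory_order_relaxed);
		if (h == tail.load(std::memory_order_acquire))
			return false;                              // empty
		out = buf[h % N];
		head.store(h + 1, std::memory_order_release);  // release the slot
		return true;
	}
private:
	T buf[N];
	std::atomic<std::size_t> head{0}; // next slot to read  (consumer-owned)
	std::atomic<std::size_t> tail{0}; // next slot to write (producer-owned)
};
```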
enum Bool { True, False, FileNotFound };
I dunno - this is what I use for my inter-thread implementations. This is a lock-free multi-writer single-reader linked-list based queue.

template<typename T>
class InterlockedQueue
{
	struct Node
	{
		Node(const T& data, Node * next)
			:data(data)
			,next(next)
			,last(NULL)
		{}
		T data;
		Node * last; //This is only set when this is the end sentinel node.
		Node * next;
	};
public:
	typedef T value_type;

	InterlockedQueue()
		:begin(NULL)
		,end(NULL)
#ifdef _DEBUG
		,isPopping(0)
#endif
	{
		end = new Node(T(), NULL);
		begin = new Node(T(), end);
		end->last = begin;
	}
	~InterlockedQueue()
	{
		while (!empty())
		{
			pop();
		}
		delete end;
		delete begin;
	}
	//TODO: Copy / assignment operators.
	/*
		This function executes an atomic push of data.
		The order of data insertion, however, is not guaranteed.
		Eg, you can push 1, 2, 3 and get back 2, 3, 1.
	*/
	void push(const T& data)
	{
		Node * pushNode = new Node(data, end);
		//Swap the last pointer to point at our new node.
		Node * last = reinterpret_cast<Node *>(InterlockedExchangePointer(&end->last, pushNode));
		//Set the previous node to point at our new node.
		InterlockedExchangePointer(&last->next, pushNode);
	}
	const T& peek() const
	{
		return begin->next->data;
	}
	void pop()
	{
#ifdef _DEBUG
		assert(InterlockedCompareExchange(&isPopping, 1, 0) == 0);
#endif
		Node * prev = reinterpret_cast<Node *>(InterlockedExchangePointer(&begin->next, begin->next->next));
		delete prev;
#ifdef _DEBUG
		isPopping = 0;
#endif
	}
	bool empty() const
	{
		return (begin->next) == end;
	}
private:
	Node * begin;
	Node * end;
#ifdef _DEBUG
	volatile long isPopping;
#endif
};


I've tested this on a 4 core machine, with 8 threads writing and 1 thread reading, without any sort of issue - but your mileage may vary.

Visit my website, rawrrawr.com

Some more food for thought:
1) Even if your algorithm doesn't use locks, code like this can end up running serially due to locking within the CPU:
class Queue
{
	...
	Node * first;
	Node * last;
};
'first' and 'last' will likely be placed on the same cache line, and on most CPUs if two threads want write-access to a cache line, then they will invisibly lock it and run serially (not in parallel). Good profiling tools like VTune can detect these events and show you where it is happening.
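A common fix is to pad or align the two hot pointers onto separate cache lines; a sketch under the assumption of a 64-byte line (typical on x86, but hardware-dependent; PaddedQueue is an illustrative name):

```cpp
#include <cstddef>

// Forcing 'first' and 'last' onto different cache lines so the
// producer and consumer threads never contend for the same line.
struct QNode; // stand-in for the queue's node type

struct PaddedQueue
{
	alignas(64) QNode* first; // written only by the pop() thread
	alignas(64) QNode* last;  // written only by the push() thread
};
```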

2) A lot of the time a lock-free queue is overkill for inter-thread communication.
For example, in a multi-writer single-reader situation, if the reader-task can be scheduled to only occur after all writer-tasks are complete, then a wait-free queue can be used instead, which is much simpler and more efficient than a lock-free algorithm.
typedef std::vector< std::vector<Foo> > ThreadLocalQueue;
ThreadLocalQueue tlQueue;

init:
-----
tlQueue.resize( numThreads );
for each entry in tlQueue as q
  q.reserve( queueSize );

writer task:
------------
for each calculation to be done
  tlQueue[currentThreadIdx].push_back( Foo(calculation) );
WriteBarrier();

reader task:
------------
ReadBarrier();
for each entry in tlQueue as q
  for each entry in q as foo
    do something with foo
  q.clear();
Quote:Original post by bobofjoe
I dunno - this is what I use for my inter-thread implementations. This is a lock-free multi-writer single-reader linked-list based queue.

*** Source Snippet Removed ***

I've tested this on a 4 core machine, with 8 threads writing and 1 thread reading, without any sort of issue - but your mileage may vary.


I don't see how this can work right if there's a queue with one element, and one thread does a push() while another does a pop().

1. Popper reads begin->next->next (which is "end"), and gets pre-empted.
2. Pusher inserts a new element, so begin->next->next is now the new element.
3. Popper sets begin->next to "end," thus losing the recently inserted element.

When you tested with "no issues," did you actually keep track of the number of nodes added, the number of nodes removed, and compare the difference to the number of nodes in the list, after running for a while? I would expect there to be missing (leaked) nodes.
enum Bool { True, False, FileNotFound };
Okay, I've tried to take all the errors (both specific and in the overall design) pointed out here and rewrite the code, except that I unfortunately haven't had time to address the cache-line sharing problem mentioned by Hodgman. I realize it would probably be better to find a pre-implemented queue with the properties I want, or to follow one of the online tutorials to make one, but now that I'm learning about the problems in designing one it's too interesting not to try and complete. I have been reading articles and information about the concept and related ones, however, as you all suggested.

The new code is in my first post, replacing the old code. I went ahead and fixed the problem whereby the code was relying on assignments to a pointer being atomic; now pop() NEVER accesses last, even to read (likewise, push() never accesses first).

Instead, I've changed the way QNode deals with statuses. One member holds whether the node is safe, already read, or whether it and the next node are both already read (as in the old code). There is also a second member, tail, that initializes to 1, indicating that the node is the tail, and is changed to 0 by push() when another node is pushed after it. This is the value pop() uses to determine whether a node is the tail, instead of comparing that node's address with last.

Because tail is always either 0 or 1 (it's a char, not a bool, so I ensure that numerical representation myself), only one bit ever changes. So if push() is interrupted by pop() when the value is only half-written (if for some reason the machine isn't using aligned chars), pop() will still read a valid value (the current value, if the 1 has already been written, or the previous value, if not). The only side effect is that pop() will in some cases think it's reached the end of the list when there's actually one more node in the process of being added, which is unimportant.
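For what it's worth, the publish step described here maps onto a release store in C++11 atomics; a sketch under that assumption (the names are illustrative, and std::atomic<char> stands in for the hand-rolled volatile char):

```cpp
#include <atomic>

// Sketch of the publish step described above: the new node is fully
// linked first, and only then is the previous node's tail flag cleared
// with a release store, so a pop() thread that reads tail == 0 (with
// acquire) is guaranteed to see the fully-linked next node.
struct Node
{
	int data = 0;
	Node* next = nullptr;
	std::atomic<char> tail{1}; // 1 while this node is the tail
};

void link_and_publish(Node* last, Node* fresh)
{
	last->next = fresh;                             // 1. link the new node
	last->tail.store(0, std::memory_order_release); // 2. publish; must run last
}

bool is_tail(const Node* n)
{
	return n->tail.load(std::memory_order_acquire) != 0;
}
```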

Again, I'm still very appreciative of comments, suggestions, and errors being pointed out.

This topic is closed to new replies.
