Here's an implementation of a lock-free multi-reader/multi-writer array-based queue

21 comments, last by Prune 15 years, 5 months ago
Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue

And can you go back and edit your initial post so that it doesn't stretch the page out so widely? [smile] The source boxes shouldn't stretch out like that, but they are. Try breaking up the "TODO" comments into multiple lines and placing some of the trailing comments on their own line. That might resolve the stretching.
"I thought what I'd do was, I'd pretend I was one of those deaf-mutes." - the Laughing Man
Quote:Original post by LessBread
Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue

If you look at page 4, where he references fully non-blocking queues, he refers to Michael and Scott's algorithm, which I also referred to above when I mentioned linked-list-based queues; that algorithm is also discussed in one of the verification papers I mentioned.
For both the array and linked-list implementations, Sutter suggests a DCAS is required, but he is in fact mistaken--it's not necessary. See the single-word CAS paper I referenced earlier (I'm going to forward that reference to Sutter). There is a subtlety here: though this last example is an array-based queue, it does use a linked list for the LL/SC emulation. The efficiency impact is small--indeed, this approach is actually faster than DCAS on systems with more than a few processors.
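For concreteness, this is the general shape of a single-word CAS commit--a minimal modern-C++ sketch of the read/compute/compare-exchange retry loop that these algorithms reduce to, not the paper's algorithm itself (the names are mine):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Single-word CAS retry loop: read, compute the new value, try to publish it,
// and retry if another thread committed first. Lock-free queue algorithms
// boil their commit step down to one of these on a single machine word.
inline uint32_t fetch_increment_mod(std::atomic<uint32_t>& word, uint32_t limit)
{
    uint32_t old = word.load(std::memory_order_relaxed);
    uint32_t next;
    do {
        next = (old + 1 == limit) ? 0 : old + 1; // wrap at limit
        // compare_exchange_weak reloads 'old' on failure, so each retry
        // recomputes 'next' against the latest published value.
    } while (!word.compare_exchange_weak(old, next,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed));
    return old; // the value we successfully replaced
}
```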

Quote:And can you go back and edit your initial post so that it doesn't stretch the page out so widely? [smile] The source boxes shouldn't stretch out like that, but they are. Try breaking up the "TODO" comments into multiple lines and placing some of the trailing comments on their own line. That might resolve the stretching.

Done.

[Edited by - Prune on November 17, 2008 3:43:20 PM]
"But who prays for Satan? Who, in eighteen centuries, has had the common humanity to pray for the one sinner that needed it most?" --Mark Twain

~~~~~~~~~~~~~~~Looking for a high-performance, easy to use, and lightweight math library? http://www.cmldev.net/ (note: I'm not associated with that project; just a user)
Cool. Hopefully Sutter will respond.

Thanks for the formatting clean up (there's more to be done but of secondary importance).

"I thought what I'd do was, I'd pretend I was one of those deaf-mutes." - the Laughing Man
Quote:Original post by LessBread
Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue
I can't speak for Prune's implementation, but Sutter teaches us that locality is very important for concurrency. Therefore I was confused by his choice of a linked list over an array when implementing the queue in his article - wouldn't an array be much more cache friendly?
I posted it on Sutter's blog.

Interestingly, Intel's compiler doesn't seem to have an intrinsic for the double quad-word CAS cmpxchg16b, even though MSVC does (_InterlockedCompareExchange128), though MSDN warns that it's slow. Perhaps this omission is an indication that Intel might drop the instruction from future x86-64 CPUs (there have been rumors going back to 2004) :(
Itanium only has cmp8xchg16 (_InterlockedCompare64Exchange128), which swaps two quad-words but compares only one, so it's basically useless for this algorithm; on IA64 the Evequoz algorithm would need to be used instead. Given the slowness consideration about cmpxchg16b, that algorithm might actually be comparable in speed on x86-64 as well.

Bleh, linking to other stuff that uses the intrinsics, such as SDL, gives

"error C2733: second C linkage of overloaded function '_InterlockedCompareExchange' not allowed"

So I had to change the arguments of the interlocked intrinsics in processor.h back to long from unsigned long.

[Edited by - Prune on November 17, 2008 6:45:09 PM]
"But who prays for Satan? Who, in eighteen centuries, has had the common humanity to pray for the one sinner that needed it most?" --Mark Twain

~~~~~~~~~~~~~~~Looking for a high-performance, easy to use, and lightweight math library? http://www.cmldev.net/ (note: I'm not associated with that project; just a user)
Going back to your sizeof(T)==4 restriction - AFAIK this isn't required.

The only variables that need to be atomic (i.e. need to use interlocked exchange) are the head/tail variables - the actual data array shouldn't require any special protection. See Sutter's words on "do work, then publish": the actual data array is the work, and head/tail are used to publish.
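As a minimal illustration of "do work, then publish" (a hedged sketch using std::atomic; the names are illustrative, not taken from my FIFO):

```cpp
#include <atomic>
#include <cassert>

// "Do work, then publish": the payload is written with plain stores, then a
// release store on the flag makes it visible; readers acquire-load the flag
// before touching the payload. The acquire/release pair orders the accesses.
struct Slot {
    int data = 0;                   // the "work"
    std::atomic<bool> ready{false}; // the "publish" flag
};

inline void publish(Slot& s, int value)
{
    s.data = value;                                 // do work
    s.ready.store(true, std::memory_order_release); // then publish
}

inline bool try_consume(Slot& s, int& out)
{
    if (!s.ready.load(std::memory_order_acquire))   // not published yet
        return false;
    out = s.data; // safe: acquire pairs with the writer's release
    return true;
}
```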

My FIFO uses a special Index class for the head/tail vars. This class uses a 32-bit compare-and-swap and allows the head/tail vars to be incremented while making sure that they cannot increment past each other (turning a full queue into an empty queue by accident). There's also some hackery to avoid the ABA problem. Locking is also provided to allow the array to be resized. 16 bits are given to the index, 15 to "solving" ABA, and 1 bit acts as a mutex.

N.B. The TAtomic class wraps up the CAS operation with the function SetIfEqual(new,old).
//copyright 2008 Hodgman - Free for personal or educational use only ;)
enum IncrementType
{
    Pre,
    Post
};
/** @brief Integer type that maintains a counter of modifications made.
 * The Counted Index is designed to avoid the "ABA problem" in lock-free code.
 * It can also act as a fast user-space mutex.
 * @todo finish documenting this class
 */
class CCountedIndex
{
public:
    CCountedIndex(uint32 v=0) : i(v) {}
    CCountedIndex(const CCountedIndex& o) : i(o.i) {}
    /** Tries to lock the mutex bit
     * @return true if the mutex was locked, else false */
    bool Lock()
    {
        uint v = i;
        if( v & LockMask )
            return false;
        return i.SetIfEqual( v | LockMask, v );   //set lock
    }
    /** Unlocks the mutex bit. The mutex is assumed to be locked prior to calling. */
    void Unlock()
    {
        uint v = i;
        ASSERT( v & LockMask );
        i = v & ~LockMask;                        //remove lock
    }
    /** Assigns a new value to the index bits. No thread-safety checks! */
    void Set( uint32 index )
    {
        uint v = i;
        ASSERT( v & LockMask );
        uint n = ((v+(1<<IndexBits))&CountMask) | //increment counter
                 ((index)&IndexMask)            | //set index
                 LockMask;                        //keep lock
        i = n;
    }
    /** Increment the index bits. May fail due to concurrent modifications.
     * @param limit One past the maximum index value. Index will wrap to 0 if limit is reached.
     * @return true if the index was incremented, else false */
    bool Increment( uint32 limit )
    {
        uint v = i;
        if( v & LockMask )
            return false;
        uint n = ((v+(1<<IndexBits))&CountMask) | //increment counter
                 Inc(v&IndexMask,limit);          //increment index
        return i.SetIfEqual( n, v );
    }
    /** . */
    template<IncrementType T, bool L>
    uint32 Increment( uint32 limit, uint32 full, bool& fail )
    {
        uint32 v = i;
        ASSERT( !L || (v & LockMask) );
        if( !L && (v & LockMask) )
        {
            return std::numeric_limits<uint32>::max();
        }
        uint oldIndex = v&IndexMask;
        uint newIndex = Inc(oldIndex, limit);     //increment index
        fail = (newIndex == full);
        if( fail )
            return std::numeric_limits<uint32>::max();
        uint n = newIndex |
                 ((v+(1<<IndexBits))&CountMask) | //increment counter
                 (L?LockMask:0);
        return i.SetIfEqual( n, v )
            ? (T==Pre?oldIndex:newIndex)
            : std::numeric_limits<uint32>::max();
    }
    /** . */
    template<IncrementType T, bool L>
    uint32 Increment( uint32 limit, const CCountedIndex& original )
    {
        uint32 v = i;
        ASSERT( !L || (v & LockMask) );
        if( v != original.i || (!L&&(v & LockMask)) )
            return std::numeric_limits<uint32>::max();
        uint oldIndex = v&IndexMask;
        uint newIndex = Inc(oldIndex, limit);     //increment index
        uint n = newIndex |
                 ((v+(1<<IndexBits))&CountMask) | //increment counter
                 (L?LockMask:0);
        return i.SetIfEqual( n, v )
            ? (T==Pre?oldIndex:newIndex)
            : std::numeric_limits<uint32>::max();
    }
    /** . */
    void IncrementAndUnlock( uint32 limit )
    {
        uint v = i;
        ASSERT( v & LockMask );
        uint n = ((v+(1<<IndexBits))&CountMask) | //increment counter
                 Inc(v&IndexMask,limit);          //increment index
        i = n;
    }
    /** . */
    bool IncrementAndLock( uint32 limit )
    {
        uint v = i;
        if( v & LockMask )
            return false;
        uint n = ((v+(1<<IndexBits))&CountMask) | //increment counter
                 Inc(v&IndexMask,limit)         | //increment index
                 LockMask;                        //set lock
        return i.SetIfEqual( n, v );
    }
    /** . */
    operator uint32() { return i&IndexMask; }
    /** . */
    bool operator==( uint32 j ) { return (uint32(i)&IndexMask) == j; }
private:
    CCountedIndex& operator=(const CCountedIndex& o);
    inline uint32 Inc(uint32 i, uint32 size) { ASSERT(i<IndexMask); return (i+1==size)?0:i+1; }
    TAtomic<uint32> i;
    const static uint IndexMask = 0x0000FFFF;
    const static uint CountMask = 0x7FFF0000;
    const static uint LockMask  = 0x80000000;
    const static uint IndexBits = 16;
    const static uint CountBits = 15;
    const static uint LockBits  = 1;
};
Quote:Original post by Hodgman
I can't speak for Prune's implementation, but Sutter teaches us that locality is very important for concurrency. Therefore I was confused by his choice of a linked-list over an array when implementing the queue in his article - wouldn't an array be much more cache friendly?


Well if you have two threads writing to the same cache line often (or simply one reading and one writing), that's very bad for concurrency. By putting the elements in a list, you reduce the chance of this happening. See ~1:18:40 in http://video.google.com/videoplay?docid=-4714369049736584770.
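For arrays, a common mitigation is to pad each element out to its own cache line. A hedged sketch, assuming 64-byte lines (C++17 offers std::hardware_destructive_interference_size; the names here are illustrative):

```cpp
#include <cassert>
#include <cstddef>

// Pad queue elements to a cache line so that neighbouring slots, written by
// different threads, never share a line (avoiding false sharing). 64 bytes is
// an assumption about the target; adjust for the actual hardware.
constexpr std::size_t kCacheLine = 64;

template <typename T>
struct alignas(kCacheLine) PaddedSlot {
    T value;
    // alignas rounds sizeof(PaddedSlot) up to a multiple of kCacheLine, so
    // consecutive elements of a PaddedSlot<T> array start on distinct lines.
};

static_assert(sizeof(PaddedSlot<int>) % kCacheLine == 0, "slot spans whole lines");
static_assert(alignof(PaddedSlot<int>) == kCacheLine, "slot is line-aligned");
```

The trade-off is obvious: the padding burns memory and reduces how many elements fit in cache, which is why striding only pays off when contention on neighbouring slots is the real bottleneck.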
Hodgman, I don't see how you'd handle the following situation:
You have a queue as follows: 00X0000 (X represents one element, 0 empty).
Counting from 0, head is at 2 and tail at 3.
Three threads initiate writes. The tail is incremented atomically three times to 6; the threads publish 3, 4, and 5 respectively for work. Then they are simultaneously writing into those locations. A reader thread can read locations 3-5 before their writes have completed, which is incorrect behavior.

You could do something with two heads and two tails, but then the code complexity becomes equivalent to the modified Shann algorithm I used.

Note also that there are several formally proved properties about that algorithm's correctness, meeting invariants of the definition of a queue, linearizability, and being fully non-blocking, which is what decided the issue for me. Finally, a 32-bit CAS can be used if you instead consider one 16 bit word to be the reference counter and the other, instead of a pointer or whatever, to be a 16 bit index into an array (or some other subdivision of the 32 bits).
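A rough sketch of that packing (illustrative names and field widths, not the modified Shann code itself): the counter and index share one 32-bit word, so a single CAS updates both atomically.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// 16-bit ABA counter in the upper half, 16-bit array index in the lower half.
// One 32-bit CAS commits both: a stale index accompanied by a fresh counter
// no longer compares equal, which is what defeats the ABA problem.
constexpr uint32_t kIndexMask = 0x0000FFFFu;
constexpr uint32_t kCountStep = 0x00010000u; // +1 in the upper 16 bits

inline bool advance(std::atomic<uint32_t>& packed, uint32_t limit)
{
    uint32_t v = packed.load(std::memory_order_relaxed);
    uint32_t idx = v & kIndexMask;
    uint32_t next = (idx + 1 == limit) ? 0 : idx + 1;     // wrap the index
    uint32_t n = ((v + kCountStep) & ~kIndexMask) | next; // counter+1, new index
    // Fails (returns false) if another thread changed either field meanwhile.
    return packed.compare_exchange_strong(v, n, std::memory_order_acq_rel);
}

inline uint32_t index_of(uint32_t packed) { return packed & kIndexMask; }
```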

the_edd, one could use a strided "array" instead with elements spaced at least one cache line apart :P

BTW I'm really having a problem with the signed/unsigned thing. How do I avoid having to put reinterpret_cast for the first argument of every _InterlockedExchange??() ? (plus, would the standard conversion preserve bit patterns of the other, non-pointer, arguments? I think this would be the case on 2's complement architectures)
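As a spot check on the bit-pattern half of the question (hedged: unsigned-to-signed conversion of out-of-range values is implementation-defined before C++20, but all mainstream x86/x64 compilers wrap modulo 2^N):

```cpp
#include <cassert>
#include <cstring>

// Round-trip a value through the signed type and compare the raw bytes.
// On two's-complement targets, unsigned long -> long -> unsigned long
// preserves every bit pattern, so passing values through the signed
// intrinsic signatures does not corrupt them. A spot check, not a proof.
inline bool roundtrip_preserves(unsigned long u)
{
    long s = static_cast<long>(u); // implementation-defined pre-C++20 if u > LONG_MAX
    unsigned long back = static_cast<unsigned long>(s); // well-defined, modulo 2^N
    return std::memcmp(&u, &back, sizeof u) == 0;
}
```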
"But who prays for Satan? Who, in eighteen centuries, has had the common humanity to pray for the one sinner that needed it most?" --Mark Twain

~~~~~~~~~~~~~~~Looking for a high-performance, easy to use, and lightweight math library? http://www.cmldev.net/ (note: I'm not associated with that project; just a user)
Quote:Original post by Prune
Hodgman, I don't see how you'd handle the following situation:
You have a queue as follows: 00X0000 (X represents one element, 0 empty).
Counting from 0, head is at 2 and tail at 3.
Three threads initiate writes. The tail is incremented atomically three times to 6; the threads publish 3, 4, and 5 correspondingly for work. Then they are simultaneously writing into those locations. A reader thread can read locations 3-5 before their writes have completed, which is incorrect behavior.
Sorry, I forgot to mention that my array is a structure like this:
struct Node
{
    T data;
    CAtomic valid;
    Node() : valid(0) {}
    Node(const Node& o) : data(o.data), valid(o.valid) {}
};
TAtomic<Node*> array;
CCountedIndex  head, tail;
The 'valid' variable within the array is used as a flag to indicate whether the write has completed yet or not.
If a pop operation occurs on a node that is not yet valid, then it has to wait for the writer to complete its operation (and set the flag).
Push looks something like:
uint32 limit = 7; //size of array
uint32 full = head;
bool isFull;
uint32 write = tail.Increment<Pre,false>( limit, full, isFull );
if( write != std::numeric_limits<uint32>::max() ) //increment succeeded
{
    array[write].data = v;  //do work
    array[write].valid = 1; //then publish
    break; //this snippet is inside a while(true) loop
}
else if( isFull ) //increment failed because array is full
{
    //try to lock head+tail and resize array
}
But this is in essence a spin-lock, so your algorithm cannot be considered lock-free. Can you give guarantees that lock convoys, priority inversion, livelock/deadlock, etc. can never occur?
"But who prays for Satan? Who, in eighteen centuries, has had the common humanity to pray for the one sinner that needed it most?" --Mark Twain

~~~~~~~~~~~~~~~Looking for a high-performance, easy to use, and lightweight math library? http://www.cmldev.net/ (note: I'm not associated with that project; just a user)

