Critique my threadsafe lockless queue!

I'm practically a complete noob at multithreaded programming, but I'm trying to learn. So I attempted to build a threadsafe queue. It's intended to support multiple writers and a single reader. I want it to work on x86 platforms, and it would be even better if it worked on PPC (Xbox 360 and whatnot) even with all that CPU's crazy instruction reordering (one particular spot that worries me is whether the middle line inside Push() can get reordered AFTER the final increment of m_writer). So, what is wrong with it? What haven't I accounted for? How can I make it better/faster/cheaper?
template <typename T, int MAX_SIZE = 1024>
class MyQueue
{
	T m_stuff[MAX_SIZE];

	// index of the next item that will be popped
	LONG m_reader;

	// index of the most recent completed write
	LONG m_writer;

	// index of the next available spot for writing
	LONG m_nextWrite;

public:

	MyQueue()
	{
		m_reader = 0;
		m_writer = 0;
		m_nextWrite = 0;
	}

	// attempt a push. Returns true if successful, or false if the queue is full.
	bool Push(T t)
	{
		// make sure we don't overwrite old values
		if (GetCount() < MAX_SIZE - 1)
		{
			// reserve a spot for writing
			LONG spotToWrite = InterlockedIncrement(&m_nextWrite) % MAX_SIZE;

			// do the write
			m_stuff[spotToWrite] = t;

			// update the "write" cursor
			InterlockedIncrement(&m_writer);

			return true;
		}

		return false;
	}

	// pop a value and return true, or return false if nothing left to pop
	// (this only supports single readers)
	bool Pop(T& t)
	{
		if (GetCount() > 0)
		{
			t = m_stuff[m_reader % MAX_SIZE];
			m_reader++;
			return true;
		}

		return false;
	}

	int GetCount() { return m_writer - m_reader; }

private:

	// CAS loop that atomically increments *target and returns the value it
	// had *before* the increment (unlike the Win32 InterlockedIncrement,
	// which returns the new value; this shadows that name inside the class)
	static LONG InterlockedIncrement(volatile LONG* target)
	{
		LONG start, newValue;
		do
		{
			start = *target;
			newValue = start + 1;
		} while (InterlockedCompareExchange(target, newValue, start) != start);

		return start;
	}
};
Any particular reason you don't use the built-in Windows InterlockedIncrement function? Also, I don't see anything that guarantees that your values are aligned on a 32-bit boundary, which is necessary for the built-in interlocked functions to work correctly.

Your push function is not safe; suppose a thread is interrupted between the "do the write" step and the "update the write cursor" step. Since you want multiple writers, it is possible that two writers stomp on each other in this function.

Pop() may also give incorrect results if it is interrupted during the GetCount() check. Even though you just want a single reader there's a chance things can go wrong if the timing is just so.


You can find a working, debugged implementation of a lockless queue in the Epoch language SDK. The relevant files are found in Source/Shared/Utility/Threading.


Quote:Original post by ApochPiQ
Any particular reason you don't use the built-in Windows InterlockedIncrement function?

Hmm, I suppose not. The only difference between my function and the Windows function is that I return the final value - 1, but the Windows function returns the final value. I think I could easily code around the difference.
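
For example, something like this should get back the old behavior (just subtracting one from what the Windows function returns; untested):

	// Windows' InterlockedIncrement returns the incremented value, so
	// subtract 1 to recover the pre-increment index my version returned
	LONG spotToWrite = (::InterlockedIncrement(&m_nextWrite) - 1) % MAX_SIZE;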

Quote:Also, I don't see anything that guarantees that your values are aligned on a 32-bit boundary, which is necessary for the built-in interlocked functions to work correctly.

Aren't normal integers always aligned to 32-bit boundaries by default? If not, how would I align them? Using the __declspec(align(#)) stuff?
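
Something like this, maybe? (Just guessing from the __declspec docs, so take the exact spelling as my assumption:)

	// MSVC-specific: force 4-byte alignment of the counters used with the
	// Interlocked* functions (plain LONG members should already get this)
	__declspec(align(4)) LONG m_reader;
	__declspec(align(4)) LONG m_writer;
	__declspec(align(4)) LONG m_nextWrite;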

Quote:Your push function is not safe; suppose a thread is interrupted between the "do the write" step and the "update the write cursor" step. Since you want multiple writers, it is possible that two writers stomp on each other in this function.

Pop() may also give incorrect results if it is interrupted during the GetCount() check. Even though you just want a single reader there's a chance things can go wrong if the timing is just so.

But the code gets the index to write to before it does the write, so each thread should be guaranteed to be writing to a unique index (right?). The InterlockedIncrement() call after the write is just for the benefit of the reader thread, not the writers.

I guess there is a chance that GetCount() could be incorrect, but worst case it should only be less than the actual correct value. So the worst that could happen in Pop() is that it incorrectly thinks the queue is empty. But that would probably only happen when the queue is nearly empty anyway.

Still, now I see a problem in Push(). Several threads could get past the "make sure we don't overwrite old values" check and end up overwriting values before the reader has a chance to read them. (In case the code wasn't clear: the point of that check is to keep the write cursor from getting too far ahead of the read cursor, looping back around, and overwriting values written earlier. It's not meant to stop multiple threads from overwriting each other's data when they all enter Push() at the same time; that's what the "reserve a spot for writing" part is for.)

Quote:You can find a working, debugged implementation of a lockless queue in the Epoch language SDK. The relevant files are found in Source/Shared/Utility/Threading.

Wow, I was hoping I could get away with something really simple. Guess I have some studying to do.

Thanks!
Quote:Original post by BradDaBug

But the code gets the index to write to first before it does the write, so each thread should be guaranteed to be writing to a unique index (right?). The next InterlockedIncrement() call after the write is just for the benefit of the reader thread, not the writers.


Quote:
bool Push(T t)
{
	// make sure we don't overwrite old values
	if (GetCount() < MAX_SIZE - 1)
	{
		// reserve a spot for writing
		LONG spotToWrite = InterlockedIncrement(&m_nextWrite) % MAX_SIZE;

		// do the write
		m_stuff[spotToWrite] = t;

		// update the "write" cursor
		InterlockedIncrement(&m_writer);

Let MAX_SIZE be 3. Let queue be empty.

4 threads call Push().
On a 4-core PC, they all call GetCount() and determine there is enough room (0 elements, MAX_SIZE = 3).

Each thread obtains spotToWrite.
Thread    Spot
   1        0
   2        1
   3        2
   4        0


Threads 1 and 4 now race for spot 0.

After all 4 threads push, the size of the queue is 4, but MAX_SIZE is 3. So the queue contains one element more than it can hold...


One possible flow:
CPU Tick   Thrd1            Thrd2            Thrd3            Thrd4
   1       GetCount()       GetCount()       GetCount()       GetCount()
   2                        InterlockedInc
   3                                                          InterlockedInc
   4                        m_stuff[] = t;   InterlockedInc
   5       InterlockedInc                    m_stuff[] = t;   m_stuff[] = t;
   6       m_stuff[] = t;


Always remember that without locks, you have no order guarantees.

And this is just the 50,000-foot view. It doesn't take memory ordering issues and whatnot into consideration....
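
To illustrate the ordering part with the original Push() (a sketch, assuming the Win32 MemoryBarrier() macro; it does nothing about the races above): on x86 the Interlocked* functions already act as full fences, but on PPC the store into m_stuff really can become visible after the m_writer increment unless you fence in between.

	// do the write
	m_stuff[spotToWrite] = t;

	// force the store above to be visible before the cursor moves
	// (full fence; on Xbox 360 you'd reach for __lwsync or similar)
	MemoryBarrier();

	// update the "write" cursor
	InterlockedIncrement(&m_writer);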
I've been trying to tweak stuff to fix that bug in Push(), and the solution I'm looking at right now involves adding an extra CAS in Push() and another in Pop(). At what point does the overhead of all those CAS operations add up enough that there's no advantage over a simple mutex?
Here's my second attempt. Ignore what I said earlier about the extra CAS functions. This just avoids the overwriting old values bug in Push() by checking if the queue is full AFTER the m_nextWrite increment. If the queue is full then it un-increments m_nextWrite and bails out.

template <typename T, int MAX_SIZE = 1024>
class MyQueue
{
	T m_stuff[MAX_SIZE];

	// index of the next item that will be popped
	LONG m_reader;

	// index of the most recent completed write
	LONG m_writer;

	// index of the next available spot for writing
	LONG m_nextWrite;

public:

	MyQueue()
	{
		m_reader = 0;
		m_writer = 0;
		m_nextWrite = 0;
	}

	// attempt a push. Returns true if successful, or false if the queue is full.
	bool Push(T t)
	{
		// reserve a spot for writing
		LONG spotToWrite = InterlockedIncrement(&m_nextWrite) - 1;

		// check if the queue is full
		// (it's possible for m_reader to be bigger IRL than the value we get
		// here, but worst case is the queue will appear full when it isn't)
		if (spotToWrite - m_reader >= MAX_SIZE - 1)
		{
			// queue is full, so back out
			InterlockedDecrement(&m_nextWrite);
			return false;
		}

		// do the write
		m_stuff[spotToWrite % MAX_SIZE] = t;

		// update the "write" cursor
		InterlockedIncrement(&m_writer);

		return true;
	}

	// pop a value and return true, or return false if nothing left to pop
	// (this only supports single readers)
	bool Pop(T& t)
	{
		if (GetCount() > 0)
		{
			t = m_stuff[m_reader % MAX_SIZE];
			m_reader++;
			return true;
		}

		return false;
	}

	int GetCount() { return m_writer - m_reader; }
};

Edit: fixed a tiny bug

Quote:
LONG spotToWrite = InterlockedIncrement(&m_nextWrite) - 1;

if (spotToWrite - m_reader >= MAX_SIZE - 1)
{
	// queue is full, so back out
	InterlockedDecrement(&m_nextWrite);
	return false;
}


Let MAX_SIZE be 3.
Let queue have 2 elements in it.
m_reader = 0
spotToWrite = 2

3 threads on 3 cores push concurrently.
1 thread on separate core reads.

IDX = spotToWrite
Tick     T1                T2               T3               T4
  1      IDX=2             IDX=3            IDX=4
  2      IDX-m_reader=2                                      pop / m_reader++
  3      m_stuff[2] = t    IDX-m_reader=2   IDX-m_reader=3
  4                        m_stuff[2] = t   return

Oops. T2 overwrote the value of T1.

The only way to solve this is to spin on both the push and the pop.

Simply put, instead of increment/decrement, the solution would be to make both push and pop spin on InterlockedExchange. But that is only part of the problem.
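
For the writer side, one way to do that spin (a rough sketch using InterlockedCompareExchange rather than a bare exchange; untested) is to have each writer wait until all earlier reservations are published before advancing m_writer past its own slot:

	// after m_stuff[spotToWrite % MAX_SIZE] = t, publish in reservation
	// order: only the writer whose slot equals m_writer may advance it
	while (InterlockedCompareExchange(&m_writer, spotToWrite + 1, spotToWrite)
	       != spotToWrite)
	{
		// a writer holding an earlier slot hasn't finished yet; spin
	}

This keeps m_writer meaning "everything below this index is complete", at the cost of writers stalling behind a slow or pre-empted earlier writer.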


A more annoying problem is how to handle the case when the queue is full. If push keeps spinning, it has the potential of stalling, even deadlocking, all threads that attempt to push until an element is popped.

Typically, with fixed-size queues, one needs to make operations optional. Pseudo-code for the writer would look something like this (buffer is a deque here so pop_front() exists):

void someMainLoop()
{
	deque<int> buffer;        // thread-private staging area
	locklessqueue<int> q;

	while (true)
	{
		// the logic, ....

		// drain the private buffer into the queue while there's room
		while (!buffer.empty() && q.push(buffer.front()))
			buffer.pop_front();

		...

		// stuff to put on queue
		buffer.push_back(...);
	}
}


Why all the mess? The queue might be full. We can either wait until there's room, or we simply keep going, hoping it will be available later.

So we keep the stuff destined for the queue privately, in a regular container. Every once in a while, we try to put as much of that data as possible on the queue. If the queue is full, we hold on to the data for a while longer.


Many lock-free/wait-free/lock-less queues use a dynamic structure for this very reason. Limiting the size is quite tricky from a design perspective, unless one allows push() to fail. Of course, dynamic memory allocation has its own set of problems, and typically requires a thread-safe lock-less allocator, or incredibly complex hazard pointers.
(no offense ;))
Unfortunately, it can't be lockless when Interlocked(Inc|Dec)rement() is, in MSDN's words, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.
Quote:Original post by phresnel
(no offense ;))
Unfortunately, it can't be lockless when Interlocked(Inc|Dec)rement() is, in MSDN's words, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.


Tend to disagree.
Following this reasoning would mean that a lockless algorithm cannot exist, by definition.

A lockless algorithm has to use atomic operations; otherwise it would have to use mutexes, critical sections, spin-locks, or something similar, making it non-lockless eventually :).
Quote:Original post by tivolo
Quote:Original post by phresnel
(no offense ;))
Unfortunately, it can't be lockless when Interlocked(Inc|Dec)rement() is, in MSDN's words, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.

Tend to disagree.
Following this reasoning would mean that a lockless algorithm cannot exist, by definition.

A lockless algorithm has to use atomic operations; otherwise it would have to use mutexes, critical sections, spin-locks, or something similar, making it non-lockless eventually :).


Both of you are correct, in my opinion. Captain Obvious would disagree, but lockless (or lock-free, more correctly) does not actually mean that no locks are used, or that no atomic operations are used. It just means progress will always be made on the algorithm regardless of the state of any of the threads working on the problem.

Technically, you do have a lock if you do an atomic operation. Heck, the x86 instruction prefix for this is even called "lock". But that doesn't preclude it from being used in a lock-free algorithm, because the guarantees of this particular type of lock are such that progress will always be made, and hence the algorithm is lock-free. It's actually wait-free in this case, which is a stronger guarantee than lock-free. Technically, in a lock-free algorithm that's not wait-free, you're welcome to block as many threads as you want indefinitely, using any synchronization primitive you can think of; it just has to be such that the other threads can pick up the work.
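
To make the distinction concrete, compare the two increments that already appeared in this thread (a sketch; counter is just a stand-in variable):

	volatile LONG counter = 0;

	// wait-free: a single atomic instruction (lock xadd on x86); every
	// thread finishes its increment in a bounded number of steps
	LONG a = ::InterlockedIncrement(&counter);

	// lock-free but not wait-free: a thread can lose the CAS race over
	// and over, but each time it loses, some other thread has succeeded
	LONG start;
	do
	{
		start = counter;
	} while (InterlockedCompareExchange(&counter, start + 1, start) != start);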
