BradDaBug

Critique my threadsafe lockless queue!


I'm practically a complete noob at multithreaded programming, but I'm trying to learn. So I attempted to build a threadsafe queue. It's intended to support multiple writers and a single reader. I want it to work on x86 platforms, and it would be even better if it worked on PPC (Xbox 360 and whatnot), even with all that CPU's crazy instruction reordering. One particular spot that worries me is whether the middle line inside Push() (the write into m_stuff) can get reordered AFTER the final increment of m_writer. So, what is wrong with it? What haven't I accounted for? How can I make it better/faster/cheaper?
template <typename T, int MAX_SIZE = 1024>
class MyQueue
{
	T m_stuff[MAX_SIZE];

	// index of the next item that will be popped
	LONG m_reader;

	// index of the most recent completed write
	LONG m_writer;

	// index of the next available spot for writing
	LONG m_nextWrite;

public:

	MyQueue()
	{
		m_reader = 0;
		m_writer = 0;
		m_nextWrite = 0;
	}

	// attempt a push. Returns true if successful, or false if the queue is full.
	bool Push(T t)
	{
		// make sure we don't overwrite old values
		if (GetCount() < MAX_SIZE - 1)
		{
			// reserve a spot for writing
			LONG spotToWrite = InterlockedIncrement(&m_nextWrite) % MAX_SIZE;

			// do the write
			m_stuff[spotToWrite] = t;

			// update the "write" cursor
			InterlockedIncrement(&m_writer);

			return true;
		}

		return false;
	}

	// pop a value and return true, or return false if nothing left to pop
	// (this only supports single readers)
	bool Pop(T& t)
	{
		if (GetCount() > 0)
		{
			t = m_stuff[m_reader % MAX_SIZE];
			m_reader++;
			return true;
		}

		return false;
	}

	int GetCount() { return m_writer - m_reader; }

private:

	static LONG InterlockedIncrement(volatile LONG* target)
	{
		LONG start, newValue;
		do
		{
			start = *target;
			newValue = start + 1;
		} while (InterlockedCompareExchange(target, newValue, start) != start);

		return start;
	}
};

Any particular reason you don't use the built-in Windows InterlockedIncrement function? Also, I don't see anything that guarantees that your values are aligned on a 32-bit boundary, which is necessary for the built-in interlocked functions to work correctly.

Your push function is not safe; suppose a thread is interrupted between the "do the write" step and the "update the write cursor" step. Since you want multiple writers, it is possible that two writers stomp on each other in this function.

Pop() may also give incorrect results if it is interrupted during the GetCount() check. Even though you just want a single reader there's a chance things can go wrong if the timing is just so.


You can find a working, debugged implementation of a lockless queue in the Epoch language SDK. The relevant files are found in Source/Shared/Utility/Threading.

Quote:
Original post by ApochPiQ
Any particular reason you don't use the built-in Windows InterlockedIncrement function?

Hmm, I suppose not. The only difference between my function and the Windows function is that I return the final value - 1, but the Windows function returns the final value. I think I could easily code around the difference.
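The return-value difference described here can be sketched roughly as follows. This uses std::atomic from C++11 as a portable stand-in for the Win32 calls (an assumption, since the code in this thread targets the Windows API), and the names incrementReturningOld and incrementReturningNew are made up for illustration.

```cpp
#include <atomic>

// Hypothetical helper mirroring the hand-rolled CAS loop from the first post:
// it returns the value the counter held BEFORE the increment.
long incrementReturningOld(std::atomic<long>& target) {
    long start = target.load();
    // On failure compare_exchange_weak reloads 'start', so the loop just retries.
    while (!target.compare_exchange_weak(start, start + 1)) {
    }
    return start;
}

// Hypothetical helper following the convention described for the Windows
// function: it returns the value AFTER the increment.
long incrementReturningNew(std::atomic<long>& target) {
    return target.fetch_add(1) + 1;  // fetch_add itself returns the old value
}
```

Coding around the difference then amounts to subtracting 1 from the "new value" result wherever the old value is wanted.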

Quote:
Also, I don't see anything that guarantees that your values are aligned on a 32-bit boundary, which is necessary for the built-in interlocked functions to work correctly.

Aren't normal integers always aligned to 32-bit boundaries by default? If not, how would I align them? Using the __declspec(align(#)) stuff?
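On the alignment question, here is a minimal sketch of how the guarantee could be spelled out and checked rather than assumed. It uses standard C++11 alignas and static_assert; __declspec(align(4)) is the MSVC-specific spelling of the same request. The struct name AlignedCursors and the helper isAligned32 are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical struct holding the three cursors with explicit 4-byte alignment.
struct AlignedCursors {
    alignas(4) volatile std::int32_t reader;
    alignas(4) volatile std::int32_t writer;
    alignas(4) volatile std::int32_t nextWrite;
};

// Compile-time check: a violation fails the build instead of misbehaving later.
static_assert(alignof(AlignedCursors) >= 4, "cursors must be 32-bit aligned");

// Run-time check for an arbitrary address.
bool isAligned32(const volatile void* p) {
    return reinterpret_cast<std::uintptr_t>(p) % 4 == 0;
}
```

Ordinary integer members normally get their natural alignment from the compiler anyway; packing pragmas or placing the object inside a raw byte buffer are the usual ways to lose it.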

Quote:
Your push function is not safe; suppose a thread is interrupted between the "do the write" step and the "update the write cursor" step. Since you want multiple writers, it is possible that two writers stomp on each other in this function.

Pop() may also give incorrect results if it is interrupted during the GetCount() check. Even though you just want a single reader there's a chance things can go wrong if the timing is just so.

But the code gets the index to write to first before it does the write, so each thread should be guaranteed to be writing to a unique index (right?). The next InterlockedIncrement() call after the write is just for the benefit of the reader thread, not the writers.

I guess there is a chance that GetCount() could be incorrect, but worst case it should only be less than the actual correct value. So the worst that could happen in Pop() is that it incorrectly thinks the queue is empty. But that would probably only happen when the queue is nearly empty anyway.

Still, now I see a problem in Push(). Several threads could get past the "make sure we don't overwrite old values" check and end up overwriting values before the reader has a chance to read them. (I'm not sure the code was clear, but my goal with that "make sure we don't overwrite old values" check was to prevent the write cursor getting too far ahead of the read cursor and loop back around and start overwriting values written earlier, not to prevent multiple threads from overwriting data should they all get into the Push() method at the same time. That's what the "reserve a spot for writing" part is for.)

Quote:
You can find a working, debugged implementation of a lockless queue in the Epoch language SDK. The relevant files are found in Source/Shared/Utility/Threading.

Wow, I was hoping I could get away with something really simple. Guess I have some studying to do.

Thanks!

Quote:
Original post by BradDaBug

But the code gets the index to write to first before it does the write, so each thread should be guaranteed to be writing to a unique index (right?). The next InterlockedIncrement() call after the write is just for the benefit of the reader thread, not the writers.


Quote:
bool Push(T t)
{
	// make sure we don't overwrite old values
	if (GetCount() < MAX_SIZE - 1)
	{
		// reserve a spot for writing
		LONG spotToWrite = InterlockedIncrement(&m_nextWrite) % MAX_SIZE;

		// do the write
		m_stuff[spotToWrite] = t;

		// update the "write" cursor
		InterlockedIncrement(&m_writer);

Let MAX_SIZE be 3 and let the queue be empty.

4 threads call Push().
On a 4-core PC, they all call GetCount() and determine there is enough room (0 elements, MAX_SIZE = 3).

Each thread obtains spotToWrite:

Thread  Spot
1       0
2       1
3       2
4       0


Threads 1 and 4 now race for spot 0.

After all 4 threads push, the size of the queue is 4, but MAX_SIZE is 3. So the queue contains more elements than it can hold...
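The scenario above can be replayed deterministically in a single thread. This is only a simulation sketch: the Cursors struct and the function names are hypothetical, and the "threads" are loop iterations executed in the order described (everyone checks first, then everyone reserves).

```cpp
#include <vector>

const long kMaxSize = 3;

// Hypothetical plain-data copy of the queue's three cursors.
struct Cursors {
    long reader = 0;     // m_reader
    long writer = 0;     // m_writer
    long nextWrite = 0;  // m_nextWrite
};

// Same test as the first Push(): GetCount() < MAX_SIZE - 1.
bool hasRoom(const Cursors& c) { return (c.writer - c.reader) < kMaxSize - 1; }

// Same result as the poster's helper: bump the counter, use the old value.
long reserveSpot(Cursors& c) { return (c.nextWrite++) % kMaxSize; }

// Replays "every thread passes the check, then every thread reserves a spot".
std::vector<long> replayCheckThenReserve(int threadCount) {
    Cursors c;
    std::vector<bool> passed;
    for (int i = 0; i < threadCount; ++i) {
        passed.push_back(hasRoom(c));  // nobody has written yet, so all pass
    }
    std::vector<long> spots;
    for (int i = 0; i < threadCount; ++i) {
        if (passed[i]) spots.push_back(reserveSpot(c));
    }
    return spots;
}
```

Calling replayCheckThenReserve(4) hands out one spot per thread, and the first and fourth spots coincide because the check and the reservation are two separate steps.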


One possible flow:

CPU Tick Thrd1 Thrd2 Thrd3 Thrd4
1 GetCount() GetCount() GetCount() GetCount()
2 InterlockedInc
3 InterlockedInc
4 m_stuff[] = t; InterlockedInc
5 InterlockedInc m_stuff[] = t; m_stuff[] = t;
6 m_stuff[] = t;


Always remember that without locks, you have no order guarantees.

And this is just the 50,000-foot view. It doesn't take into consideration memory ordering issues and whatnot....
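As a rough illustration of the ordering side of the problem (the worry that the data write could become visible after the cursor update), here is a minimal publish/consume sketch. It uses C++11 std::atomic with release/acquire ordering instead of the Win32 calls from the thread, which is an assumption made for portability; Mailbox, publish and tryConsume are hypothetical names, and the mailbox is deliberately one-shot.

```cpp
#include <atomic>

// Hypothetical one-shot mailbox: one writer publishes once, one reader consumes.
struct Mailbox {
    int payload = 0;
    std::atomic<bool> ready{false};

    // Writer side. The release store guarantees that the payload write above it
    // cannot become visible after the flag does.
    void publish(int value) {
        payload = value;
        ready.store(true, std::memory_order_release);
    }

    // Reader side. The acquire load guarantees that the payload read below it
    // cannot be performed before the flag has been observed.
    bool tryConsume(int& out) {
        if (!ready.load(std::memory_order_acquire)) return false;
        out = payload;
        return true;
    }
};
```

In the queue from the first post, m_stuff[spotToWrite] plays the role of payload and m_writer plays the role of ready; the reader's plain read of m_writer would need the matching acquire side on a weakly ordered CPU.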

I've been trying to tweak stuff to fix that bug in Push(), and the solution I'm looking at right now involves adding an extra CAS function in Push() and an extra one in Pop(). At what point does the overhead of all those CAS functions start to add up to the point where there's no advantage over a simple mutex?

Here's my second attempt. Ignore what I said earlier about the extra CAS functions. This just avoids the overwriting old values bug in Push() by checking if the queue is full AFTER the m_nextWrite increment. If the queue is full then it un-increments m_nextWrite and bails out.

template <typename T, int MAX_SIZE = 1024>
class MyQueue
{
	T m_stuff[MAX_SIZE];

	// index of the next item that will be popped
	LONG m_reader;

	// index of the most recent completed write
	LONG m_writer;

	// index of the next available spot for writing
	LONG m_nextWrite;

public:

	MyQueue()
	{
		m_reader = 0;
		m_writer = 0;
		m_nextWrite = 0;
	}

	// attempt a push. Returns true if successful, or false if the queue is full.
	bool Push(T t)
	{
		// reserve a spot for writing
		LONG spotToWrite = InterlockedIncrement(&m_nextWrite) - 1;

		// check if the queue is full
		// (it's possible for m_reader to be bigger IRL than the value we get
		// here, but worst case is the queue will appear full when it isn't)
		if (spotToWrite - m_reader >= MAX_SIZE - 1)
		{
			// queue is full, so back out
			InterlockedDecrement(&m_nextWrite);
			return false;
		}

		// do the write
		m_stuff[spotToWrite % MAX_SIZE] = t;

		// update the "write" cursor
		InterlockedIncrement(&m_writer);

		return true;
	}

	// pop a value and return true, or return false if nothing left to pop
	// (this only supports single readers)
	bool Pop(T& t)
	{
		if (GetCount() > 0)
		{
			t = m_stuff[m_reader % MAX_SIZE];
			m_reader++;

			return true;
		}

		return false;
	}

	int GetCount() { return m_writer - m_reader; }

};


Edit: fixed a tiny bug

[Edited by - BradDaBug on September 10, 2009 9:05:01 AM]

Quote:
LONG spotToWrite = InterlockedIncrement(&m_nextWrite) - 1;

if (spotToWrite - m_reader >= MAX_SIZE - 1)
{
	// queue is full, so back out
	InterlockedDecrement(&m_nextWrite);
	return false;
}


Let MAX_SIZE be 3.
Let the queue have 2 elements in it.
m_reader = 0
spotToWrite = 2

3 threads on 3 cores push concurrently (T1, T2, T3).
1 thread on a separate core reads (T4).

IDX = spotToWrite

Tick  T1              T2              T3              T4
1     IDX=2           IDX=3           IDX=4
2     IDX-m_reader=2                                  pop/m_reader++
3     mstuff[2] = t   IDX-m_reader=2  IDX-m_reader=3
4                     mstuff[2] = t   return

Oops. T2 overwrote the value of T1.
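A related way to see why backing out with a plain decrement is fragile is to replay one interleaving by hand. The sketch below is a single-threaded simulation under assumed timing, not the analysis from the table above: writer A reserves spot 2 and decides to back out, the reader pops twice in between, writer B proceeds with spot 3, and a later writer D is then handed spot 3 again. All names (BackOutCursors, reserveIndex, replayBackOut) are hypothetical.

```cpp
#include <vector>

const long kQueueSize = 3;  // MAX_SIZE in the thread

// Hypothetical plain-data copy of the cursors that matter for this replay.
struct BackOutCursors {
    long reader = 0;     // m_reader
    long nextWrite = 0;  // m_nextWrite
};

// Mirrors "InterlockedIncrement(&m_nextWrite) - 1" from the second attempt.
long reserveIndex(BackOutCursors& c) { return ++c.nextWrite - 1; }

// Mirrors the "queue is full" test from the second attempt.
bool looksFull(const BackOutCursors& c, long spot) {
    return spot - c.reader >= kQueueSize - 1;
}

// Replays the interleaving and returns the indices that actually get written.
std::vector<long> replayBackOut() {
    BackOutCursors c;
    c.nextWrite = 2;  // two items (indices 0 and 1) are already queued
    std::vector<long> written;

    long a = reserveIndex(c);          // writer A reserves index 2
    long b = reserveIndex(c);          // writer B reserves index 3
    bool aFull = looksFull(c, a);      // 2 - 0 >= 2, so A sees a full queue
    c.reader = 2;                      // the reader pops both queued items
    bool bFull = looksFull(c, b);      // 3 - 2 = 1, so B sees room
    if (aFull) --c.nextWrite;          // A backs out: 4 becomes 3
    if (!bFull) written.push_back(b);  // B writes index 3
    long d = reserveIndex(c);          // writer D is handed index 3 again
    if (!looksFull(c, d)) written.push_back(d);
    return written;
}
```

The decrement gives back "the most recent reservation", not the specific index the failing thread reserved, so index 2 is never written while index 3 is written twice.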

The only way to solve this is to spin in both push and pop.

Simply put, instead of increment/decrement, the solution would be to make both push and pop spin on InterlockedExchange. But that is only part of the problem.
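One way the "spin in both push and pop" idea is often realized is a bounded queue with a sequence number per slot, a design widely known from Dmitry Vyukov's bounded MPMC queue. The sketch below is not the code from this thread: it swaps in std::atomic compare-exchange for the Interlocked calls, requires a power-of-two capacity, and lets push() fail when full, all of which are assumptions made for illustration. The class name BoundedQueue is hypothetical.

```cpp
#include <atomic>
#include <cstddef>

template <typename T, std::size_t N>
class BoundedQueue {
    static_assert(N >= 2 && (N & (N - 1)) == 0, "N must be a power of two");

    struct Cell {
        std::atomic<std::size_t> seq;  // tells whose turn it is for this slot
        T value;
    };

    Cell cells_[N];
    std::atomic<std::size_t> enqueuePos_{0};
    std::atomic<std::size_t> dequeuePos_{0};

public:
    BoundedQueue() {
        for (std::size_t i = 0; i < N; ++i)
            cells_[i].seq.store(i, std::memory_order_relaxed);
    }

    // Returns false instead of blocking when the queue is full.
    bool push(const T& v) {
        std::size_t pos = enqueuePos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& c = cells_[pos & (N - 1)];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) {
                // Slot is free for this position: try to claim it.
                if (enqueuePos_.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed)) {
                    c.value = v;
                    c.seq.store(pos + 1, std::memory_order_release);  // publish
                    return true;
                }
                // CAS failed: pos was reloaded, loop and retry.
            } else if (diff < 0) {
                return false;  // slot still holds an unread item: full
            } else {
                pos = enqueuePos_.load(std::memory_order_relaxed);
            }
        }
    }

    // Returns false instead of blocking when the queue is empty.
    bool pop(T& out) {
        std::size_t pos = dequeuePos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& c = cells_[pos & (N - 1)];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) -
                                  static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0) {
                if (dequeuePos_.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed)) {
                    out = c.value;
                    c.seq.store(pos + N, std::memory_order_release);  // free slot
                    return true;
                }
            } else if (diff < 0) {
                return false;  // nothing published at this position: empty
            } else {
                pos = dequeuePos_.load(std::memory_order_relaxed);
            }
        }
    }
};
```

Each writer only touches the slot it won through the compare-exchange, and the reader only sees a slot after its sequence number has been published, which addresses both races discussed above. Note that a writer stalled between claiming a slot and publishing it still holds up the reader at that slot, so this kind of design is not lock-free in the strictest sense.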


A more annoying problem is how to handle the case when the queue is full. If push keeps spinning, it has the potential to stall, or even deadlock, all threads that attempt to push until an element is popped.

Typically, with fixed-size queues, one needs to make operations optional. Pseudocode for the writer's push loop would look something like this:
void someMainLoop() {
	std::deque<int> buffer; // private backlog; a deque, since we pop from the front
	locklessqueue<int> q;

	while (true) { // the logic, ....
		// move as much of the backlog onto the queue as it will accept right now
		while (!buffer.empty() && q.push(buffer.front())) buffer.pop_front();

		...
		// stuff to put on queue
		buffer.push_back(...);
	}
}


Why all the mess? The queue might be full. We can either wait until it has room, or simply keep going and hope room becomes available later.

So we keep the stuff to put on the queue privately, in a regular container. Every once in a while, we try to put as much data as possible on the queue. If the queue is full, we buffer the data for a while.
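The buffering idea can be tried out in isolation with a toy queue. In this sketch TinyQueue is a deliberately non-threadsafe, hypothetical stand-in for a bounded queue whose push() may fail, and flushBacklog is a made-up helper name; only the pattern itself comes from the post.

```cpp
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a bounded queue whose push() may fail.
// Not threadsafe; it exists only to exercise the backlog pattern.
struct TinyQueue {
    std::size_t capacity = 2;
    std::deque<int> items;

    bool push(int v) {
        if (items.size() >= capacity) return false;  // full: refuse, don't block
        items.push_back(v);
        return true;
    }
};

// Moves as many backlog items as the queue accepts; returns how many moved.
template <typename Queue>
std::size_t flushBacklog(std::deque<int>& backlog, Queue& q) {
    std::size_t moved = 0;
    while (!backlog.empty() && q.push(backlog.front())) {
        backlog.pop_front();
        ++moved;
    }
    return moved;
}
```

The writer calls flushBacklog once per loop iteration and otherwise keeps working; items that did not fit simply stay in the private backlog, in order, until the next attempt.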


Many lock-free/wait-free/lock-less queues use a dynamic structure for this very reason. Limiting the size is quite tricky from a design perspective, unless one allows push() to fail. Of course, dynamic memory allocation has its own set of problems, and typically requires a thread-safe lock-less allocator or incredibly complex hazard pointers.

(no offense ;))
Unfortunately, it can't be lockless when "Interlocked(Inc|Dec)rement()" is, quoting MSDN, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.

Quote:
Original post by phresnel
(no offense ;))
Unfortunately, it can't be lockless when "Interlocked(Inc|Dec)rement()" is, quoting MSDN, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.


Tend to disagree.
Following this reasoning would mean that a lockless algorithm cannot exist, by definition.

A lockless algorithm has to use atomic operations, otherwise it'd have to use mutexes, critical sections, spin-locks, or something similar - making it non-lockless, eventually :).

Quote:
Original post by tivolo
Quote:
Original post by phresnel
(no offense ;))
Unfortunately, it can't be lockless when "Interlocked(Inc|Dec)rement()" is, quoting MSDN, an "atomic operation".

That is, if multiple threads do an atomic operation on shared data, then you have a lock.


Tend to disagree.
Following this reasoning would mean that a lockless algorithm cannot exist, by definition.

A lockless algorithm has to use atomic operations, otherwise it'd have to use mutexes, critical sections, spin-locks, or something similar - making it non-lockless, eventually :).


Both of you are correct, in my opinion. Captain Obvious would disagree, but lockless (or lock-free, more correctly) does not actually mean that no locks are used, or that no atomic operations are used. It just means that progress will always be made on the algorithm regardless of the state of any individual thread working on the problem.

Technically, you do have a lock if you do an atomic operation. Heck, the x86 instruction prefix for this is even called "lock". But this doesn't preclude it from being used in a lock-free algorithm, because the guarantees of this particular type of lock are such that progress will always be made, and hence it is a lock-free algorithm. It's actually wait-free in this case, which is a stronger guarantee than lock-free. Technically, in a lock-free algorithm that's not wait-free, you're more than welcome to block as many threads as you want indefinitely using any synchronization primitive you can think of; it just has to be such that the other threads can pick up the work.
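The distinction can be made a little more concrete with two small operations, sketched here with C++11 std::atomic (an assumption; the thread itself uses the Win32 Interlocked calls). addOne is a single atomic read-modify-write, while atomicMax has no single-instruction form and needs a compare-exchange retry loop. Both function names are hypothetical.

```cpp
#include <atomic>

// One atomic read-modify-write. On x86 this is typically a single locked
// instruction, so every caller finishes in a bounded number of steps.
// On other architectures the compiler may emit a retry loop instead.
int addOne(std::atomic<int>& counter) {
    return counter.fetch_add(1);  // returns the previous value
}

// Raises 'target' to at least 'candidate'. Returns true if this call changed it.
// Lock-free but not wait-free: a given thread may retry an unbounded number of
// times, yet each failed attempt means some other thread's update went through.
bool atomicMax(std::atomic<int>& target, int candidate) {
    int seen = target.load();
    while (seen < candidate) {
        if (target.compare_exchange_weak(seen, candidate)) return true;
        // On failure 'seen' holds the freshly observed value; re-check and retry.
    }
    return false;
}
```

The hand-rolled increment helper in the first post has the same shape as atomicMax, so it falls in the lock-free category rather than the wait-free one.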
