Akhilleus

Lockless FIFO Queue Implementation


Hey! My longer explanation is outdated, so I'll just say this: I'm working on a lockless queue implementation to use for cross-thread notification of events in a server; the code is below. This queue is designed to work only in the case of one reader and one writer -- i.e., one and only one thread calling pop(), and one and only one thread calling push(). I *think* I have the problems that were in it worked out, although for now please ignore:

1. The lack of a memory barrier in the position specified in push(), and a way to handle creation/deletion of nodes other than the default operators -- I'll implement these when I'm sure the logic of the algorithm is right.
2. The possibility of first and last being written to the same cache line and being unreadable at the same time.
3. The current lack of any sort of size() method, and the fact that it only stores pointers right now (I didn't want to junk up the code with error handling for calls to pop() on an empty queue until I got it working at all).
4. The lack of optimization, although of course tips are nice :).

Thanks!
-Akhilleus
template<typename T>
class Queue
{
public:
	Queue() : last(new QNode(0, true, SAFE, 0)), first(new QNode(0, false, SKIP_NEXT, last))
	{}
	void push(T *data)
	{
		/////////////////////////////////////////////////////////////////////////////////////////////////
		//shared variables read:
		//	(N/A)
		//shared variables written:
		//	prev->tail (if read during write, value read is guaranteed to be meaningful and equal to the
		//				correct or the previous value, as only one bit is actually changed on any write,
		//				because tail is only ever 0 or 1)
		//notes:
		//	1.  the pointer last is never read or written by pop()
		//	2.  the object *last is only accessed by pop() when reading its tail and/or data values
		//	3.  the original object *last is released to full access by pop() by the operation
		//		prev->tail = 0, after which it is never accessed by push() again
		/////////////////////////////////////////////////////////////////////////////////////////////////

		last->next = new QNode(data);
		QNode* prev = last;
		last = const_cast<QNode*>(last->next);
		//memory barrier goes here so that this line gets run last
		prev->tail = 0;
	}
	//unnecessary in the current code, as pop() returns a pointer (NULL if the queue is empty)
	//bool pop_safe()
	//{
	//	return(first->status != SKIP_NEXT || first->next->tail != 1);
	//}
	T *pop()
	{
		/////////////////////////////////////////////////////////////////////////////////////////////////
		//shared variables read:
		//	last->tail (always in the form first->next->tail)
		//shared variables written:
		//	(N/A)
		//notes:
		//	1.  the pointer last is never read or written by pop()
		//	2.  the object *last is only accessed by pop() when reading its value status (atomic)
		//	3.  the original object *last is released to full access by pop() by the atomic operation
		//		prev->status = SAFE, after which it is never accessed by push() again
		/////////////////////////////////////////////////////////////////////////////////////////////////
		while(true)
		{
			if(first->next->tail)
			{
				switch(first->status)
				{
					case(SAFE) :
						//return first's data without popping
						first->status = ALREADY_READ;
						return(first->data);
					case(ALREADY_READ) :
						//return last's data without popping
						first->status = SKIP_NEXT;
						return(first->next->data);
					default: //SKIP_NEXT (first->status never equals TAIL)
						//return nothing, as all values have been popped and the queue is empty
						return(0);
				}
			}
			else //next node is not the tail
			{
				T* to_return;
				QNode* prev;
				switch(first->status)
				{
					case(SAFE) :
						//pop first from the queue
						to_return = first->data;
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
						return(to_return);
					case(ALREADY_READ) :
						//move on to the next node
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
						break;
					default: //SKIP_NEXT
						//set next node to ALREADY_READ and move on to it
						first->next->status = ALREADY_READ;
						prev = first;
						first = const_cast<QNode*>(first->next);
						delete prev;
				}
			} //end if
		}
	}
private:
	//classes & constants
	static const char SAFE = 0;
	static const char ALREADY_READ = 1;
	static const char SKIP_NEXT = 2;
	struct QNode
	{
		QNode(T *d = 0, bool is_tail = true, char node_status = SAFE, QNode* next_node = 0)
			: data(d), tail(is_tail ? 1 : 0), status(node_status), next(next_node)
		{}
		T *data;
		volatile char tail;
		char status;
		QNode volatile *next;
	};
	//local data
	QNode *last;	//WARNING: do not switch declaration order with first (see init. list)
	QNode *first;
};
template<typename T> const char Queue<T>::SAFE;
template<typename T> const char Queue<T>::ALREADY_READ;
template<typename T> const char Queue<T>::SKIP_NEXT;
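For reference, here's roughly how I intend it to be used (just a sketch of the interface with a hypothetical Event type; remember pop() returns NULL when the queue is empty):

Queue<Event> eventQueue;

//producer thread:
eventQueue.push(new Event(/*...*/));

//consumer thread:
while(Event* e = eventQueue.pop()) //NULL when empty
{
	HandleEvent(*e);
	delete e;
}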
[Edited by - Akhilleus on June 12, 2009 5:13:45 PM]


The short, sweet, and nice answer is that your code is not thread-safe, it's not lock-free, and it's definitely not even a queue implementation (what happens if you call push more than once?). [wink]

Accomplishing a lockless data structure is quite tricky and takes quite a bit of low-level research and work, especially if you want it to work on both x86 and x64 architectures. Material on the topic itself is quite limited, so I don't have any links, but I would suggest doing some more extensive research on what lockless/lock-free algorithms are.

The best article to start with that I know of would be Lockless Programming Considerations for Xbox 360 and Microsoft Windows. You might also be interested in this thread: Double-check my lock-free voodoo.

Good luck!


Quote:
*EDIT: I don't have any code to prevent optimization reordering some of my statements, but I'm hoping that's trivial to implement, so please overlook it for now.
The volatile keyword should stop the compiler from breaking your code with optimisations, but stopping the CPU from doing so is a lot harder (x86 CPUs re-order instructions internally!!!). You need to learn about memory barriers (aka memory fences, aka interlocked operations) to make this safe.
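For example, on Windows/MSVC the MemoryBarrier() macro (from windows.h) emits a full fence; a sketch of where one would go in your push(), assuming the rest of the logic were correct:

last->next = new QNode(data);
QNode* prev = last;
last = const_cast<QNode*>(last->next);
MemoryBarrier(); //ensure the new node is fully written and visible...
prev->tail = 0;  //...before the consumer is allowed past this node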


Also, the default implementation of 'new' will probably be grabbing a mutex internally, which defeats the entire purpose of the lock-free queue :/
Try and do all your memory allocation up-front if possible, and then use placement-new if constructors are required.
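A rough sketch of that idea (the free-slot bookkeeping here is deliberately single-threaded for clarity; a real lock-free queue would have to manage the pool with atomic operations as well):

#include <new>     //placement new
#include <cstddef>

template<typename T>
class NodePool
{
public:
	explicit NodePool(std::size_t capacity)
		: buffer(new char[capacity * sizeof(T)]), used(0), cap(capacity) {}
	~NodePool() { delete [] buffer; }

	void* take() //raw storage for one node, or NULL when the pool is exhausted
	{
		return (used < cap) ? buffer + (used++) * sizeof(T) : 0;
	}
private:
	char* buffer;
	std::size_t used, cap;
};

//usage (check for NULL first): T* node = new (pool.take()) T(/*args*/);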


This series of articles by Herb Sutter should help:
http://www.ddj.com/cpp/210600279
http://www.ddj.com/hpc-high-performance-computing/210604448
http://www.ddj.com/cpp/211601363
http://www.ddj.com/hpc-high-performance-computing/212201163


Quote:
The short, sweet, and nice answer is that your code is not thread-safe, it's not lock-free, and it's definitely not even a queue implementation (what happens if you call push more than once?).


Oops! push() does work more than once, I just seem to have accidentally erased the line resetting the last pointer. Fixed.

But as for thread-safety, it isn't designed to be generally thread-safe; it's only designed to work with threads if exactly one uses push() and only push(), and exactly one uses pop() and only pop(). If it isn't safe for that, could you let me know where it will encounter problems (if it's completely botched, at least the first vulnerability or two, so I can see where it went wrong)?

Quote:
The volatile keyword should stop the compiler from breaking your code with optimisations, but stopping the CPU from doing so is a lot harder (x86 CPUs re-order instructions internally!!!). You need to learn about memory barriers (aka memory fences, aka interlocked operations) to make this safe.


Also, the default implementation of 'new' will probably be grabbing a mutex internally, which defeats the entire purpose of the lock-free queue :/
Try and do all your memory allocation up-front if possible, and then use placement-new if constructors are required.


Ahh, well I'll definitely do some research on memory barriers then. As for new, I'm aware of the possible problem, but I was planning on defining my own version to pull objects from a preallocated pool, if the Queue code itself is viable.


Quote:
Original post by Akhilleus
But as for thread-safety, it isn't designed to be generally thread-safe; it's only designed to work with threads if exactly one uses push() and only push(), and exactly one uses pop() and only pop(). If it isn't safe for that, could you let me know where it will encounter problems (if it's completely botched, at least the first vulnerability or two, so I can see where it went wrong)?
The following code is fairly representative of why lock-free queues are very hard to write:
if(first->next == last) //this might be true at the time
{
	//but the value of last may have changed by here
	if(last->status != ALREADY_READ) //so this line is now wrong


The compare-and-swap (CAS) instruction is the backbone of a lot of these algorithms. It's basically an atomic (thread-safe) version of "set if equals".
It's implemented in the CPU to be fast and safe, but in high-level code it would look like:
bool CAS( int* target, int oldValue, int newValue )
{
	bool success = false;
	//lock a mutex so *target can't change unexpectedly
	if( *target == oldValue )
	{
		*target = newValue;
		success = true;
	}
	//unlock the mutex
	return success;
}

Whenever working on data that can be accessed by more than one thread, then CAS comes into play.
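For instance, a lock-free increment built on the CAS above (a minimal sketch):

void AtomicIncrement( int* counter )
{
	int seen;
	do
	{
		seen = *counter; //read the current value
	} while( !CAS( counter, seen, seen + 1 ) ); //retry if another thread changed it first
}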


The thread-unsafe example at the top of this post, however, can't be fixed with CAS, so it might be better to re-design the algorithm.
One way to fix this part of the current algorithm might be like this (disclaimer: I'm not a lock-free expert, so this is probably buggy too...)
bool firstNextIsLast = false;
QNode* copyOfLast = CopyWithBarrier( last );
if( first->next == copyOfLast )
{
	firstNextIsLast = true;
	//make some changes to first->next
}
if( firstNextIsLast )
{
	QNode* copyOfLast2 = CopyWithBarrier( last );
	if( copyOfLast != copyOfLast2 ) //uh oh, 'first->next' does not equal 'last' anymore! We edited the wrong node!
	{
		//undo the changes done to first->next
		firstNextIsLast = false;
	}
}


I'd highly recommend reading Herb's articles on lock-free queues instead of trying to fix your existing one though ;)

To quote an MS guy whose name I've forgotten: "Lock-free coding is hard... The kind of 'hard' where 6 months after you think you've finally understood it, you realize your code is completely wrong."


I believe it has been proven that you cannot implement a lockless, fair linked list with only single-word atomic bus operations (which includes load-reservation/store-conditional from PPC as well as your typical CAS instructions).

If you want to be lockless, you end up with what amounts to a spinlock, which is not fair, and isn't really lockless in the strict sense (it's not guaranteed that every user will always make progress). This is why people with lockless needs typically end up using lockless FIFOs. The single-reader, single-writer version (with a "head" and a "tail" counter) is the simplest to implement: just an atomic increment for a single read or single write, as long as there will only be one thread on each side. (In fact, I used that kind of FIFO extensively in BeOS over 10 years ago -- how time flies :-)
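That kind of FIFO is short enough to sketch here (a rough sketch, assuming a power-of-two capacity and that aligned loads/stores of 'unsigned' are atomic on the target CPU; the barriers are left as comments because they're platform-specific):

template<typename T, unsigned N> //N must be a power of two
class SpscFifo
{
public:
	SpscFifo() : head(0), tail(0) {}

	bool push(const T& item) //called by the one writer thread only
	{
		if(tail - head == N) return false; //full
		slots[tail & (N - 1)] = item;
		//write barrier here: the slot must be visible before the counter
		++tail;
		return true;
	}

	bool pop(T& out) //called by the one reader thread only
	{
		if(head == tail) return false; //empty
		out = slots[head & (N - 1)];
		//read barrier here, then release the slot by advancing
		++head;
		return true;
	}

private:
	T slots[N];
	volatile unsigned head; //written only by the reader
	volatile unsigned tail; //written only by the writer
};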


I dunno - this is what I use for my inter-thread implementations. This is a lock-free, multi-writer, single-reader, linked-list-based queue.


#include <windows.h> //InterlockedExchangePointer, InterlockedCompareExchange
#include <cassert>

template<typename T>
class InterlockedQueue
{
	struct Node
	{
		Node(const T& data, Node * next)
			: data(data)
			, last(NULL)
			, next(next)
		{}

		T data;
		Node * last; //This is only set when this is the end sentinel node.
		Node * next;
	};
public:
	typedef T value_type;

	InterlockedQueue()
		: begin(NULL)
		, end(NULL)
#ifdef _DEBUG
		, isPopping(0)
#endif
	{
		end = new Node(T(), NULL);
		begin = new Node(T(), end);
		end->last = begin;
	}

	~InterlockedQueue()
	{
		while (!empty())
		{
			pop();
		}

		delete end;
		delete begin;
	}

	//TODO: Copy / assignment operators.

	/*
	This function executes an atomic push of data.
	The order of data insertion, however, is not guaranteed.
	Eg, you can push 1, 2, 3 and get back 2, 3, 1.
	*/
	void push(const T& data)
	{
		Node * pushNode = new Node(data, end);
		//Swap the last pointer to point at our new node.
		Node * last = reinterpret_cast<Node *>(InterlockedExchangePointer(&end->last, pushNode));

		//Set the previous node to point at our new node.
		InterlockedExchangePointer(&last->next, pushNode);
	}

	const T& peek() const
	{
		return begin->next->data;
	}

	void pop()
	{
#ifdef _DEBUG
		assert(InterlockedCompareExchange(&isPopping, 1, 0) == 0);
#endif
		Node * prev = reinterpret_cast<Node *>(InterlockedExchangePointer(&begin->next, begin->next->next));
		delete prev;

#ifdef _DEBUG
		isPopping = 0;
#endif
	}

	bool empty() const
	{
		return (begin->next) == end;
	}

private:
	Node * begin;
	Node * end;

#ifdef _DEBUG
	volatile long isPopping;
#endif
};



I've tested this on a 4 core machine, with 8 threads writing and 1 thread reading, without any sort of issue - but your mileage may vary.


Some more food for thought:
1) Even if your algorithm doesn't use locks, code like this can end up running serially due to locking within the CPU:
class Queue
{
	...
	Node * first;
	Node * last;
};
'first' and 'last' will likely be placed on the same cache line, and on most CPUs if two threads want write-access to a cache line, then they will invisibly lock it and run serially (not in parallel). Good profiling tools like VTune can detect these events and show you where it is happening.
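The usual workaround is to pad the hot members apart (a sketch, assuming 64-byte cache lines; the right padding size is CPU-specific):

class Queue
{
	...
	Node * first;
	char pad[64]; //keep 'first' and 'last' on different cache lines
	Node * last;
};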

2) A lot of the time a lock-free queue is overkill for inter-thread communication.
For example, in a multi-writer single-reader situation, if the reader-task can be scheduled to only occur after all writer-tasks are complete, then a wait-free queue can be used instead, which is much simpler and more efficient than a lock-free algorithm.
typedef std::vector< std::vector<Foo> > ThreadLocalQueue;
ThreadLocalQueue tlQueue;

init:
-----
tlQueue.resize( numThreads );
for each entry in tlQueue as q
	q.reserve( queueSize );

writer task:
------------
for each calculation to be done
	tlQueue[currentThreadIdx].push_back( Foo(calculation) );
WriteBarrier();

reader task:
------------
ReadBarrier();
for each entry in tlQueue as q
	for each entry in q as foo
		do something with foo
	q.clear();


Quote:
Original post by bobofjoe
I dunno - this is what I use for my inter-thread implementations. This is a lock-free, multi-writer, single-reader, linked-list-based queue.

*** Source Snippet Removed ***

I've tested this on a 4 core machine, with 8 threads writing and 1 thread reading, without any sort of issue - but your mileage may vary.


I don't see how this can work right if there's a queue with one element, and one thread does a push() while another does a pop().

1. Popper reads begin->next->next (which is "end"), and gets pre-empted.
2. Pusher inserts a new element, so begin->next->next is now the new element.
3. Popper sets begin->next to "end," thus losing the recently inserted element.

When you tested with "no issues," did you actually keep track of the number of nodes added, the number of nodes removed, and compare the difference to the number of nodes in the list, after running for a while? I would expect there to be missing (leaked) nodes.


Okay, I've tried to take all the errors (both specific and in the overall design) pointed out here and rewrite the code, except that I unfortunately haven't had time to address the cache-line sharing problem mentioned by Hodgman. I realize it would probably be better to find a pre-implemented queue with the properties I want, or to follow one of the online tutorials to make one, but now that I'm learning about the problems in designing one, it's too interesting not to try to complete. I have been reading articles and information about the concept and related ones, however, as you all suggested.

The new code is in my first post, replacing the old code. I went ahead and fixed the problem whereby the code was relying on assignments to a pointer being atomic; now pop() NEVER accesses last, even to read (likewise, push() never accesses first).

Instead, I've changed the way QNode deals with statuses. One member holds whether the node is safe, already read, or whether it and the next node are both already read (as in the old code). There is also a second member, tail, that initializes to 1, indicating that the node is the tail, and is changed to 0 by push() when another node is pushed after it. This is the value pop() uses to determine whether a node is the tail, instead of comparing that node's address with last.

Because tail is always either 0 or 1 (it's a char, not a bool, so I ensure that numerical representation myself), only one bit ever changes. So if push() is interrupted by pop() when the value is only half-written (if for some reason the machine isn't using aligned chars), pop() will still read a valid value: the current value if the 1 has already been written, or the previous value if not. The only side effect is that pop() will sometimes think it's reached the end of the list when actually one more node is in the process of being added, which is harmless.

Again, I'm still very appreciative of comments, suggestions, and errors being pointed out.


I recently built a FIFO multi-writer single-reader queue for the Epoch programming language. The code is fairly simple; feel free to steal it [smile]

Lockless.h
//
// The Epoch Language Project
// FUGUE Virtual Machine
//
// Basic building blocks for creating lock-free algorithms
//

#pragma once

//
// Helper function for atomic compare-and-swap
// Returns true on success
//
template <class DataType>
bool CompareAndSwap(DataType* field, DataType oldvalue, DataType newvalue)
{
	DataType retval;

	_asm
	{
		mfence
		mov eax, oldvalue
		mov ecx, newvalue
		mov edx, dword ptr [field]

		lock cmpxchg dword ptr [edx], ecx
		mov retval, eax
		mfence
	}

	return (retval == oldvalue);
}




Mailbox.h
//
// The Epoch Language Project
// FUGUE Virtual Machine
//
// Implementation of a lock-free mailbox structure for message passing
//

#pragma once

// Dependencies
#include <deque>

#include "Utility/Threading/Lockless.h"
#include "Utility/Threading/Threads.h"

#include "Utility/Types/IDTypes.h"

#include "Utility/Memory/ThreadLocalAllocator.h"

#include "Configuration/RuntimeOptions.h"



//
// This class encapsulates a lockless mailbox algorithm for asynchronous message passing.
// Each thread is granted a mailbox when it is started up; this mailbox is used for all
// messages passed into that thread. Messages are dequeued in FIFO order.
//
// IMPORTANT: this algorithm is only safe for a many producer/single consumer scenario.
// Critical assumptions made by the code are only valid if a single consumer
// thread is used. However, any number of producer threads is permitted.
//
// The algorithm uses three distinct stacks: a read stack, a write stack, and a cache.
// Producers push items onto the write stack using a simple and well-known CAS method.
// When the consumer thread arrives to retrieve a message, it first checks the cache
// for any waiting messages. If the cache is empty, the consumer thread swaps the read
// and write stacks. At this point, the read stack is controlled entirely by the consumer
// thread, so we don't need to worry about contention for elements in the read stack.
// The entire read stack is then traversed and its contents pushed into the cache.
// Note that we achieve FIFO semantics only by using this extra cache stack.
//
// For additional performance, we use customized memory management internally to avoid
// expensive serialized heap allocations/frees. We cannot directly use new/delete in
// the implementation, because these incur serialized heap accesses, which defeats the
// purpose of the lock-free mailbox. Instead, we use a two-pronged approach to memory
// management for the mailbox. Mail messages are kept in a pre-allocated buffer, and
// memory is supplied by this buffer using a simple lock-free LIFO stack of free slots.
// We also use a special allocator class for the std::stack container, which uses a
// thread-local heap provided by the OS. Since the thread-local heap is never accessed
// by an outside thread, we can avoid the cost of serialized heap accesses to that
// particular memory. The combination of these two approaches allows for maximum speed
// in the mailbox container, while retaining lock-free semantics and thread safety.
//
template <class PayloadType>
class LocklessMailbox
{
// Construction and destruction
public:
	//
	// Construct and initialize the read and write stacks
	//
	LocklessMailbox(HANDLE owner)
		: Owner(owner)
	{
		WriteHead = OriginalWriteHead = new Node;
		ReadHead = OriginalReadHead = new Node;

		unsigned buffersize = Config::NumMessageSlots;
		NodeBuffer = new Node[buffersize];

		NodeFreeList = NULL;
		for(unsigned i = 0; i < buffersize; ++i)
		{
			NodeTracker* tracker = new NodeTracker(NodeFreeList, &NodeBuffer[i]);
			FreeListHead = NodeFreeList = tracker;
			NodeBuffer[i].Tracker = tracker;
		}
	}

	//
	// Clean up the read, write, and cache stacks, freeing any
	// remaining messages from each stack.
	//
	~LocklessMailbox()
	{
		for(typename MessageStackType::iterator iter = PendingReads.begin(); iter != PendingReads.end(); ++iter)
			delete (*iter)->Payload;

		Node* n = WriteHead;
		while(n)
		{
			delete n->Payload;
			n = n->Next;
		}

		n = ReadHead;
		while(n)
		{
			delete n->Payload;
			n = n->Next;
		}

		for(unsigned i = 0; i < Config::NumMessageSlots; ++i)
			delete NodeBuffer[i].Tracker;

		delete OriginalReadHead;
		delete OriginalWriteHead;

		delete [] NodeBuffer;
	}

// Message passing interface
public:
	//
	// Register a message from a producer thread. Any number of threads
	// may call this function.
	//
	void AddMessage(PayloadType* info)
	{
		Node* msgnode = AllocateNode(info);
		while(true)
		{
			msgnode->Next = WriteHead;
			bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
			if(success)
				return;
		}
	}

	//
	// Retrieve a message from the mailbox.
	// IMPORTANT: only ONE consumer thread (per mailbox) should call this function
	//
	PayloadType* GetMessage()
	{
		if(!PendingReads.empty())
			return PopPendingRead();

		if(ReadHead->Next == NULL)
			SwapReadAndWrite();

		Node* n = ReadHead;
		while(ReadHead->Next)
		{
			PendingReads.push_back(n);
			n = n->Next;
			ReadHead = n;
		}

		if(PendingReads.empty())
			return NULL;

		return PopPendingRead();
	}

// Internal helpers
private:
	//
	// Pop a pending read from the cache stack
	//
	PayloadType* PopPendingRead()
	{
		Node* readnode = PendingReads.back();
		PayloadType* payload = readnode->Payload;
		FreeNode(readnode);
		PendingReads.pop_back();
		return payload;
	}

	//
	// Internal helper for swapping the read/write stacks
	// See class comment for details
	//
	void SwapReadAndWrite()
	{
		while(true)
		{
			Node* readhead = ReadHead;
			Node* oldwrite = WriteHead;
			bool success = CompareAndSwap(&WriteHead, oldwrite, readhead);
			if(success)
			{
				ReadHead = oldwrite;
				return;
			}
		}
	}

// Internal memory management
private:
	struct NodeTracker;

	struct Node
	{
		Node() : Next(NULL), Payload(NULL) { }
		explicit Node(PayloadType* p) : Next(NULL), Payload(p) { }

		Node* Next;
		PayloadType* Payload;
		NodeTracker* Tracker;
	};

	struct NodeTracker
	{
		NodeTracker() : Next(NULL), AttachedNode(NULL) { }
		explicit NodeTracker(Node* attached) : Next(NULL), AttachedNode(attached) { }
		NodeTracker(NodeTracker* next, Node* attached) : Next(next), AttachedNode(attached) { }

		NodeTracker* Next;
		Node* AttachedNode;
	};

	//
	// Allocate space for a message node, using the local reserved pool
	//
	// We trade off a limited amount of message space for the ability
	// to allocate and free node storage without locking on the heap.
	// Memory is managed with a simple free list that stores pointers
	// into the pre-allocated node buffer. The list is implemented as
	// a lock-free FILO stack; this allows multiple producers to send
	// messages without blocking on the allocation routines.
	//
	Node* AllocateNode(PayloadType* info)
	{
		NodeTracker* rettracker;

		if(!FreeListHead->Next)
		{
			::TerminateThread(Owner, 0);
			throw Exception("Too many messages backlogged; make sure task is accepting the sent messages!");
		}

		// Pull node out of the free list
		NodeTracker* newhead;
		do
		{
			rettracker = FreeListHead;
			newhead = rettracker->Next;
		} while (!CompareAndSwap(&FreeListHead, rettracker, newhead));

		rettracker->AttachedNode->Payload = info;
		return rettracker->AttachedNode;
	}

	//
	// Return a node to the free list
	//
	void FreeNode(Node* node)
	{
		node->Payload = NULL;

		while(true)
		{
			node->Tracker->Next = FreeListHead;
			if(CompareAndSwap(&FreeListHead, node->Tracker->Next, node->Tracker))
				break;
		}
	}

// Internal tracking
private:
	Node* WriteHead, *OriginalWriteHead;
	Node* ReadHead, *OriginalReadHead;

	Node* NodeBuffer;

	NodeTracker* NodeFreeList;
	NodeTracker* FreeListHead;

	typedef std::deque<Node*, ThreadLocalAllocator<Node*> > MessageStackType;
	MessageStackType PendingReads;

	HANDLE Owner;
};


Config::NumMessageSlots is currently set to 64 in the Epoch project, but you may need to fine-tune that for your particular situation.
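In case it helps, usage looks roughly like this (a hypothetical sketch; Message and consumerThreadHandle are placeholder names):

LocklessMailbox<Message> mailbox(consumerThreadHandle);

//any producer thread:
mailbox.AddMessage(new Message(/*payload*/));

//the single consumer thread:
while(Message* msg = mailbox.GetMessage()) //returns NULL when empty
{
	Process(*msg);
	delete msg; //payload ownership passes to the consumer
}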

This queue can pump something like 100,000 messages/second on my E6600 dual-core machine, so performance shouldn't be an issue [smile]


Quote:
Node* msgnode = AllocateNode(info);
while(true)
{
	msgnode->Next = WriteHead;
	bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
	if(success)
		return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?


Quote:
Original post by hplus0603
Quote:
Node* msgnode = AllocateNode(info);
while(true)
{
	msgnode->Next = WriteHead;
	bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
	if(success)
		return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?


Some type of spin attempt is probably needed, depending on what restrictions are put on the system.

There can either be a real spin-LOCK, such that some other thread may be unable to progress past a certain point until the current thread has run at least one more instruction. This is of the type:

lock_it:
	compare_exchange(lock, 1, 0)
	jump_on_failure lock_it
(do something)
lock = 0

If the thread that takes this lock is suspended by the OS inside (do something), then it is possible that some other thread, trying to run the locking code, will be waiting on a suspended thread. When that happens, the entire system will cease to progress for some time. When this is possible, the structure is not lock-free.

If we have a different algorithm such as the following:

do_it:
	load obj from addr
	compare_exchange(addr, next addr, obj)
	jump_on_failure do_it
(use obj)

then it is also theoretically possible that this thread will spin forever without doing anything productive. But that can happen only while other threads are continuously doing work, keeping the entire system progressing. Whenever the other threads stop requesting new objects to work on, this thread is guaranteed to progress, if scheduled to run. In this sense it is lock-free: it is impossible for the system as a whole to cease progressing.

Assuming an unknown and unbounded set of threads using the structure, then as far as I know it is impossible to make any real provable guarantees about the progress of one particular thread.

More information: http://en.wikipedia.org/wiki/Non-blocking_synchronization.
Following links from that Wikipedia page, there seem to be algorithms that solve this perfectly, assuming the number of threads is known, or under similar constraints.


Quote:
Original post by hplus0603
Quote:
Node* msgnode = AllocateNode(info);
while(true)
{
	msgnode->Next = WriteHead;
	bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
	if(success)
		return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?



Technically, a CAS instruction isn't lockless in the first place, because it introduces a bus-level lock on the CPU. So if you want to be picky, nothing is lockless [wink]

I'm not aware of any method to do a CAS that doesn't spin until the CAS succeeds. How else are you supposed to handle the case where the swap fails? You can't just CAS once and hope it works, because in the instance where the swap is not made, you've just lost (and leaked) an entry.


There is a minor fairness problem, which is that it is possible (although incredibly unlikely) that if one writer continually spams the queue, it can prevent another thread from ever succeeding with the CAS. I know of no way around that which doesn't introduce synchronization issues; I've noted similar problems with basically every lock-free structure I've ever seen.


Quote:

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?


To me it seems that it should guarantee progress (assuming everything else is correct).

CAS will succeed unless the value has been modified. It can fail only if:
- one writer's CAS succeeded, or
- the reader's CAS succeeded.

And since there is no fixed pre-allocated storage, new would fail outside the CAS block, and there is no case where the list might be full.

The reader might be trying to read an empty list, perhaps racing with a writer, but that would require an extra attempt at most.

While fairness isn't guaranteed, at least progress should occur. Fairness to the reader might be more of a problem: since it can dequeue only a single item, while there can be many writers, the queue might not be emptying fast enough.


Quote:
I'm not aware of any method to do a CAS that doesn't spin until the CAS succeeds.


The x86 has a native CMPXCHG instruction. It is implemented in multi-CPU systems to do arbitration of the bus; I believe it is actually guaranteed to be fair at the hardware level, and thus will have an upper bound on how long it will take to execute. The implementation in hardware typically goes something like:

1) Tell chipset/memory controller to wait for an exclusive lock for a specific cache line.
2) Wait until chipset provides the cache line.
3) Do the compare/exchange.
4) Release the cache line.

As long as the chipset does some kind of round robin or queuing on who gets a given contested cache line next, it will be fair and guarantee progress. By contrast, typical spinlocks do not guarantee progress, because it could always be the case that one particular thread gets a particular spinlock. And while that's supposed to be "unlikely" in simple analysis, there have been various bugs/crashes/catastrophes caused by just such kinds of bugs in the past...

The PPC, however, does not have bus-atomic operations; instead, it puts the cache line logic into the CPU. You load a cache line with reservation. While there is a reserved cache line, the CPU will snoop the bus, and any traffic to that same address will invalidate the reservation. You make the changes you want to make in cache, and then store conditional with reservation check. The store will fail if you lost the reservation, and you can test for this and loop back. This is, by the way, not fair, and not generally guaranteed to make progress, so real-time OS implementers on top of PPC might have a harder job. I'm just saying: when you have learned that concurrency is hard, you find that concurrency is harder than you think :-)


In the code I posted for Lockless.h, you can find the implementation of the CompareAndSwap function, which makes use of the x86 cmpxchg instruction.

Even though cmpxchg is atomic at the CPU level, you still need to check for a failed swap - hence the loop.


Quote:
Original post by hplus0603
Quote:
Node* msgnode = AllocateNode(info);
while(true)
{
	msgnode->Next = WriteHead;
	bool success = CompareAndSwap(&WriteHead, msgnode->Next, msgnode);
	if(success)
		return;
}


Technically, this is not lockless, because it uses spin locking.

I haven't analyzed it in detail, but it seems as if it does not guarantee fairness (or at least progress) among the multiple writers, either?
Yeah, there's no fairness; PPC especially can have one thread starving others if bottlenecked on CAS etc.

A CAS or reservation/store-conditional bottleneck is "lock-free" though, in that if any one thread is suspended the other threads can still keep working.
The PPC's version of this primitive is especially lax about pausing other threads to do the atomic operation.
Each thread has a finite amount of work to do in a frame though, so if a thread is stuck in a spin-lock it only affects the progress of that thread and doesn't affect the whole system (besides reducing parallelism!).

So each spin-lock is a 'gate' that only one thread can pass through at a time - a very lightweight synchroniser. If only one thread tries to do the operation at a time then it's fine, but if more than one thread has to do the same operation then only one can proceed at a time.

Calling some lock-free functions from multiple threads, while thread-safe, can cause this synchronisation. So it's still a good idea to minimise the use of these data-structures/algorithms.
For example, instead of pushing individual items into a queue, you could collect an array of items locally in a thread and then push the whole array into the queue with one operation.

Better yet, if you can group work into 'jobs' and then schedule the jobs with dependencies, then you can guarantee that two jobs won't be running at the same time, and thus those two jobs can access a data structure without threading issues (besides OOO CPU details...). This is the step up from lock-free, called wait-free. If you've got a bunch of these algorithms, then when one is doing a serial task, others can be doing parallel tasks, keeping a thread pool busy.
e.g. Two threads are used to run the update function, only thread 1 runs a start/end frame function. Thread 2 is paused during this time.
thread 1: start frame -> update 1 -> end frame
thread 2:             -> update 2 ->
The update function can write data into two arrays, no syncing of threads required. When both threads are done, a serial task groups the results together and does something with the data.
class Work
{
	Thread* pThread = new Thread( StartWorkerThread );
	std::vector<Result*> results[NUM_THREADS];

	void Start()
	{
		pThread->Run();
		Update( 0 );
	}
	void StartWorkerThread()
	{
		Update( 1 );
	}

	void Update( int threadIndex )
	{
		ReadBarrier();
		for( int i=0; i<100; ++i )
		{
			Result* calculation = //Do Stuff
			results[threadIndex].push_back( calculation );
		}
		WriteBarrier();
		if( threadIndex != 0 )
			pThread->GoToSleep();
		else
		{
			WaitToFinish( pThread );
			Finish();
		}
	}

	void Finish()
	{
		ReadBarrier();
		for( int thread=0; thread<NUM_THREADS; ++thread )
		{
			for( int i=0; i<100; ++i )
			{
				results[thread][i]->UseResultSomehow();
			}
			results[thread].clear();
		}
		WriteBarrier();
		Start();
	}
};



Actually, I believe research now shows that the currently best way to do work queuing is to keep one queue per thread. As long as all threads are busy, there is no contention at all. If a thread runs out of work, then it can steal work from another thread -- typically from the *back* of that thread's queue, although it's also possible to do it from the front. One trick in that case is to CAS the element you will dequeue to NULL; if you fail doing that, you fail the dequeue, and loop back. You can do a safe one-writer many-readers queue that way quite simply.

If the rule is that only the main thread adds jobs to the queues, and/or the rule is that only a thread adds jobs to its own queue, then a single-writer multiple-reader FIFO is exactly what you need!
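The steal itself only takes a few lines (a sketch with hypothetical names; CASPointer is assumed to be an atomic pointer compare-and-swap, e.g. a wrapper over InterlockedCompareExchangePointer on Windows):

Job* TrySteal( Job* volatile slots[], int count )
{
	for( int i = count - 1; i >= 0; --i ) //steal from the back
	{
		Job* job = slots[i];
		if( job != 0 && CASPointer( &slots[i], job, (Job*)0 ) )
			return job; //we claimed it; anyone else's CAS on this slot now fails
	}
	return 0; //nothing to steal right now
}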


Just out of curiosity, how much of your time is being spent inside of functions that are reading/writing from queues?


In this case, I think the question should be the reverse: Given a certain performance of a queue, how much queuing can you do in your application?

The reason is that future software architecture will look different from current software architecture. Small tasks that are assigned to worker thread pools are probably going to be an important part of that. Once you start writing your software like that, queuing costs are going to become noticeable.

Btw: Visual Studio 2010 is going to include a lightweight tasking/queuing library along these lines (but with more primitives than just queuing).


Quote:
Original post by hplus0603
In this case, I think the question should be the reverse: Given a certain performance of a queue, how much queuing can you do in your application?

The reason is that future software architecture will look different from current software architecture. Small tasks that are assigned to worker thread pools are probably going to be an important part of that. Once you start writing your software like that, queuing costs are going to become noticeable.

Btw: Visual Studio 2010 is going to include a lightweight tasking/queuing library along these lines (but with more primitives than just queuing).


Oh, agreed entirely. My question was more an attempt to probe into whether this is a real problem that's being solved right now, or a case of premature optimization.


Quote:
Original post by kyoryu
My question was more an attempt to probe into whether this is a real problem that's being solved right now, or a case of premature optimization.
I'm building an actor/entity system at the moment, where to allow parallel updates of entities, all inter-entity function calls must be queued (to be executed at a safe moment). This could blow out anywhere from 100 to 1M queue ops per frame.
Traditional locks can take hundreds of cycles to process. If we estimate 200 cycles of a 2.4GHz CPU, that's 1/12th of a second to do 1M lock operations.
Lock-free queues are an order of magnitude faster, and regular queues are an order of magnitude faster again.

Seeing as I'm making something so reliant on queues, I've ditched all lock-free containers up front. It is a premature optimisation ;), but it's common sense that with this frequency of usage I've got to avoid any scalability-busting approach (and that means no potential synchronisation).


Cool. I love the Actor model; I honestly believe we will see a drift towards it in the near future, as it handles concurrency much better.

From my own experience implementing Actor systems, I'd really recommend just starting out with a locking queue first, to get a performance baseline at least. If it's encapsulated in a class, you can get an idea of what performance is like, and then make improvements from there. It is very possible for a lockless implementation to be slower than a locked implementation, depending on the efficiency of the lockless queue.
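Something like this is enough for a baseline (a sketch using C++11 std::mutex for brevity; the same shape works with CRITICAL_SECTION or boost::mutex):

#include <queue>
#include <mutex>

template<typename T>
class LockingQueue
{
public:
	void push(const T& item)
	{
		std::lock_guard<std::mutex> guard(lock);
		items.push(item);
	}
	bool pop(T& out)
	{
		std::lock_guard<std::mutex> guard(lock);
		if(items.empty()) return false;
		out = items.front();
		items.pop();
		return true;
	}
private:
	std::queue<T> items;
	std::mutex lock;
};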


