Here's an implementation of a lock-free multi-reader/multi-writer array-based queue

I've tested the following code and it works very well. I started with the algorithm from "A Practical Nonblocking Queue Algorithm Using Compare-And-Swap", but I found an error in it (after a very protracted debugging session, first to find the error and then to check that it was not due to an implementation mistake). Searching other references to this paper, I came across "Formal Verification of An Array-based Nonblocking Queue", which discusses the same error and provides a fix as well as a small enhancement. I was still occasionally getting an error, but a load barrier fixed that (see code). This code might still have some bugs despite my best effort, so I make no warranties whatsoever regarding it. Additionally, if you do not substantially change it, please retain the copyright notice.

A few notes are in order:
- If you enqueue/dequeue in a busy loop waiting for a non-full/non-empty queue, performance might be better if you add a slight delay after each failed operation, at least an _mm_pause (or __yield if you're on IA64); see the sketch below.
- The weird-looking casting with dereference and address-of operators around item is there because if T is something like an int or float, where a standard conversion is defined, at least MSVC gives an error saying that static_cast should be used instead of reinterpret_cast (yet if there's no such conversion, the reverse happens and it says reinterpret_cast should be used). If anyone knows of a cleaner solution, please post (and a C-style cast doesn't count).
- _ReadBarrier is only a compiler reordering barrier, but it seems sufficient in my testing (_WriteBarrier also works, as does _ReadWriteBarrier). Using _mm_lfence, which is a CPU reordering barrier, also seems to work, but _mm_sfence doesn't (which is strange given that _WriteBarrier works, but I have no explanation). Do not remove the barrier: with some optimization options you will get errors (albeit infrequently for most setups).
- This won't work on 64-bit systems; using _InterlockedCompareExchange128 might allow an easy conversion, however.
- I have not tested this on an AMD processor (though I expect it should work).
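For illustration, here's the busy-wait pattern from the first note in sketch form (BlockingPush/BlockingPop are just illustrative names; YieldProcessor maps to _mm_pause in processor.h below):

// Sketch: spin until the queue accepts/yields an item, pausing briefly
// after each failed attempt to ease pressure on the memory system.
template<class T, unsigned long size>
inline void BlockingPush(AtomicQueue<T, size> &q, T const item)
{
	while (!q.Push(item)) YieldProcessor();
}

template<class T, unsigned long size>
inline void BlockingPop(AtomicQueue<T, size> &q, T &item)
{
	while (!q.Pop(item)) YieldProcessor();
}

The queue itself: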
// Copyright (C) 2008 Borislav Trifonov
// Based on algorithm from "Formal Verification of An Array-based Nonblocking Queue"

// TODO: For 64-bit system, if cannot adapt to _InterlockedCompare64Exchange128() or _InterlockedCompareExchange128(),
// use instead algorithm from Fig.5 in "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives"

#if !defined ATOMIC_QUEUE
#define ATOMIC_QUEUE

#include <exception>
#include <cassert>
#include "processor.h"

template<class T, unsigned long size>
class AtomicQueue
{
public:
	class Exc : public std::exception
	{
	public:
		inline Exc(const char []);
	};
	inline AtomicQueue(void);
	inline ~AtomicQueue(void);
	inline bool Push(T const); // Returns false if full
	inline bool Pop(T &); // Returns false if empty
	inline T operator[](unsigned long); // No error checking
private:
	__declspec(align(4)) volatile unsigned long head, tail;
	unsigned long long *volatile buff; // TODO: Find out if the volatile is necessary
};

template<class T, unsigned long size>
inline AtomicQueue<T, size>::AtomicQueue(void) : head(0), tail(0), buff(0)
{
	assert(sizeof(T) == 4);
	assert(sizeof(unsigned long) == 4);
	assert(sizeof(unsigned long long) == 8);
	buff = static_cast<unsigned long long *>(_mm_malloc(size * sizeof(unsigned long long), 8));
	if (!buff) throw Exc("Not enough memory");
	for (unsigned long i = 0; i < size; ++i) buff[i] = 0ull;
}

template<class T, unsigned long size>
inline AtomicQueue<T, size>::~AtomicQueue(void)
{
	_mm_free(buff);
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(tail), i(t % size), h(head);
		__declspec(align(8)) unsigned long long const a(buff[i]);
		_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (t != tail) continue;
		if (t == head + size)
		{
			if (buff[h % size] & 0xFFFFFFFFull && h == head) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		// Item is stored in the lower four bytes since the interlocked intrinsics ignore the MSB
		if (a & 0xFFFFFFFFull)
		{
			if (buff[i] & 0xFFFFFFFFull) _InterlockedCompareExchange(&tail, t + 1, t);
		}
		else if (_InterlockedCompareExchange64(buff + i, static_cast<unsigned long long>(*reinterpret_cast<unsigned long const *>(&item))
| ((a & 0xFFFFFFFF00000000ull) + 0x100000000ull), a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&tail, t + 1, t);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(head), i(h % size), t(tail);
		__declspec(align(8)) unsigned long long const a(buff[i]);
		_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (h != head) continue;
		if (h == tail)
		{
			if (!(buff[t % size] & 0xFFFFFFFFull) && t == tail) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		if (!(a & 0xFFFFFFFFull))
		{
			if (!(buff[i] & 0xFFFFFFFFull)) _InterlockedCompareExchange(&head, h + 1, h);
		}
		else if (_InterlockedCompareExchange64(buff + i, (a & 0xFFFFFFFF00000000ull) + 0x100000000ull, a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&head, h + 1, h);
			unsigned long dummy(a & 0xFFFFFFFFull);
			item = *reinterpret_cast<T *>(&dummy);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline T AtomicQueue<T, size>::operator[](unsigned long i)
{
	unsigned long dummy(buff[(head + i) % size] & 0xFFFFFFFFull);
	return *reinterpret_cast<T *>(&dummy);
}

template<class T, unsigned long size>
inline AtomicQueue<T, size>::Exc::Exc(const char msg[]) : std::exception(msg)
{
}

#endif

Comments welcome. I hope someone besides me finds it useful. The processor.h header is simply my way of avoiding having to include Windows header files. It needs some additions before it can be used on Linux and so is incomplete, but here it is anyway.
#if !defined PROCESSOR_H
#define PROCESSOR_H

// Note: MSVC may require /Oi compiler option

// CPU load/store reordering barriers and fast yield
// TODO: For other than _MSC_VER; might be OK for __ICL
#if defined _M_IA64 // TODO: Probably no IA64 support needed
extern "C" void __mf(void);
#pragma intrinsic(__mf)
#define MemoryBarrier __mf
extern "C" void __yield(void);
#pragma intrinsic(__yield)
#define YieldProcessor __yield
#else
extern "C" void _mm_mfence(void);
#pragma intrinsic(_mm_mfence)
extern "C" void _mm_lfence(void);
#pragma intrinsic(_mm_lfence)
extern "C" void _mm_sfence(void);
#pragma intrinsic(_mm_sfence)
extern "C" void _mm_pause(void);
#pragma intrinsic(_mm_pause)
#define YieldProcessor _mm_pause
#if defined _M_AMD64 // TODO: Check if this is OK if load barrier needed
extern "C" void __faststorefence(void);
#pragma intrinsic(__faststorefence)
#define MemoryBarrier __faststorefence
#elif defined _M_IX86
/*__forceinline void MemoryBarrier(void)
{
	long Barrier;
	__asm xchg Barrier, eax
}*/
#define MemoryBarrier _mm_mfence
#else
#error "Unsupported environment"
#endif
#endif

// Compiler load/store reordering barriers
// TODO: Check if these are implied by the CPU barriers
#if defined __ICL || defined __ICC || defined __ECC
#define _ReadWriteBarrier __memory_barrier
#define _ReadBarrier __memory_barrier
#define _WriteBarrier __memory_barrier
#elif defined _MSC_VER
extern "C" void _ReadWriteBarrier(void);
#pragma intrinsic(_ReadWriteBarrier)
extern "C" void _ReadBarrier(void);
#pragma intrinsic(_ReadBarrier)
extern "C" void _WriteBarrier(void);
#pragma intrinsic(_WriteBarrier)
#elif defined __GNUC__ // TODO: Other options for read- or write-only barriers?
#define _ReadWriteBarrier() __asm__ __volatile__("" ::: "memory")
#define _ReadBarrier() __asm__ __volatile__("" ::: "memory")
#define _WriteBarrier() __asm__ __volatile__("" ::: "memory")
#else
#error "Unsupported environment"
#endif

// Interlocked intrinsics
// TODO: For other than _MSC_VER; might be OK for __ICL;
// note that __GNUC__ __sync_bool_compare_and_swap does the comparison but _Interlocked... doesn't
extern "C" long _InterlockedCompareExchange(volatile unsigned long *, unsigned long, unsigned long);
#pragma intrinsic(_InterlockedCompareExchange)
extern "C" long long _InterlockedCompareExchange64(volatile unsigned long long *, unsigned long long, unsigned long long);
#pragma intrinsic(_InterlockedCompareExchange64)

// Aligned memory allocator and deallocator
#if defined __ICL || defined __ICC || defined __ECC // TODO: May not be necessary; also, probably __ECC (IA64) support not needed
extern "C" void* __cdecl _mm_malloc(size_t, size_t);
extern "C" void __cdecl _mm_free(void *);
#elif defined _MSC_VER
#include <malloc.h>
#elif defined __GNUC__
#include <mm_malloc.h>
#else
#error "Unsupported environment"
#endif

#endif

In my multi-threaded library, Flow, the queue implementation I used was lock-based, because I didn't have the time to design and implement a lock-free one:

template <typename T>
class channel
{
    typedef T value_t;
    typedef boost::optional<value_t> optional_t;
    typedef std::queue<value_t> queue_t;
    typedef boost::mutex mutex_t;
    typedef mutex_t::scoped_lock lock_t;
    typedef boost::condition condition_t;

    // Input-output elements ================================================

    // Any operation over the data passes through the
    // data mutex. This allows sync of both input and
    // output threads.

    queue_t data;
    volatile bool is_over;
    mutex_t data_mutex;
    condition_t written_to;

    // Reference counting elements ==========================================

    // Any operation over the handles passes through
    // the handle mutex.

    mutex_t handle_mutex;
    volatile unsigned output_handles;
    volatile unsigned input_handles;
    volatile bool is_silent;

public:

    // Initialize a clean channel
    channel() :
        is_over(false),
        output_handles(0),
        input_handles(0),
        is_silent(false)
    {
    }

    void acquire(bool input)
    {
        lock_t lock(handle_mutex);
        if (input)
            ++input_handles;
        else
            ++output_handles;
    }

    // Release a reference to the flow.
    // - If all threads cease to reference the flow, then
    //   the memory is deallocated with operator delete
    // - If all input threads cease to reference the flow,
    //   sets the is-over flag and wakes up an output
    //   thread.
    // - If all output threads cease to reference the
    //   flow, sets the is-silent flag.
    void release(bool input)
    {
        bool should_delete = false;

        {
            lock_t lock(handle_mutex);

            if (input)
            {
                if (--input_handles == 0)
                {
                    lock_t lock(data_mutex);
                    is_over = true;
                    written_to.notify_all();
                }
            }
            else
            {
                if (--output_handles == 0)
                {
                    is_silent = true;
                }
            }

            should_delete = (input_handles + output_handles == 0);
        }

        if (should_delete)
        {
            delete this;
        }
    }

    // Is the flow silent?
    bool silent()
    {
        lock_t lock(handle_mutex);
        return is_silent;
    }

    // Add an element to the flow.
    // - If any output threads were waiting for input,
    //   one of them is reactivated and reads the input.
    // - Otherwise, stores the element for later collection.
    void add(const value_t &t)
    {
        lock_t lock(data_mutex);
        data.push(t);
        written_to.notify_one();
    }

    // Attempt to read an element from the flow.
    // - If the flow is over, leaves the argument untouched.
    // - If the flow is currently empty, but there are still
    //   input threads, blocks until input (or an is-over
    //   signal) arrives.
    void get(optional_t &t)
    {
        lock_t lock(data_mutex);

        while (!is_over && data.empty())
        {
            written_to.wait(lock);
        }

        if (is_over && data.empty())
        {
            return;
        }

        t = data.front();
        data.pop();
    }
};
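A typical producer/consumer pair uses it roughly like so (a simplified sketch; in practice the handles are acquired before the threads are spawned):

// Sketch of typical channel usage (not part of the library itself).
void producer(channel<int> *c)
{
    c->acquire(true);             // register as an input thread
    for (int i = 0; i < 100; ++i)
        c->add(i);                // wakes a waiting reader, if any
    c->release(true);             // last input handle sets is_over
}

void consumer(channel<int> *c)
{
    c->acquire(false);            // register as an output thread
    boost::optional<int> v;
    for (;;)
    {
        v = boost::none;
        c->get(v);                // blocks until data or is-over arrives
        if (!v)
            break;                // flow is over and drained
        // use *v ...
    }
    c->release(false);
}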



Your lock-free queue would be quite useful as an alternative implementation, especially since it might improve performance. Two problems I see with this are:

  • The fixed queue size. Algorithms that use the flow library may rely on the fact that the writer can dump a heapload of data into a flow before even creating the thread to read it.
  • The signature of the "bool pop(T&)" function, which assumes that an object of type T is already present (which, of course, is an assumption that the flow library cannot make because it works with arbitrary data). Would it be possible to use an alternative "boost::optional<T> pop()" signature? (See the sketch below.)


(And of course the fact that the state of flows relies on stuff other than the queue, such as reference counting).
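To illustrate the signature I mean, here's a wrapper sketch (a native implementation inside the queue would avoid the default construction that this wrapper still needs):

// Sketch: wrap a bool Pop(T&) interface into an optional-returning one.
// An empty optional means the queue was empty.
template<typename T, typename Queue>
boost::optional<T> optional_pop(Queue &q)
{
    T tmp;
    if (q.Pop(tmp))
        return boost::optional<T>(tmp);
    return boost::none;
}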

Some miscellaneous comments.

1. First, the show-stopper (for me). I am not qualified to judge the implementation, as I have not delved into the deeper aspects of lock-free programming. It is for this reason that I think providing an *extensive* set of stress tests is very important. This is especially true due to you being unsure about the type of barrier used in certain places. I have more confidence in your ability to do this correctly than in mine, but without these tests I wouldn't touch it with a 10ft barge pole.

2. I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

3. There are a few places where static assertions could be used in place of assert() for earlier errors.

4. It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

5. Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

6. To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

7. I assume the copy constructor and assignment operator should be explicitly disabled?

8. Are your references available online somewhere? Links to them in the source would be nice, also.

So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

Quote:
Original post by ToohrVyk
The fixed queue size.

If you allow the queue to be locked when growing, using an allocator would be trivial. If you want a concurrent nonblocking allocator, it's not that simple. See the source code for the concurrent vector in Intel's Threading Building Blocks for an example.

Quote:
The signature of the "bool pop(T&)" function, which assumes that an object of type T is already present (which, of course, is an assumption that the flow library cannot make because it works with arbitrary data). Would it be possible to use an alternative "boost::optional<T> pop()" signature?

Can you summarize boost::optional? My intention was that the queue would only hold pointers, and I changed it to handle any pointer-sized T as an afterthought (since I could alternatively put in indices instead of pointers, or whatever).

Quote:
Original post by the_edd
It is for this reason that I think providing an *extensive* set of stress tests is very important.

That is true for the specific implementation, and I'm continuing testing. The algorithm, however, has been verified to be correct in the paper I referenced, "Formal Verification of An Array-based Nonblocking Queue".

I want to note here that I have also contacted the author of "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives", whose paper compared his algorithm to the one I started with ("A Practical Nonblocking Queue Algorithm Using Compare-And-Swap"), and I asked him about the error I had noticed in the latter--this was before I came across the verification paper and its fix. In his reply he provided a different fix that is simpler and, I think, more efficient: simply move the reading of the array entry to after the line doing the full/empty check. This saves a read of the array entry for some of the possible flows of control through push/pop, but I have not tested it myself yet (he also does not have the retry attempt that the verification paper introduces, which you can see in my code). I might implement his fix and compare the two versions for efficiency; I'll post code later.

Quote:
This is especially true due to you being unsure about the type of barrier used in certain places.

The interlocked operations imply barriers on the CPU, and at least MSVC knows to treat them as barriers at the compiler level (on an IA64 system there are even extra versions of the interlocked instructions for when you need only acquire or only release fencing). A problem, though, is that the MSVC documentation is not very clear, or even consistent, in some cases. For example, I've not been able to find out whether MSVC knows that the _mm_?fence() intrinsics are barriers. One could alternatively use #pragma omp flush.

In regards to the barrier: in MSVC, volatile reads are supposed to have acquire semantics, and that should imply a read barrier--all the reads above the barrier will have completed when it hits (unless MSVC has a problem with buff[i], though a pointer to volatile should still have the claimed semantics). What is most likely the case here--I will check the emitted assembly when I have the time--is that the compiler doesn't recompute the new value of i: even though t is re-read because tail is volatile, the compiler might not figure out that it needs to recompute i as well, since i is not directly affected by a volatile access. This ties in with your next comment.

Quote:
I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

Indeed, I intend to remove volatiles as much as possible in the next iteration, after considering http://software.intel.com/en-us/blogs/2007/11/30/volatile-almost-useless-for-multi-threaded-programming/
They should be replaced with explicit barriers, which not only avoids relying on compiler specifics, but can also improve performance.
By the way, another issue with GCC is that it assumes strict aliasing at -O2, at least for C code (I'm not sure about C++). My pointer casting breaks that rule if I understand it correctly, so -fno-strict-aliasing might need to be specified when compiling source files that include atomic_queue.h.
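For what it's worth, a memcpy-based reinterpretation would sidestep both the cast ugliness and the aliasing rule (a sketch, not what the code above does; compilers reduce the 4-byte memcpy to a single move):

#include <cstring>

// Sketch: alias-safe bit reinterpretation of a 4-byte T as unsigned long.
template<class T>
inline unsigned long AsBits(T const &item)
{
	unsigned long bits;
	std::memcpy(&bits, &item, sizeof bits); // requires sizeof(T) == 4
	return bits;
}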

Quote:
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.

Quote:
It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

This relates to the previous reply post. It might be nice, and I'd welcome an example, but I was going for handling the core functionality first. Additionally, as lock-free is to a great extent about efficiency, I think that hiding memory allocations from the user only makes sense if the full range of use cases is considered and optimized, as is the case with STL containers. But this is a good deal of work.

Quote:
Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

Because then I'd have to disable the default constructor, which is a problem if I want to do something like queues = new AtomicQueue<MyType *>[num];

Quote:
To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

I haven't really done template metaprogramming (just used existing code before), so I don't know how to do that...

Quote:
I assume the copy constructor and assignment operator should be explicitly disabled?

They don't really make sense in a concurrent setting. This is the same reason that a multithreaded version of the STL I came across doesn't implement top(), full(), and empty() for its queue (there are similar opinions expressed in comments around the code for some stuff in Intel's Threading Building Blocks). Perhaps one could lock the queue for copy operations (same with resize), but if you're really dealing with a queue that is expected to have a widely varying size, and you're willing to give up a bit of performance, simply use a linked-list based queue. See "Nonblocking Algorithms and Preemption Safe Locking on Multiprogrammed Shared Memory Multiprocessors" and "Verifying Michael and Scott's Lock-free Queue Algorithm Using Trace Reduction". I think Valve uses something similar in their most recent engine (see their GDC 2007 presentation).

Quote:
Are your references available online somewhere? Links to them in the source would be nice, also.

The problem is a dynamic Web (links do die) and journal access for some of the papers. With Google, I only found the two verification papers with full text available on the public Web. Email me if you want full text of any of the others.

Quote:
So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

I have tested this by inserting various combinations of delays at many different locations (to increase the chance that an operation will get interrupted), and have run a few tests for an hour at a time, checking head/tail consistency with pushes and pops carried out, as well as data integrity. The thing I haven't tested yet is what happens when head or tail overflows and wraps around. The interlocked operations ignore the sign bit, so there would actually be two conceptual wrap-arounds for each integer overflow, but that simply means that the maximum reference count is reduced by a factor of two. That still leaves a significant safety margin--you'd need a thread to be preempted for a hell of a long time (proportional to queueSize*numeric_limits<unsigned long>::max()) to have the reference count wrap around before it resumes (and on top of that, the thread would have to resume on the exact same reference count value for there to be a problem--an extremely unlikely possibility).
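For the curious, the skeleton of the kind of stress test I run looks roughly like this (a sketch with hypothetical names; the real tests also inject random delays at various points and check the actual values, not just the totals):

AtomicQueue<unsigned long, 1024> q;

unsigned long long Produce(unsigned long n) // run on its own thread
{
	unsigned long long sum = 0;
	for (unsigned long i = 1; i <= n; ++i)
	{
		while (!q.Push(i)) YieldProcessor(); // spin while full
		sum += i;
	}
	return sum; // total pushed, for comparison after joining
}

unsigned long long Consume(unsigned long n) // run on its own thread
{
	unsigned long long sum = 0;
	for (unsigned long i = 0; i < n; ++i)
	{
		unsigned long v;
		while (!q.Pop(v)) YieldProcessor(); // spin while empty
		sum += v;
	}
	return sum; // total popped
}

// Spawn several of each, join, then check that the pushed and popped
// totals match and that the queue drains empty.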

Quote:
Original post by Prune
Quote:
Original post by the_edd
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.


I think he meant something like boost.StaticAssert.

Quote:
Original post by Prune
Quote:
Original post by the_edd
It is for this reason that I think providing an *extensive* set of stress tests is very important.

That is true for the specific implementation, and I'm continuing testing. The algorithm, however, has been verified to be correct in the paper I referenced, "Formal Verification of An Array-based Nonblocking Queue".


Formal verification is very nice to have for this kind of thing. But I still wouldn't trust it without tests. I realise a forum isn't the best place to post a test suite, but if you get around to bundling up a zip or creating a public repository, I would strongly encourage you to make the tests available.

Knuth: "Beware of bugs in the above code; I have only proved it correct, not tried it."

Quote:

Quote:
This is especially true due to you being unsure about the type of barrier used in certain places.

The interlocked operations imply barriers on the CPU, and at least MSVC knows to treat them as barriers at the compiler level (on an IA64 system there are even extra versions of the interlocked instructions for when you need only acquire or only release fencing). A problem, though, is that the MSVC documentation is not very clear, or even consistent, in some cases. For example, I've not been able to find out whether MSVC knows that the _mm_?fence() intrinsics are barriers. One could alternatively use #pragma omp flush.

The MSDN forums are pretty good for this kind of thing. I had a rather hairy question answered there about the interaction of Fibers and volatile variables.

Quote:

Quote:
I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

Indeed, I intend to remove volatiles as much as possible in the next iteration, after considering http://software.intel.com/en-us/blogs/2007/11/30/volatile-almost-useless-for-multi-threaded-programming/
They should be replaced with explicit barriers, which not only avoids relying on compiler specifics, but can also improve performance.

That sounds like a good idea.

Quote:

Quote:
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.

As Gage64 said, I was thinking along the lines of BOOST_STATIC_ASSERT. If you don't want to rely on Boost, it's only 5 lines of code to implement it yourself. The point is that the code won't even compile if these assertions fail:

STATIC_ASSERT(sizeof(T) == 4);
STATIC_ASSERT(sizeof(unsigned long) == 4);
STATIC_ASSERT(sizeof(unsigned long long) == 8);



Quote:

Quote:
It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

This relates to the previous reply post. It might be nice, and I'd welcome an example, but I was going for handling the core functionality first.


I was thinking along the lines of:

template<typename T, std::size_t Size>
struct element_traits
{
    typedef T *element_type;
    STATIC_ASSERT(sizeof(element_type) == 4);

    static void create_at_location(const T &x, void *location) { *static_cast<T **>(location) = new T(x); }
    static void destroy_at_location(void *location) { delete *static_cast<T **>(location); }
    static T &get(void *location) { return **static_cast<T **>(location); }
};

template<typename T>
struct element_traits<T, 4>
{
    typedef T element_type;

    static void create_at_location(const T &x, void *location) { new(location) T(x); } // placement new with copy-constructor
    static void destroy_at_location(void *location) { static_cast<T *>(location)->~T(); } // manual destructor call
    static T &get(void *location) { return *static_cast<T *>(location); }
};



Now in your AtomicQueue<T> you would have:


template<typename T, ...>
class AtomicQueue
{
    typedef element_traits<T, sizeof(T)> traits;
};



If you perform all your manipulations through the static functions in traits, then you have support for arbitrary element types. If they're 4 bytes, the elements will be stored by value, else by pointer.

Quote:

Additionally, as lock-free is to a great extent about efficiency, I think that hiding memory allocations from the user only makes sense if the full range of use cases is considered and optimized, as is the case with STL containers. But this is a good deal of work.

You have a point, but at some point someone is going to have to do the allocation. Are you as the container author not in a better position (the best position?) to do that efficiently? You may not be, this isn't a rhetorical question! You could also consider an allocator template parameter.

Quote:

Quote:
Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

Because then I'd have to disable the default constructor, which is a problem if I want to do something like queues = new AtomicQueue<MyType *>[num];

Well I'd scrap the default constructor. Have a single constructor that takes the number of elements. It's more flexible and if you really, really need it, you still have the option to wrap it in a class that gets the size through a template parameter and feeds it to the constructor in its own default constructor. With your design, I cannot create a queue with a size that is decided at runtime.

Also, if you used a std::vector<AtomicQueue<T> >, rather than manually new[]-ing up your array you wouldn't have your proposed problem and you'd get a lot of extra benefits, too.

Quote:

Quote:
To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

I haven't really done template metaprogramming (just used existing code before), so I don't know how to do that...


Something like:


template<bool convertible>
struct shut_the_compiler_up_cast_impl
{
    template<typename To, typename From>
    static To do_it(const From &x) { return reinterpret_cast<To>(x); }
};

template<>
struct shut_the_compiler_up_cast_impl<true>
{
    template<typename To, typename From>
    static To do_it(const From &x) { return static_cast<To>(x); } // or maybe just "return x;"?
};

template<typename To, typename From>
To shut_the_compiler_up_cast(From x)
{
    const bool conv = boost::is_convertible<From, To>::value;
    return shut_the_compiler_up_cast_impl<conv>::template do_it<To>(x);
}



Now instead of reinterpret_cast or static_cast, use shut_the_compiler_up_cast.

Again, you could write the is_convertible<> meta function yourself if you didn't want to rely on boost. It's maybe 15 lines but it might make you go cross-eyed if you haven't seen the trick before :)
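For completeness, a sketch of that meta-function using the sizeof/overload trick:

// is_convertible<From, To>::value is true iff From converts to To.
// test(To) is preferred over test(...) when the conversion exists,
// and the two overloads return types of different sizes.
template<typename From, typename To>
struct is_convertible
{
private:
    typedef char yes;                  // sizeof == 1
    typedef struct { char c[2]; } no;  // sizeof == 2
    static yes test(To);
    static no test(...);
    static From make();
public:
    static const bool value = sizeof(test(make())) == sizeof(yes);
};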

As I said, it's pretty messy and I think a C-cast with a comment would be better.

Quote:

Quote:
I assume the copy constructor and assignment operator should be explicitly disabled?

They don't really make sense in a concurrent setting.


Exactly. But I can still attempt to copy an AtomicQueue via the compiler generated copy constructor or copy assignment operator. Do this:


template<typename T>
class AtomicQueue
{
private:
    AtomicQueue(const AtomicQueue &);
    AtomicQueue &operator= (const AtomicQueue &);
};



Now I will get a compiler error if I even attempt to copy an AtomicQueue. You don't even have to define the bodies of those functions because they're impossible to call (unless you do so accidentally inside a member function!). Alternatively, you could inherit boost::noncopyable, which basically adds the above to your class for you.

Quote:

Quote:
Are your references available online somewhere? Links to them in the source would be nice, also.

The problem is a dynamic Web (links do die) and journal access for some of the papers. With Google, I only found the two verification papers with full text available on the public Web. Email me if you want full text of any of the others.


Ok. If you get around to distributing it outside the forum, it might be nice to bundle the papers with it, license restrictions permitting.

Quote:

Quote:
So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

I have tested this by inserting various combinations of delays at many different locations (to increase the chance that an operation will get interrupted), and have run a few tests for an hour at a time, checking head/tail consistency with pushes and pops carried out, as well as data integrity.


But only on your system(s). I have an AMD X2 processor on this machine. I *need* to run the tests on here before I can trust the code. You've gotta supply the tests. This is hard code to write and I applaud you for having a go at it. But since many people won't even be able to understand it or verify it by reading it, there has to be some other kind of reassurance. You may already be planning to include the tests, I just haven't heard you say it yet :)

Quote:
Original post by the_edd
I realise a forum isn't the best place to post a test suite, but if you get around to bundling up a zip or creating a public repository, I would strongly encourage you to make the tests available.

Well, this is only a prototype for now.

Quote:
As Gage64 said, I was thinking along the lines of BOOST_STATIC_ASSERT. If you don't want to rely on Boost, it's only 5 lines of code to implement it yourself. The point is that the code won't even compile if these assertions fail:

The simplest I found is this:
template<bool> struct CompileError;
template<> struct CompileError<true>
{
};
#define STATIC_ASSERT(X) (CompileError<(X)>())

Quote:
If you perform all your manipulations through the static functions in traits, then you have support for arbitrary element types. If they're 4 bytes, the elements will be stored by value, else by pointer.

Thanks, I will try to add this when I have the time to mess around with this queue again in a few days.

Quote:
You have a point, but at some point someone is going to have to do the allocation. Are you as the container author not in a better position (the best position?) to do that efficiently? You may not be, this isn't a rhetorical question! You could also consider an allocator template parameter.

It has to do with the primary use of this queue in my code. I have buffers where the skinning and simulation threads (generally OpenMP) dump updated vertex data, then push pointers to the right buffer into the queue, to be collected by the OpenGL render thread. The buffers are preallocated based on the predetermined sizes of meshes, and so I never considered allocation by the queue itself. In the way I see the semantics of this, this queue is meant as simply a conveyor between threads.
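In sketch form (VertexBuffer, FillVertices, and Draw are hypothetical names):

// Sketch of the conveyor usage described above.
AtomicQueue<VertexBuffer *, 64> ready;

// Skinning/simulation thread: fill a preallocated buffer, then publish it.
void SkinningPass(VertexBuffer *buf)
{
	FillVertices(buf); // do the work...
	while (!ready.Push(buf)) YieldProcessor(); // ...then publish the pointer
}

// OpenGL render thread: drain whatever buffers are ready this frame.
void RenderPass(void)
{
	VertexBuffer *buf;
	while (ready.Pop(buf)) Draw(buf);
}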

Quote:
Well I'd scrap the default constructor. Have a single constructor that takes the number of elements. It's more flexible and if you really, really need it, you still have the option to wrap it in a class that gets the size through a template parameter and feeds it to the constructor in its own default constructor. With your design, I cannot create a queue with a size that is decided at runtime.

Also, if you used a std::vector<AtomicQueue<T> >, rather than manually new[]-ing up your array you wouldn't have your proposed problem and you'd get a lot of extra benefits, too.

OK, I agree with this and will change it.

Quote:
As I said, it's pretty messy and I think a C-cast with a comment would be better.

Yes, it seems that way. I'll go with a C-cast.

Quote:
Now I will get a compiler error if I even attempt to copy an AtomicQueue.

Yes, I use this technique; I simply hadn't considered the issue of copyability until this discussion, which is why I had not included it yet.

Quote:
But only on your system(s). I have an AMD X2 processor on this machine. I *need* to run the tests on here before I can trust the code. You've gotta supply the tests. This is hard code to write and I applaud you for having a go at it. But since many people won't even be able to understand it or verify it by reading it, there has to be some other kind of reassurance. You may already be planning to include the tests, I just haven't heard you say it yet :)

I think I should do that once I have a more finalized version of the queue rather than just this prototype, since some functionality may change.

I think the only aspect that is not too clear in the code is that head/tail gets incremented even if the DCAS inserting/removing an item fails. Without this, if thread A gets preempted after its DCAS succeeds but before it has incremented the position, causing thread B's DCAS to fail, B cannot continue until A resumes and completes that increment. So the idea is that B increments the lagging position on A's behalf.

I mentioned in a previous post that there's an alternative way to fix the Shann algorithm, so I'm posting that version of the code here. Notice that now the array is not read an extra time to verify the null. The assumption is that the first read of the array entry into a doesn't need to happen until after the full/empty check: there shouldn't be a need for the DCAS to check that the array location is consistent from before that full/empty check--it only needs to make sure it's consistent from before the check that head/tail is still current. I have tested this and it seems to work as well as the first version, but of course there's no formal verification for this version, so for now put more stock in the original; the performance improvement is minor in any case. I'm awaiting a response from both sets of authors as to why they chose different approaches.

One more important note about this code: though I used the different fix, I still retained the modification from the verification paper that adds a check for the possibility of a retry in case of full/empty. You can see that this doesn't interact with the other issue, because the control flow for that is either a continue or a return.

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(tail), h(head);
		//_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
		// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (t == head + size)
		{
			if (buff[h % size] & 0xFFFFFFFFull && h == head) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		__declspec(align(8)) unsigned long long const a(buff[t % size]);
		if (t != tail) continue;
		// Item is stored in the lower four bytes since the interlocked intrinsics ignore the MSB
		if (a & 0xFFFFFFFFull) _InterlockedCompareExchange(&tail, t + 1, t);
		else if (_InterlockedCompareExchange64(buff + t % size, static_cast<unsigned long long>(*reinterpret_cast<unsigned long const *>(&item))
			| ((a & 0xFFFFFFFF00000000ull) + 0x100000000ull), a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&tail, t + 1, t);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(head), t(tail);
		//_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
		// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (h == tail)
		{
			if (!(buff[t % size] & 0xFFFFFFFFull) && t == tail) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		__declspec(align(8)) unsigned long long const a(buff[h % size]);
		if (h != head) continue;
		if (!(a & 0xFFFFFFFFull)) _InterlockedCompareExchange(&head, h + 1, h);
		else if (_InterlockedCompareExchange64(buff + h % size, (a & 0xFFFFFFFF00000000ull) + 0x100000000ull, a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&head, h + 1, h);
			unsigned long dummy(a & 0xFFFFFFFFull);
			item = *reinterpret_cast<T *>(&dummy);
			return true;
		}
	}
}


I'd like to ask: should I expect a performance improvement if, instead of using a bitwise AND to get the lower 32 bits, I use a union of a long long with a struct of two longs? I guess the question reduces to whether the & is faster than a struct member access.

Quote:
I'd like to ask: should I expect a performance improvement if, instead of using a bitwise AND to get the lower 32 bits, I use a union of a long long with a struct of two longs? I guess the question reduces to whether the & is faster than a struct member access.


I'd use the bitwise AND. The union will have endianness issues. I'd also expect most compilers to be able to do the right thing there. Check the disassembly in release mode to be certain.
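Concretely, the two alternatives look like this (a sketch; the union variant assumes little-endian):

// Both extract the low 32 bits; on x86 they compile to the same load.
inline unsigned long LowViaMask(unsigned long long x)
{
    return static_cast<unsigned long>(x & 0xFFFFFFFFull); // byte-order independent
}

inline unsigned long LowViaUnion(unsigned long long x)
{
    union { unsigned long long ll; struct { unsigned long lo, hi; } s; } u;
    u.ll = x; // member order here is only correct on little-endian machines
    return u.s.lo;
}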

You may also want to use the standard sized types. std::int32_t, etc.

Surely there's some way to choose the struct member order based on the detected endianness of the system.

With the union, I ended up with cleaner code and the casting issue is gone.

As for int32_t, MSVC at least is not a C99 compiler, so there's no stdint.h; I also want to stay independent of Boost.
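(If I end up wanting fixed-width names, a handful of typedefs would do it; a sketch using the MSVC-specific __int types:)

#if defined _MSC_VER
typedef __int32 int32;
typedef unsigned __int32 uint32;
typedef __int64 int64;
typedef unsigned __int64 uint64;
#else // assume a C99-style stdint.h is available
#include <stdint.h>
typedef int32_t int32;
typedef uint32_t uint32;
typedef int64_t int64;
typedef uint64_t uint64;
#endif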

Here's a version with some of the updates suggested so far, including removal of volatile (well, it's still there in Acquire(), but no assumptions are made beyond the standard C++ definition).

[Edit:] processor.h updated to compile without warnings on Intel Compiler for Windows.
#if !defined PROCESSOR_H
#define PROCESSOR_H

// Note: MSVC may require /Oi compiler option

// CPU load/store reordering barriers and fast yield
// TODO: For other than _MSC_VER and __ICL
#if defined _M_IA64 // TODO: Probably no IA64 support needed
extern "C" void __mf(void);
#pragma intrinsic(__mf)
#define MemoryBarrier __mf
extern "C" void __yield(void);
#pragma intrinsic(__yield)
#define YieldProcessor __yield
#else
#if !(defined __ICL || defined __ICC || defined __ECC)
extern "C" void _mm_mfence(void);
#pragma intrinsic(_mm_mfence)
extern "C" void _mm_lfence(void);
#pragma intrinsic(_mm_lfence)
extern "C" void _mm_sfence(void);
#pragma intrinsic(_mm_sfence)
extern "C" void _mm_pause(void);
#pragma intrinsic(_mm_pause)
#endif
#define YieldProcessor _mm_pause
#if defined _M_AMD64 // TODO: Check if this is OK if load barrier needed
extern "C" void __faststorefence(void);
#pragma intrinsic(__faststorefence)
#define MemoryBarrier __faststorefence
#elif defined _M_IX86
/*__forceinline void MemoryBarrier(void)
{
long Barrier;
__asm xchg Barrier, eax
}*/

#define MemoryBarrier _mm_mfence
#else
#error "Unsupported environment"
#endif
#endif

// Compiler load/store reordering barriers
// TODO: Check if these are implied by the CPU barriers; __ECC should use explicit instructions with acquire/release semantics instead
#if defined __ICL || defined __ICC || defined __ECC
#define _ReadWriteBarrier __memory_barrier
#define _ReadBarrier __memory_barrier
#define _WriteBarrier __memory_barrier
#elif defined _MSC_VER
extern "C" void _ReadWriteBarrier(void);
#pragma intrinsic(_ReadWriteBarrier)
extern "C" void _ReadBarrier(void);
#pragma intrinsic(_ReadBarrier)
extern "C" void _WriteBarrier(void);
#pragma intrinsic(_WriteBarrier)
#elif defined __GNUC__ // TODO: Other options for read- or write-only barriers?
#define _ReadWriteBarrier() __asm__ __volatile__("" ::: "memory")
#define _ReadBarrier() __asm__ __volatile__("" ::: "memory")
#define _WriteBarrier() __asm__ __volatile__("" ::: "memory")
#else
#error "Unsupported environment"
#endif

template<typename T>
__forceinline T Acquire(T volatile const &p)
{
	T t = p;
	_ReadWriteBarrier();
	return t;
}

template<typename S, typename T>
__forceinline void Release(volatile S &p, T const t)
{
	_ReadWriteBarrier();
	p = t;
}

// Interlocked intrinsics
// TODO: For other than _MSC_VER and __ICL; note that __GNUC__ __sync_bool_compare_and_swap does the comparison but _Interlocked... doesn't
#if !(defined __ICL || defined __ICC || defined __ECC)
extern "C" long _InterlockedCompareExchange(volatile unsigned long *, unsigned long, unsigned long);
#pragma intrinsic(_InterlockedCompareExchange)
extern "C" long long _InterlockedCompareExchange64(volatile unsigned long long *, unsigned long long, unsigned long long);
#pragma intrinsic(_InterlockedCompareExchange64)
#endif

// Aligned memory allocator and deallocator
#if !(defined __ICL || defined __ICC || defined __ECC)
#if defined _MSC_VER
#include <malloc.h>
#elif defined __GNUC__
#include <mm_malloc.h>
#else
#error "Unsupported environment"
#endif
#endif

template<bool> struct CompileError;
template<> struct CompileError<true>
{
};
#define STATIC_ASSERT(X) (CompileError<(X)>())

template<typename S, typename T>
union Pack
{
	inline Pack(void);
	inline Pack(Pack const &);
	inline Pack(unsigned long long const &);
	inline Pack(S const &, T const &);
	unsigned long long pack;
	struct // TODO: Check if the below order is only correct for little-endian architectures
	{
		S lower;
		T upper;
	};
};

template<typename S, typename T>
inline Pack<S, T>::Pack(void)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
}

template<typename S, typename T>
inline Pack<S, T>::Pack(Pack const &other)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	pack = other.pack;
}

template<typename S, typename T>
inline Pack<S, T>::Pack(unsigned long long const &op)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	pack = op;
}

template<typename S, typename T>
inline Pack<S, T>::Pack(S const &nl, T const &nu)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	lower = nl;
	upper = nu;
}

#endif

// Copyright (C) 2008 Borislav Trifonov
// Based on algorithm from "A Practical Nonblocking Queue Algorithm Using Compare-And-Swap"

// TODO: For 64-bit system, if cannot adapt to _InterlockedCompare64Exchange128() or _InterlockedCompareExchange128(), use instead algorithm from Fig.5 in "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives"

#if !defined ATOMIC_QUEUE
#define ATOMIC_QUEUE

#include <exception>
#include "processor.h"

template<typename T>
class AtomicQueue
{
public:
	class Exc : public std::exception
	{
	public:
		inline Exc(const char []);
	};
	inline AtomicQueue(unsigned long);
	inline ~AtomicQueue(void);
	inline bool Push(T const); // Returns false if full
	inline bool Pop(T &); // Returns false if empty
	inline T operator[](unsigned long); // No error checking
private:
	inline AtomicQueue(void);
	AtomicQueue(AtomicQueue const &);
	AtomicQueue &operator=(AtomicQueue const &);
	__declspec(align(4)) unsigned long head, tail;
	unsigned long size;
	typedef Pack<T, unsigned long> AQPack;
	AQPack *buff;
};

template<typename T>
inline AtomicQueue<T>::AtomicQueue(unsigned long sz) : head(0), tail(0), size(sz), buff(0)
{
	STATIC_ASSERT(sizeof(T) == 4);
	STATIC_ASSERT(sizeof(unsigned long) == 4);
	STATIC_ASSERT(sizeof(unsigned long long) == 8);
	buff = static_cast<AQPack *>(_mm_malloc(size * sizeof(AQPack), 8));
	if (!buff) throw Exc("Not enough memory");
	for (unsigned long i = 0; i < size; ++i) buff[i].pack = 0ull;
}

template<typename T>
inline AtomicQueue<T>::~AtomicQueue(void)
{
	_mm_free(buff);
}

template<typename T>
inline bool AtomicQueue<T>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(Acquire(tail)), h(Acquire(head));
		__declspec(align(8)) AQPack a(Acquire(buff[t % size].pack));
		if (t != Acquire(tail)) continue;
		if (t == Acquire(head) + size)
		{
			if (Acquire(buff[h % size].lower) && h == Acquire(head)) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		if (a.lower)
		{
			if (Acquire(buff[t % size].lower)) _InterlockedCompareExchange(&tail, t + 1, t);
		}
		else
		{
			__declspec(align(8)) AQPack b(item, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[t % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&tail, t + 1, t);
				return true;
			}
		}
	}
}

template<typename T>
inline bool AtomicQueue<T>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(Acquire(head)), t(Acquire(tail));
		__declspec(align(8)) AQPack a(Acquire(buff[h % size].pack));
		if (h != Acquire(head)) continue;
		if (h == Acquire(tail))
		{
			if (!Acquire(buff[t % size].lower) && t == Acquire(tail)) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		if (!a.lower)
		{
			if (!Acquire(buff[h % size].lower)) _InterlockedCompareExchange(&head, h + 1, h);
		}
		else
		{
			__declspec(align(8)) AQPack b(0, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[h % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&head, h + 1, h);
				item = a.lower;
				return true;
			}
		}
	}
}

template<typename T>
inline T AtomicQueue<T>::operator[](unsigned long i)
{
	return buff[(head + i) % size].lower;
}

template<typename T>
inline AtomicQueue<T>::Exc::Exc(const char msg[]) : std::exception(msg)
{
}

#endif

Alternate version of push/pop:
template<typename T>
inline bool AtomicQueue<T>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(Acquire(tail)), h(Acquire(head));
		if (t == h + size)
		{
			if (Acquire(buff[h % size].lower) && h == Acquire(head)) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		__declspec(align(8)) AQPack a(Acquire(buff[t % size].pack));
		if (t != Acquire(tail)) continue;
		if (a.lower) _InterlockedCompareExchange(&tail, t + 1, t);
		else
		{
			__declspec(align(8)) AQPack b(item, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[t % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&tail, t + 1, t);
				return true;
			}
		}
	}
}

template<typename T>
inline bool AtomicQueue<T>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(Acquire(head)), t(Acquire(tail));
		if (h == t)
		{
			if (!Acquire(buff[t % size].lower) && t == Acquire(tail)) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		__declspec(align(8)) AQPack a(Acquire(buff[h % size].pack));
		if (h != Acquire(head)) continue;
		if (!a.lower) _InterlockedCompareExchange(&head, h + 1, h);
		else
		{
			__declspec(align(8)) AQPack b(0, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[h % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&head, h + 1, h);
				item = a.lower;
				return true;
			}
		}
	}
}


Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue

And can you go back and edit your initial post so that it doesn't stretch the page out so widely? [smile] The source boxes shouldn't stretch out like that, but they are. Try breaking up the "TODO" comments into multiple lines and placing some of the trailing comments on their own line. That might resolve the stretching.

Quote:
Original post by LessBread
Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue

If you look at page 4, where he references fully non-blocking queues, he refers to Michael and Scott's algorithm, which I also referred to above when I mentioned linked-list-based queues; that algorithm is also discussed in one of the verification papers I mentioned.
For both array and linked-list implementations, it appears that a DCAS is required, but Sutter is in fact mistaken--it's not necessary. See the single-word CAS paper I referenced earlier (I'm going to forward that reference to Sutter). This is a bit of a trick: though that example is an array-based queue, it does use a linked list for the LL/SC emulation. The efficiency impact is small--indeed, this approach is actually faster than DCAS on systems with more than a few processors.

Quote:
And can you go back and edit your initial post so that it doesn't stretch the page out so widely? [smile] The source boxes shouldn't stretch out like that, but they are. Try breaking up the "TODO" comments into multiple lines and placing some of the trailing comments on their own line. That might resolve the stretching.

Done.

Quote:
Original post by LessBread
Prune, how does your implementation compare with Herb Sutter's exploration of concurrency in the last few issues of Dr. Dobb's? Writing a Generalized Concurrent Queue
I can't speak for Prune's implementation, but Sutter teaches us that locality is very important for concurrency. Therefore I was confused by his choice of a linked-list over an array when implementing the queue in his article - wouldn't an array be much more cache friendly?

I posted it on Sutter's blog.

Interestingly, Intel's compiler doesn't seem to have an intrinsic for the double-quadword CAS cmpxchg16b, even though MSVC does (_InterlockedCompareExchange128), though MSDN warns that it's slow. Perhaps this omission is an indication that Intel might drop the instruction from future x86-64 CPUs (there have been rumors going back to 2004) :(
Itanium only has cmp8xchg16 (_InterlockedCompare64Exchange128), which, though it swaps two quadwords, only compares one, so it's basically useless for this algorithm; on IA64 the Evequoz algorithm would need to be used. Given the slowness consideration for cmpxchg16b, that algorithm might actually be comparable in speed on x86-64 as well.
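For reference, here's roughly what the 64-bit building block would look like with the MSVC intrinsic (an untested sketch; the destination must be 16-byte aligned):

// Sketch: 128-bit CAS for a 64-bit port. _InterlockedCompareExchange128
// returns 1 and stores ExchangeHigh:ExchangeLow if *dest equals the
// comparand; otherwise it returns 0 and writes the current value back
// into the comparand.
struct Pack64 { __int64 lower, upper; }; // item in lower, counter in upper

inline bool CAS128(__int64 volatile *dest, Pack64 const &desired, Pack64 &expected)
{
	return _InterlockedCompareExchange128(dest, desired.upper, desired.lower,
		&expected.lower) != 0;
}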

Bleh, linking to other stuff that uses the intrinsics, such as SDL, gives

"error C2733: second C linkage of overloaded function '_InterlockedCompareExchange' not allowed"

So I had to change the arguments of the interlocked intrinsics in processor.h back to long from unsigned long.

Going back to your sizeof(T)==4 restriction - AFAIK this isn't required.

The only variables that need to be atomic (i.e. need to use interlocked exchange) are the head/tail variables--the actual data array shouldn't require any special protection. See Sutter's words on "do work, then publish": the actual data array is the work, and head/tail are used to publish.

My FIFO uses a special Index class for the head/tail vars. This class uses 32-bit compare-and-swap and allows the head/tail vars to be incremented while making sure that they cannot increment past each other (turning a full queue into an empty queue by accident). There's also some hackery to avoid the ABA problem. Locking is also provided to allow the array to be resized. 16 bits are given to the index, 15 to "solving" ABA, and 1 bit acts as a mutex.

N.B. The TAtomic class wraps up the CAS operation with the function SetIfEqual(new,old).
//copyright 2008 Hodgman - Free for personal or educational use only ;)
enum IncrementType
{
	Pre,
	Post
};

/** @brief Integer type that maintains a counter of modifications made.
 * The Counted Index is designed to avoid the "ABA problem" in lock-free code.
 * It can also act as a fast user-space mutex.
 * @todo finish documenting this class
 */
class CCountedIndex
{
public:
	CCountedIndex(uint32 v=0) : i(v) {}
	CCountedIndex(const CCountedIndex& o) : i(o.i) {}

	/** Tries to lock the mutex bit.
	 * @return true if the mutex was locked, else false */
	bool Lock()
	{
		uint32 v = i;
		if( v & LockMask )
			return false;
		return i.SetIfEqual( v | LockMask, v ); //set lock
	}

	/** Unlocks the mutex bit. The mutex is assumed to be locked prior to calling. */
	void Unlock()
	{
		uint32 v = i;
		ASSERT( v & LockMask );
		i = v & ~LockMask; //remove lock
	}

	/** Assigns a new value to the index bits. No thread-safety checks! */
	void Set( uint32 index )
	{
		uint32 v = i;
		ASSERT( v & LockMask );
		uint32 n = ((v+(1<<IndexBits))&CountMask) | //increment counter
		           (index&IndexMask) |              //set index
		           LockMask;                        //keep lock
		i = n;
	}

	/** Increments the index bits. May fail due to concurrent modifications.
	 * @param limit One past the maximum index value. Index will wrap to 0 if limit is reached.
	 * @return true if the index was incremented, else false */
	bool Increment( uint32 limit )
	{
		uint32 v = i;
		if( v & LockMask )
			return false;
		uint32 n = ((v+(1<<IndexBits))&CountMask) | //increment counter
		           Inc(v&IndexMask,limit);          //increment index
		return i.SetIfEqual( n, v );
	}

	/** Increments the index unless the post-increment index would equal 'full'.
	 * If L, the lock is assumed to be held and is preserved; otherwise fails if locked.
	 * @return the pre- or post-increment index (per T) on success, uint32 max on failure. */
	template<IncrementType T, bool L>
	uint32 Increment( uint32 limit, uint32 full, bool& fail )
	{
		fail = false; //ensure 'fail' is defined on every early-out path
		uint32 v = i;
		ASSERT( !L || (v & LockMask) );
		if( !L && (v & LockMask) )
			return std::numeric_limits<uint32>::max();
		uint32 oldIndex = v&IndexMask;
		uint32 newIndex = Inc(oldIndex, limit); //increment index
		fail = (newIndex == full);
		if( fail )
			return std::numeric_limits<uint32>::max();
		uint32 n = newIndex |
		           ((v+(1<<IndexBits))&CountMask) | //increment counter
		           (L?LockMask:0);
		return i.SetIfEqual( n, v )
		     ? (T==Pre?oldIndex:newIndex)
		     : std::numeric_limits<uint32>::max();
	}

	/** Increments the index only if the whole value (index and counter) still equals 'original'.
	 * @return the pre- or post-increment index (per T) on success, uint32 max on failure. */
	template<IncrementType T, bool L>
	uint32 Increment( uint32 limit, const CCountedIndex& original )
	{
		uint32 v = i;
		ASSERT( !L || (v & LockMask) );
		if( v != original.i || (!L && (v & LockMask)) )
			return std::numeric_limits<uint32>::max();
		uint32 oldIndex = v&IndexMask;
		uint32 newIndex = Inc(oldIndex, limit); //increment index
		uint32 n = newIndex |
		           ((v+(1<<IndexBits))&CountMask) | //increment counter
		           (L?LockMask:0);
		return i.SetIfEqual( n, v )
		     ? (T==Pre?oldIndex:newIndex)
		     : std::numeric_limits<uint32>::max();
	}

	/** Increments the index and releases the mutex bit in a single store.
	 * The mutex is assumed to be locked prior to calling. */
	void IncrementAndUnlock( uint32 limit )
	{
		uint32 v = i;
		ASSERT( v & LockMask );
		uint32 n = ((v+(1<<IndexBits))&CountMask) | //increment counter
		           Inc(v&IndexMask,limit);          //increment index
		i = n;
	}

	/** Atomically increments the index and acquires the mutex bit.
	 * @return true on success, false if locked or concurrently modified. */
	bool IncrementAndLock( uint32 limit )
	{
		uint32 v = i;
		if( v & LockMask )
			return false;
		uint32 n = ((v+(1<<IndexBits))&CountMask) | //increment counter
		           Inc(v&IndexMask,limit) |         //increment index
		           LockMask;                        //set lock
		return i.SetIfEqual( n, v );
	}

	/** Returns the index bits only. */
	operator uint32() { return i&IndexMask; }
	/** Compares against the index bits only. */
	bool operator==( uint32 j ) { return (uint32(i)&IndexMask) == j; }
private:
	CCountedIndex& operator=(const CCountedIndex& o);
	//parameter renamed to 'idx' so it doesn't shadow the member 'i'
	inline uint32 Inc(uint32 idx, uint32 size) { ASSERT(idx<IndexMask); return (idx+1==size)?0:idx+1; }
	TAtomic<uint32> i;
	const static uint32 IndexMask = 0x0000FFFF;
	const static uint32 CountMask = 0x7FFF0000;
	const static uint32 LockMask  = 0x80000000;
	const static uint32 IndexBits = 16;
	const static uint32 CountBits = 15;
	const static uint32 LockBits  = 1;
};
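
To make the intended use concrete, a hypothetical call site might look like this (my sketch, not from Hodgman's post; 'capacity' is an assumed name):

CCountedIndex tail;
const uint32 capacity = 16; // wrap point for the index bits

// The counter bits change on every successful increment, so an old
// snapshot of 'tail' can't be confused with a newer value that happens
// to hold the same index--that's the ABA protection.
if( tail.Increment( capacity ) )
{
	// the index (and counter) advanced atomically
}
else
{
	// locked or concurrently modified; retry
}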

Quote:
Original post by Hodgman
I can't speak for Prune's implementation, but Sutter teaches us that locality is very important for concurrency. Therefore I was confused by his choice of a linked-list over an array when implementing the queue in his article - wouldn't an array be much more cache friendly?


Well, if you have two threads frequently writing to the same cache line (or simply one reading and one writing), that's very bad for concurrency--the classic false-sharing problem. By putting the elements in a list, you reduce the chance of this happening. See ~1:18:40 in http://video.google.com/videoplay?docid=-4714369049736584770.
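
If you do want the contiguous array anyway, one common mitigation is to pad each element out to its own cache line. A minimal sketch, assuming a 64-byte line size (PaddedSlot is an illustrative name, not from any code in this thread):

// Each slot occupies a full 64-byte cache line, so threads working on
// adjacent slots never invalidate each other's lines (no false sharing);
// align(64) pads sizeof(PaddedSlot) up to 64 bytes.
struct __declspec(align(64)) PaddedSlot
{
	unsigned long long value; // payload
};

The trade-off is the memory overhead; the strided-access idea later in the thread gets a similar separation while still using every element.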

Hodgman, I don't see how you'd handle the following situation:
You have a queue as follows: 00X0000 (X represents one element, 0 empty).
Counting from 0, head is at 2 and tail at 3.
Three threads initiate writes. The tail is incremented atomically three times to 6, and the threads claim slots 3, 4, and 5 respectively. They then write into those locations simultaneously. A reader thread can read locations 3-5 before those writes have completed, which is incorrect behavior.

You could do something with two heads and two tails, but then the code complexity becomes equivalent to the modified Shann algorithm I used.

Note also that several properties of that algorithm have been formally proved: it meets the invariants of the definition of a queue, it is linearizable, and it is fully non-blocking. That's what decided the issue for me. Finally, a 32-bit CAS can be used if you treat one 16-bit half as the reference counter and the other, instead of a pointer, as a 16-bit index into an array (or use some other subdivision of the 32 bits).

the_edd, one could use a strided "array" instead with elements spaced at least one cache line apart :P

BTW, I'm really having a problem with the signed/unsigned thing. How do I avoid having to put a reinterpret_cast on the first argument of every _InterlockedXxx() call? (Also, would the standard conversion preserve the bit patterns of the other, non-pointer arguments? I think it would on 2's-complement architectures.)
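
One way to at least confine the casts to a single place is a thin inline wrapper per intrinsic--a hypothetical helper, not something from processor.h:

#include <intrin.h>

// Call sites can pass unsigned long directly; the casts live here only.
// On MSVC (2's complement) the long<->unsigned long conversions preserve
// bit patterns, which also covers the non-pointer-argument question.
inline unsigned long InterlockedExchangeU(volatile unsigned long *target,
                                          unsigned long value)
{
	return static_cast<unsigned long>(
		_InterlockedExchange(reinterpret_cast<volatile long *>(target),
		                     static_cast<long>(value)));
}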

Quote:
Original post by Prune
Hodgman, I don't see how you'd handle the following situation:
You have a queue as follows: 00X0000 (X represents one element, 0 empty).
Counting from 0, head is at 2 and tail at 3.
Three threads initiate writes. The tail is incremented atomically three times to 6, and the threads claim slots 3, 4, and 5 respectively. They then write into those locations simultaneously. A reader thread can read locations 3-5 before those writes have completed, which is incorrect behavior.
Sorry, I forgot to mention that my array is a structure like this:
struct Node
{
	T data;
	CAtomic valid;
	Node() : valid(0) {}
	Node(const Node& o) : data(o.data), valid(o.valid) {}
};
TAtomic<Node*> array;
CCountedIndex head, tail;
The 'valid' variable within the array is used as a flag to indicate whether the write has completed yet or not.
If a pop operation occurs on a node that is not yet valid, then it has to wait for the writer to complete its operation (and set the flag).
Push looks something like:
uint32 limit = 7; //size of array
uint32 full = head;
bool isFull;
uint32 write = tail.Increment<Pre,false>( limit, full, isFull );
if( write != std::numeric_limits<uint32>::max() ) //increment succeeded
{
	array[write].data = v;  //do work
	array[write].valid = 1; //then publish
	break; //this snippet is inside a while(true) loop
}
else if( isFull ) //increment failed because array is full
{
	//try to lock head+tail and resize array
}

But this is in essence a spin-lock, so your algorithm cannot be considered lock-free. Can you give guarantees that lock convoys, priority inversion, livelock/deadlock, etc. can never occur?

Most of my thoughts were already covered by the above posts (wicked thread and snippets, by the way!! I love seeing this calibre of discussion on GDNet, awesome).

Cache-line-friendly arrays are definitely superior to linked lists for playing nicely with the L2 cache (especially on the 360).

Also, I can definitely back up the use of the memory barriers. I ran into this exact problem doing concurrency programming with a ring buffer on the 360: certain optimizations would reorder instructions, and the ring buffer's write index would sometimes miss an increment before the next read happened, so forcing a memory barrier in a key place flushed out the transaction.

Nice work!

~Graham

Quote:
Original post by Prune
But this is in essence a spin-lock, so your algorithm cannot be considered lock-free. Can you give guarantees that lock convoys, priority inversion, livelock/deadlock, etc. can never occur?
This spin-lock only occurs when a pop operation tries to pop data that is still being written. Most of the time a write is a very simple operation, so this should be a rare occurrence (which is why I chose to spin rather than return immediately).

To be truly lock-free, I have to guarantee that at least one thread will always be making progress, even if all the other threads are paused.
To satisfy this condition, the pop operation should just return false (failure) if the write is not yet completed (instead of spinning).

I should probably make this a boolean template argument so the user of my FIFO can choose whether they want it to be truly lock-free or not (see the sketch below). ;)
This would require disabling my resize feature though, as I'm not sure how you would resize/relocate the array in a truly lock-free design.
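
As a sketch of that boolean template argument (hypothetical names, not Hodgman's actual code), the spin-versus-fail policy could be selected at compile time:

#include <intrin.h>

// If TrulyLockFree, report failure immediately when the slot isn't yet
// published; otherwise spin-wait for the writer (not lock-free, but the
// wait is normally very short).
template<bool TrulyLockFree>
inline bool WaitForValid( volatile long &valid )
{
	if( TrulyLockFree )
		return valid != 0;
	while( !valid )
		_mm_pause(); // be polite to the other hardware thread while spinning
	return true;
}

A pop would then do something like "if( !WaitForValid<TrulyLockFree>( array[read].valid ) ) return false;".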

Hodgman, I'll take a look at your code again tomorrow.

I played around a bit with alignment. In my testing I'm using two writers and two readers. For the following, I set it so that in most cases the queue would be neither full nor empty.

Putting head and tail on different cache lines (aligning to 32, since align() doesn't officially take a larger number, and then putting an intervening member variable between them) doesn't lead to a speedup I can distinguish above the noise floor of variation between trials.

Strided array access, where the stride value is 4 (assuming 64-byte Core 2 cache lines) and the array size is 999 (so that every element of the array is used), gives a quarter to a third speedup. However, using an array size of 9999 has the opposite effect. So it seems worrying about cache-line thrashing is only worthwhile if the array fits within L1; otherwise the more frequent reads from L2 become the overriding consideration.

One problem due to not knowing the size at compile time is that I can't set the stride at compile time either, and having the stride as a variable instead of a literal gives me a big performance hit--comparable to the gain I was getting from avoiding the cache-line thrashing in the first place! (Having the size as a literal makes much less of a difference--am I running out of registers here?) Any ideas as to how best to resolve this?
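
For what it's worth, if the size were available at compile time again (e.g. as a template parameter, as in the original posting), the stride could be one too, and the index arithmetic would be all literals. A speculative sketch (StridedIndex is an illustrative name):

// With size and stride as compile-time constants, the multiply and modulo
// compile down to constant arithmetic the optimizer can strength-reduce,
// instead of loads from memory.
template<unsigned long size, unsigned long stride>
inline unsigned long StridedIndex(unsigned long i)
{
	// This visits every element exactly once per 'size' steps provided
	// gcd(stride, size) == 1, e.g. stride 4 with size 999.
	return (i * stride) % size;
}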

(Note: the temporaries in push/pop have alignment specified as well, just in case, even though they should stay in registers, since they must meet the minimum alignments for the interlocked operations--though I guess that's only needed for the 64-bit one, since 4-byte basic types are aligned to 4-byte boundaries by default.)
