Prune

Here's an implementation of a lock-free multi-reader/multi-writer array-based queue


I've tested the following code and it works very well. I started with the algorithm from "A Practical Nonblocking Queue Algorithm Using Compare-And-Swap", but I found an error in their algorithm (after a very protracted debugging session, first to find the error and then to check that it was not due to an implementation mistake). Searching other references to this paper, I came across "Formal Verification of An Array-based Nonblocking Queue", which discusses the same error and provides a fix as well as a small enhancement. I was still occasionally getting an error, but a load barrier fixed that (see code). This code might still have some bugs despite my best effort, so I make no warranties whatsoever regarding it. Additionally, if you do not substantially change it, please retain the copyright notice. A couple of notes are in order:

  • If you enqueue/dequeue in a busy loop waiting for a non-full/non-empty queue, performance might be better if you add a slight delay after each failed operation, at least an _mm_pause (or __yield if you're on IA64).
  • The weird-looking casting with dereference and address-of operators around item is there because if T is something like an int or float, where a standard conversion is defined, at least MSVC gives an error saying that static_cast should be used instead of reinterpret_cast (yet if there's no such conversion, the reverse happens and it says reinterpret_cast should be used). If anyone knows of a cleaner solution, please post (and a C-style cast doesn't count).
  • _ReadBarrier is only a compiler reordering barrier, but it seems sufficient in my testing (_WriteBarrier also works, as does _ReadWriteBarrier). Using _mm_lfence, which is a CPU reordering barrier, also seems to work, but _mm_sfence doesn't (the latter is strange given that _WriteBarrier works, but I have no explanation). Do not remove the barrier: with some optimization options you will get errors (albeit infrequently for most setups).
  • This won't work on 64-bit systems; using _InterlockedCompareExchange128 might allow an easy conversion, however.
  • Finally, I have not tested this on an AMD processor (though I expect it should work).
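To illustrate the first note, a busy-wait enqueue could be wrapped along these lines (a minimal sketch; the PushSpin name is mine, for illustration only):

template<class T, unsigned long size>
inline void PushSpin(AtomicQueue<T, size> &q, T const item)
{
	while (!q.Push(item)) // queue full: back off briefly before retrying
		YieldProcessor(); // _mm_pause on x86; eases pipeline/bus pressure while spinning
}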
// Copyright (C) 2008 Borislav Trifonov
// Based on algorithm from "Formal Verification of An Array-based Nonblocking Queue"

// TODO: For 64-bit system, if cannot adapt to _InterlockedCompare64Exchange128() or _InterlockedCompareExchange128(),
// use instead algorithm from Fig.5 in "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives"

#if !defined ATOMIC_QUEUE
#define ATOMIC_QUEUE

#include <exception>
#include <cassert>
#include "processor.h"

template<class T, unsigned long size>
class AtomicQueue
{
public:
	class Exc : public std::exception
	{
	public:
		inline Exc(const char []);
	};
	inline AtomicQueue(void);
	inline ~AtomicQueue(void);
	inline bool Push(T const); // Returns false if full
	inline bool Pop(T &); // Returns false if empty
	inline T operator[](unsigned long); // No error checking
private:
	__declspec(align(4)) volatile unsigned long head, tail;
	unsigned long long *volatile buff; // TODO: Find out if the volatile is necessary
};

template<class T, unsigned long size>
inline AtomicQueue<T, size>::AtomicQueue(void) : head(0), tail(0), buff(0)
{
	assert(sizeof(T) == 4);
	assert(sizeof(unsigned long) == 4);
	assert(sizeof(unsigned long long) == 8);
	buff = static_cast<unsigned long long *>(_mm_malloc(size * sizeof(unsigned long long), 8));
	if (!buff) throw Exc("Not enough memory");
	for (unsigned long i = 0; i < size; ++i) buff[i] = 0ull;
}

#include <iostream>
template<class T, unsigned long size>
inline AtomicQueue<T, size>::~AtomicQueue(void)
{
	_mm_free(buff);
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(tail), i(t % size), h(head);
		__declspec(align(8)) unsigned long long const a(buff[i]);
		_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (t != tail) continue;
		if (t == head + size)
		{
			if (buff[h % size] & 0xFFFFFFFFull && h == head) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		// Item is stored in the lower four bytes since the interlocked intrinsics ignore the MSB
		if (a & 0xFFFFFFFFull)
		{
			if (buff[i] & 0xFFFFFFFFull) _InterlockedCompareExchange(&tail, t + 1, t);
		}
		else if (_InterlockedCompareExchange64(buff + i, static_cast<unsigned long long>(*reinterpret_cast<unsigned long const *>(&item))
			| ((a & 0xFFFFFFFF00000000ull) + 0x100000000ull), a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&tail, t + 1, t);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(head), i(h % size), t(tail);
		__declspec(align(8)) unsigned long long const a(buff[i]);
		_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (h != head) continue;
		if (h == tail)
		{
			if (!(buff[t % size] & 0xFFFFFFFFull) && t == tail) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		if (!(a & 0xFFFFFFFFull))
		{
			if (!(buff[i] & 0xFFFFFFFFull)) _InterlockedCompareExchange(&head, h + 1, h);
		}
		else if (_InterlockedCompareExchange64(buff + i, (a & 0xFFFFFFFF00000000ull) + 0x100000000ull, a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&head, h + 1, h);
			unsigned long dummy(a & 0xFFFFFFFFull);
			item = *reinterpret_cast<T *>(&dummy);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline T AtomicQueue<T, size>::operator[](unsigned long i)
{
	unsigned long dummy(buff[(head + i) % size] & 0xFFFFFFFFull);
	return *reinterpret_cast<T *>(&dummy);
}

template<class T, unsigned long size>
inline AtomicQueue<T, size>::Exc::Exc(const char msg[]) : std::exception(msg)
{
}

#endif
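By way of basic usage, here's a trivial single-threaded smoke test (a sketch; remember this is 32-bit only, and note that a payload whose 32 bits are all zero would be indistinguishable from an empty slot, so only push nonzero values):

#include <cstdio>
#include "atomic_queue.h"

int main()
{
	AtomicQueue<unsigned long, 8> q;
	for (unsigned long v = 1; q.Push(v); ++v) {} // fill until Push() reports full
	unsigned long out;
	while (q.Pop(out)) std::printf("%lu\n", out); // drains 1 through 8 in FIFO order
	return 0;
}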

Comments welcome. I hope someone besides me finds it useful. The processor.h header is simply my way of avoiding having to include Windows header files. It needs some additions before it can be used on Linux and so is incomplete, but here it is anyway.
#if !defined PROCESSOR_H
#define PROCESSOR_H

// Note: MSVC may require /Oi compiler option

// CPU load/store reordering barriers and fast yield
// TODO: For other than _MSC_VER; might be OK for __ICL
#if defined _M_IA64 // TODO: Probably no IA64 support needed
extern "C" void __mf(void);
#pragma intrinsic(__mf)
#define MemoryBarrier __mf
extern "C" void __yield(void);
#pragma intrinsic(__yield)
#define YieldProcessor __yield
#else
extern "C" void _mm_mfence(void);
#pragma intrinsic(_mm_mfence)
extern "C" void _mm_lfence(void);
#pragma intrinsic(_mm_lfence)
extern "C" void _mm_sfence(void);
#pragma intrinsic(_mm_sfence)
extern "C" void _mm_pause(void);
#pragma intrinsic(_mm_pause)
#define YieldProcessor _mm_pause
#if defined _M_AMD64 // TODO: Check if this is OK if load barrier needed
extern "C" void __faststorefence(void);
#pragma intrinsic(__faststorefence)
#define MemoryBarrier __faststorefence
#elif defined _M_IX86
/*__forceinline void MemoryBarrier(void)
{
	long Barrier;
	__asm xchg Barrier, eax
}*/
#define MemoryBarrier _mm_mfence
#else
#error "Unsupported environment"
#endif
#endif

// Compiler load/store reordering barriers
// TODO: Check if these are implied by the CPU barriers
#if defined __ICL || defined __ICC || defined __ECC
#define _ReadWriteBarrier __memory_barrier
#define _ReadBarrier __memory_barrier
#define _WriteBarrier __memory_barrier
#elif defined _MSC_VER
extern "C" void _ReadWriteBarrier(void);
#pragma intrinsic(_ReadWriteBarrier)
extern "C" void _ReadBarrier(void);
#pragma intrinsic(_ReadBarrier)
extern "C" void _WriteBarrier(void);
#pragma intrinsic(_WriteBarrier)
#elif defined __GNUC__ // TODO: Other options for read- or write-only barriers?
#define _ReadWriteBarrier() __asm__ __volatile__("" ::: "memory")
#define _ReadBarrier() __asm__ __volatile__("" ::: "memory")
#define _WriteBarrier() __asm__ __volatile__("" ::: "memory")
#else
#error "Unsupported environment"
#endif

// Interlocked intrinsics
// TODO: For other than _MSC_VER; might be OK for __ICL;
// note that __GNUC__ __sync_bool_compare_and_swap does the comparison but _Interlocked... doesn't
extern "C" long _InterlockedCompareExchange(volatile unsigned long *, unsigned long, unsigned long);
#pragma intrinsic(_InterlockedCompareExchange)
extern "C" long long _InterlockedCompareExchange64(volatile unsigned long long *, unsigned long long, unsigned long long);
#pragma intrinsic(_InterlockedCompareExchange64)

// Aligned memory allocator and deallocator
#if defined __ICL || defined __ICC || defined __ECC // TODO: May not be necessary; also, probably __ECC (IA64) support not needed
extern "C" void* __cdecl _mm_malloc(size_t, size_t);
extern "C" void __cdecl _mm_free(void *);
#elif defined _MSC_VER
#include <malloc.h>
#elif defined __GNUC__
#include <mm_malloc.h>
#else
#error "Unsupported environment"
#endif

#endif

[Edited by - Prune on November 17, 2008 2:48:24 PM]

In my multi-threaded library, Flow, the queue implementation I used was lock-based, because I didn't have the time to design and implement a lock-free one:

#include <queue>
#include <boost/optional.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

template <typename T>
class channel
{
	typedef T value_t;
	typedef boost::optional<value_t> optional_t;
	typedef std::queue<value_t> queue_t;
	typedef boost::mutex mutex_t;
	typedef mutex_t::scoped_lock lock_t;
	typedef boost::condition condition_t;

	// Input-output elements ================================================

	// Any operation over the data passes through the
	// data mutex. This allows sync of both input and
	// output threads.

	queue_t data;
	volatile bool is_over;
	mutex_t data_mutex;
	condition_t written_to;

	// Reference counting elements ==========================================

	// Any operation over the handles passes through
	// the handle mutex.

	mutex_t handle_mutex;
	volatile unsigned output_handles;
	volatile unsigned input_handles;
	volatile bool is_silent;

public:

	// Initialize a clean channel
	channel() :
		is_over(false),
		output_handles(0),
		input_handles(0),
		is_silent(false)
	{
	}

	void acquire(bool input)
	{
		lock_t lock(handle_mutex);
		if (input)
			++input_handles;
		else
			++output_handles;
	}

	// Release a reference to the flow.
	// - If all threads cease to reference the flow, then
	//   the memory is deallocated with operator delete
	// - If all input threads cease to reference the flow,
	//   sets the is-over flag and wakes up an output
	//   thread.
	// - If all output threads cease to reference the
	//   flow, sets the is-silent flag.
	void release(bool input)
	{
		bool should_delete = false;

		{
			lock_t lock(handle_mutex);

			if (input)
			{
				if (--input_handles == 0)
				{
					lock_t lock(data_mutex);
					is_over = true;
					written_to.notify_all();
				}
			}
			else
			{
				if (--output_handles == 0)
				{
					is_silent = true;
				}
			}

			should_delete = (input_handles + output_handles == 0);
		}

		if (should_delete)
		{
			delete this;
		}
	}

	// Is the flow silent?
	bool silent()
	{
		lock_t lock(handle_mutex);
		return is_silent;
	}

	// Add an element to the flow.
	// - If any output threads were waiting for input,
	//   one of them is reactivated and reads the input.
	// - Otherwise, stores the element for later collection.
	void add(const value_t &t)
	{
		lock_t lock(data_mutex);
		data.push(t);
		written_to.notify_one();
	}

	// Attempt to read an element from the flow.
	// - If the flow is over, leaves the argument untouched.
	// - If the flow is currently empty, but there are still
	//   input threads, blocks until input (or an is-over
	//   signal) arrives.
	void get(optional_t &t)
	{
		lock_t lock(data_mutex);

		while (!is_over && data.empty())
		{
			written_to.wait(lock);
		}

		if (is_over && data.empty())
		{
			return;
		}

		t = data.front();
		data.pop();
	}
};
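For context, typical use of the channel looks something like the following (a hypothetical sketch, single-threaded for brevity; in Flow the add() and get() calls happen on different threads):

void example()
{
	channel<int> *chan = new channel<int>;
	chan->acquire(true);    // register an input (writer) handle
	chan->acquire(false);   // register an output (reader) handle
	chan->add(42);          // writer side: enqueue and wake any waiting reader
	boost::optional<int> v;
	chan->get(v);           // reader side: receives 42; would block while empty
	chan->release(true);    // last input handle gone: is_over gets set
	chan->release(false);   // last handle gone: the channel deletes itself
}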



Your lock-free queue would be quite useful as an alternative implementation, especially since it might improve performance. Two problems I see with this are:

  • The fixed queue size. Algorithms that use the flow library may rely on the fact that the writer can dump a heapload of data into a flow before even creating the thread to read it.
  • The signature of the "bool pop(T&)" function, which assumes that an object of type T is already present (which, of course, is an assumption that the flow library cannot make because it works with arbitrary data). Would it be possible to use an alternative "boost::optional<T> pop()" signature? (A possible wrapper is sketched below.)


(And of course the fact that the state of flows relies on stuff other than the queue, such as reference counting).
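To illustrate the second point, a thin adapter over the current primitive would be enough; a sketch (PopOptional is a name I've made up, and it assumes T is cheap to default-construct):

template<class T, unsigned long size>
boost::optional<T> PopOptional(AtomicQueue<T, size> &q)
{
	T item;
	if (q.Pop(item))
		return boost::optional<T>(item); // got one
	return boost::optional<T>(); // empty optional: nothing was dequeued
}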

Some miscellaneous comments.

1. First, the show-stopper (for me). I am not qualified to judge the implementation as I have not delved into the deeper aspects of lock-free programming. It is for this reason that I think providing an *extensive* set of stress tests is very important. This is especially true due to you being unsure about the type of barrier used in certain places. I have more confidence in your ability to do this correctly than in mine, but without these tests I wouldn't touch it with a 10ft barge pole.

2. I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

3. There are a few places where static assertions could be used in place of assert() for earlier errors.

4. It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

5. Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

6. To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

7. I assume the copy constructor and assignment operator should be explicitly disabled?

8. Are your references available online somewhere? Links to them in the source would be nice, also.

So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

Quote:
Original post by ToohrVyk
The fixed queue size.

If you allow the queue to be locked, then using an allocator would be trivial. If you want a concurrent nonblocking allocator, it's not that simple. See the source code for the concurrent vector in Intel's Threading Building Blocks for an example.

Quote:
The signature of the "bool pop(T&)" function, which assumes that an object of type T is already present (which, of course, is an assumption that the flow library cannot make because it works with arbitrary data). Would it be possible to use an alternative "boost::optional<T> pop()" signature?

Can you summarize boost::optional? My intention was that the queue would only hold pointers, and I changed it to handle any pointer-sized T as an afterthought (since I could alternatively put in indices instead of pointers, or whatever).

Quote:
Original post by the_edd
It is for this reason that I think providing an *extensive* set of stress tests is very important.

That is true for the specific implementation, and I'm continuing testing. The algorithm, however, has been verified to be correct in the paper I referenced, "Formal Verification of An Array-based Nonblocking Queue".

I want to note here that I have also contacted the author of "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives", whose paper compared his algorithm to the one I started with ("A Practical Nonblocking Queue Algorithm Using Compare-And-Swap"), and I asked him about the error I had noticed in the previous algorithm; this was before I came across the verification paper and their fix. In his reply, he provided a different fix that is simpler and, I think, more efficient: simply move the reading of the array entry to after the line doing the full/empty check. This saves a read of the array entry for some of the possible flows of control through push/pop, but I have not tested it myself yet (also, he does not have the retry attempt that the verification paper introduces, which you can see in my code). I might implement his fix and compare the two versions for efficiency; I'll post code later.

Quote:
This is especially true due to you being unsure about the type of barrier used in certain places.

The interlocked operations imply barriers on the CPU, and at least MSVC knows to treat them as barriers on the compiler level (on an IA64 system, there are even extra versions of the interlocked instructions depending on whether you might just need acquire or just release fencing). A problem, though, is that MSVC documentation is not very clear or even consistent in some cases. For example, I've not been able to find out if MSVC will know that the _mm_?fence() are barriers. One could alternatively use #pragma omp flush

In regards to the barrier: in MSVC, volatile reads are supposed to have acquire semantics, and that should imply a read barrier. All the reads above the barrier will be completed when it hits (unless MSVC has a problem with buff, though a pointer to volatile should still have the claimed semantics). What is most likely the case here (and I will check the assembly output when I have the time) is that the compiler doesn't recompute the new value of i: even though t is updated, since tail is volatile, the compiler might not figure out that it needs to recompute i as well, since i isn't directly affected by a volatile access. This ties in with your next comment.

Quote:
I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

Indeed I intend to remove volatiles as much as possible in the next iteration, after considering http://software.intel.com/en-us/blogs/2007/11/30/volatile-almost-useless-for-multi-threaded-programming/
They should be replaced with explicit barriers, which not only avoids relying on compiler specifics, but can also improve performance.
By the way, another issue with GCC is that it assumes strict aliasing at -O2, at least for C code (I'm not so sure about C++). My pointer casting breaks that rule if I understand it correctly, so -fno-strict-aliasing might need to be specified when compiling source files that include atomic_queue.h
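As for the casting itself, a memcpy-based helper sidesteps the aliasing rule entirely; a sketch (bit_cast is just my working name for it):

#include <cstring>

template<typename To, typename From>
inline To bit_cast(From const &from)
{
	typedef char size_check[sizeof(To) == sizeof(From) ? 1 : -1]; // refuses to compile on a size mismatch
	To to;
	std::memcpy(&to, &from, sizeof to); // defined behavior; compilers optimize this to a plain move
	return to;
}

In Pop, for instance, item = bit_cast<T>(dummy) could then replace the reinterpret_cast.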

Quote:
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.

Quote:
It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

This relates to the previous reply post. It might be nice, and I'd welcome an example, but I was going for handling the core functionality first. Additionally, as lock-free is to a great extent about efficiency, I think that hiding memory allocations from the user only makes sense if the full range of use cases are considered and optimized, as is the case with STL containers. But this is a good deal of work.

Quote:
Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

Because then I'd have to disable the default constructor, which is a problem if I want to do something like queues = new AtomicQueue<MyType *>[num];

Quote:
To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

I haven't really done template metaprogramming (just used existing code before), so I don't know how to do that...

Quote:
I assume the copy constructor and assignment operator should be explicitly disabled?

They don't really make sense in a concurrent setting. This is the same reason that a multithreaded version of the STL I came across doesn't implement top(), full(), and empty() for their queue (there are similar opinions expressed in comments around the code for some stuff in Intel's Threaded Building Blocks). Perhaps one could lock the queue for copy operations (same with resize), but if you're really dealing with a queue that is expected to have a widely varying size, and you're willing to give up a bit of performance, simply use a linked-list based queue. See "Nonblocking Algorithms and Preemption Safe Locking on Multiprogrammed Shared Memory Multiprocessors" and "Verifying Michael and Scott's Lock-free Queue Algorithm Using Trace Reduction". I think Valve uses something similar in their most recent engine (see their GDC2007 presentation).

Quote:
Are your references available online somewhere? Links to them in the source would be nice, also.

The problem is that the Web is dynamic (links do die) and some of the papers require journal access. With Google, I only found the two verification papers with full text available on the public Web. Email me if you want the full text of any of the others.

Quote:
So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

I have tested this by inserting various combinations of delays at many different locations (to increase the chance that an operation will get interrupted), and have run a few tests for an hour at a time, checking head/tail consistency with pushes and pops carried out, as well as data integrity. The thing I haven't tested yet is what happens when head or tail overflows and wraps around. The interlocked operations ignore the sign bit, so there would actually be two conceptual wrap-arounds for each integer overflow, but that simply means that the maximum reference count is reduced by a factor of two. That still leaves a significant safety margin: you'd need a thread to be preempted for a hell of a long time (proportional to queueSize*numeric_limits<unsigned long>::max()) to have the reference count wrap around before it resumes (and on top of that, the thread would have to resume on the exact same reference count value for there to be a problem, an extremely unlikely possibility).

[Edited by - Prune on November 16, 2008 3:39:49 AM]

Quote:
Original post by Prune
Quote:
Original post by the_edd
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.


I think he meant something like boost.StaticAssert.

Quote:
Original post by Prune
Quote:
Original post by the_edd
It is for this reason that I think providing an *extensive* set of stress tests is very important.

That is true for the specific implementation, and I'm continuing testing. The algorithm, however, has been verified to be correct in the paper I referenced, "Formal Verification of An Array-based Nonblocking Queue".


Formal verification is very nice to have for this kind of thing. But I still wouldn't trust it without tests. I realise a forum isn't the best place to post a test suite, but if you get around to bundling up a zip or creating a public repository, I would strongly encourage you to make the tests available.

Knuth: "Beware of bugs in the above code; I have only proved it correct, not tried it."

Quote:

Quote:
This is especially true due to you being unsure about the type of barrier used in certain places.

The interlocked operations imply barriers on the CPU, and at least MSVC knows to treat them as barriers on the compiler level (on an IA64 system, there are even extra versions of the interlocked instructions depending on whether you might just need acquire or just release fencing). A problem, though, is that MSVC documentation is not very clear or even consistent in some cases. For example, I've not been able to find out if MSVC will know that the _mm_?fence() are barriers. One could alternatively use #pragma omp flush

The MSDN forums are pretty good for this kind of thing. I had a rather hairy question answered there about the interaction of Fibers and volatile variables.

Quote:

Quote:
I noticed that there are provisions for GCC and MSVC in the code, but you also use volatile qualification for some of your variables. This worries me because the behavior and semantics of volatile vary between GCC and MSVC, and even between different versions of MSVC. Has this been taken into account? What properties are you assuming about volatile variables? I would document these assumptions (and in fact *all* assumptions you make). I don't know what guarantees ICC provides above the standard, if any.

Indeed I intend to remove volatiles as much as possible in the next iteration, after considering http://software.intel.com/en-us/blogs/2007/11/30/volatile-almost-useless-for-multi-threaded-programming/
They should be replaced with explicit barriers, which not only avoids relying on compiler specifics, but can also improve performance.

That sounds like a good idea.

Quote:

Quote:
There are a few places where static assertions could be used in place of assert() for earlier errors.

Do you mean #error? I don't know of any other C++ standard static assertion mechanism. But assert() gets compiled away in Release builds anyway.

As Gage64 said, I was thinking along the lines of BOOST_STATIC_ASSERT. If you don't want to rely on Boost, it's only 5 lines of code to implement it yourself. The point is that the code won't even compile if these assertions fail:

STATIC_ASSERT(sizeof(T) == 4);
STATIC_ASSERT(sizeof(unsigned long) == 4);
STATIC_ASSERT(sizeof(unsigned long long) == 8);



Quote:

Quote:
It would be nice if you could support arbitrary data types, rather than just types of a particular size. I suspect this might be possible with a little meta-programming if you held a dynamically allocated copy of each element via a pointer in the case where T isn't 4 bytes in size (assuming sizeof(T*) == 4).

This relates to the previous reply post. It might be nice, and I'd welcome an example, but I was going for handling the core functionality first.


I was thinking along the lines of:

template<typename T, std::size_t Size>
struct element_traits
{
	typedef T *element_type;
	STATIC_ASSERT(sizeof(element_type) == 4);

	static void create_at_location(const T &x, void *location) { *static_cast<T **>(location) = new T(x); }
	static void destroy_at_location(void *location) { delete *static_cast<T **>(location); }
	static T &get(void *location) { return **static_cast<T **>(location); }
};

template<typename T>
struct element_traits<T, 4>
{
	typedef T element_type;

	static void create_at_location(const T &x, void *location) { new(location) T(x); } // placement new with copy-constructor
	static void destroy_at_location(void *location) { static_cast<T *>(location)->~T(); } // manual destructor call
	static T &get(void *location) { return *static_cast<T *>(location); }
};



Now in your AtomicQueue<T> you would have:


template<typename T, ...>
class AtomicQueue
{
	typedef element_traits<T, sizeof(T)> traits;
};



If you perform all your manipulations through the static functions in traits, then you have support for arbitrary element types. If they're 4 bytes, the elements will be stored by value, else by pointer.

Quote:

Additionally, as lock-free is to a great extent about efficiency, I think that hiding memory allocations from the user only makes sense if the full range of use cases are considered and optimized, as is the case with STL containers. But this is a good deal of work.

You have a point, but at some point someone is going to have to do the allocation. Are you as the container author not in a better position (the best position?) to do that efficiently? You may not be, this isn't a rhetorical question! You could also consider an allocator template parameter.

Quote:

Quote:
Why is the size a template parameter, rather than a constructor argument? Is this a constraint imposed by the implementation?

Because then I'd have to disable the default constructor, which is a problem if I want to do something like queues = new AtomicQueue<MyType *>[num];

Well I'd scrap the default constructor. Have a single constructor that takes the number of elements. It's more flexible and if you really, really need it, you still have the option to wrap it in a class that gets the size through a template parameter and feeds it to the constructor in its own default constructor. With your design, I cannot create a queue with a size that is decided at runtime.

Also, if you used a std::vector<AtomicQueue<T> >, rather than manually new[]-ing up your array you wouldn't have your proposed problem and you'd get a lot of extra benefits, too.

Quote:

Quote:
To get around your static_cast/reinterpret_cast problem, you could employ an is_convertible<> meta-function to choose the correct kind of cast at compile time. However, I would actually find a C-style cast with a comment more readable. I, too, much prefer the C++ style casts, but this is some gnarly-ass code (that's not an insult -- it simply has to be!) and throwing a C-style cast in wouldn't bother me if you documented the reason.

I haven't really done template metaprogramming (just used existing code before), so I don't know how to do that...


Something like:


template<bool convertible>
struct shut_the_compiler_up_cast_impl
{
	template<typename To, typename From>
	static To do_it(const From &x) { return reinterpret_cast<To>(x); }
};

template<>
struct shut_the_compiler_up_cast_impl<true>
{
	template<typename To, typename From>
	static To do_it(const From &x) { return static_cast<To>(x); } // or maybe just "return x;"?
};

template<typename To, typename From>
To shut_the_compiler_up_cast(From x)
{
	const bool conv = boost::is_convertible<From, To>::value;
	return shut_the_compiler_up_cast_impl<conv>::template do_it<To>(x); // "template" keyword needed for the dependent call
}



Now instead of reinterpret_cast or static_cast, use shut_the_compiler_up_cast.

Again, you could write the is_convertible<> meta function yourself if you didn't want to rely on boost. It's maybe 15 lines but it might make you go cross-eyed if you haven't seen the trick before :)
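For reference, the core of the trick looks something like this (an untested sketch):

template<typename From, typename To>
struct is_convertible
{
private:
	typedef char yes;           // sizeof(yes) == 1
	struct no { char pad[2]; }; // sizeof(no) > 1
	static yes test(To);        // chosen by overload resolution if From converts to To
	static no test(...);        // fallback otherwise
	static From &make();        // never defined; only ever used inside sizeof
public:
	static const bool value = sizeof(test(make())) == sizeof(yes);
};

boost::is_convertible<From, To>::value in the cast above could then be swapped for this one.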

As I said, it's pretty messy and I think a C-cast with a comment would be better.

Quote:

Quote:
I assume the copy constructor and assignment operator should be explicitly disabled?

They don't really make sense in a concurrent setting.


Exactly. But I can still attempt to copy an AtomicQueue via the compiler generated copy constructor or copy assignment operator. Do this:


template<typename T>
class AtomicQueue
{
private:
	AtomicQueue(const AtomicQueue &);
	AtomicQueue &operator= (const AtomicQueue &);
};



Now I will get a compiler error if I even attempt to copy an AtomicQueue. You don't even have to define the bodies of those functions because they're impossible to call (unless you do so accidentally inside a member function!). Alternatively, you could inherit boost::noncopyable, which basically adds the above to your class for you.

Quote:

Quote:
Are your references available online somewhere? Links to them in the source would be nice, also.

The problem is that the Web is dynamic (links do die) and some of the papers require journal access. With Google, I only found the two verification papers with full text available on the public Web. Email me if you want the full text of any of the others.


Ok. If you get around to distributing it outside the forum, it might be nice to bundle the papers with it, license restrictions permitting.

Quote:

Quote:
So in summary, this could be kick-ass, but as it stands I can't have any confidence in it. Your code needs a means to gain my trust by documenting your assumptions and reference material, and by providing a comprehensive set of tests, etc...

I have tested this by inserting various combinations of delays at many different locations (to increase the chance that an operation will get interrupted), and have run a few tests for an hour at a time, checking head/tail consistency with pushes and pops carried out, as well as data integrity.


But only on your system(s). I have an AMD X2 processor on this machine. I *need* to run the tests on here before I can trust the code. You've gotta supply the tests. This is hard code to write and I applaud you for having a go at it. But since many people won't even be able to understand it or verify it by reading it, there has to be some other kind of reassurance. You may already be planning to include the tests, I just haven't heard you say it yet :)

Quote:
Original post by the_edd
I realise a forum isn't the best place to post a test suite, but if you get around to bundling up a zip or creating a public repository, I would strongly encourage you to make the tests available.

Well, this is only a prototype for now.

Quote:
As Gage64 said, I was thinking along the lines of BOOST_STATIC_ASSERT. If you don't want to rely on Boost, it's only 5 lines of code to implement it yourself. The point is that the code won't even compile if these assertions fail:

The simplest I found is this:
template<bool> struct CompileError;
template<> struct CompileError<true>
{
};
#define STATIC_ASSERT(X) (CompileError<(X)>())

Quote:
If you perform all your manipulations through the static functions in traits, then you have support for arbitrary element types. If they're 4 bytes, the elements will be stored by value, else by pointer.

Thanks, I will try to add this when I have the time to mess around with this queue again in a few days.

Quote:
You have a point, but at some point someone is going to have to do the allocation. Are you as the container author not in a better position (the best position?) to do that efficiently? You may not be, this isn't a rhetorical question! You could also consider an allocator template parameter.

It has to do with the primary use of this queue in my code. I have buffers where the skinning and simulation threads (generally OpenMP) dump updated vertex data, then push pointers to the right buffer into the queue, to be collected by the OpenGL render thread. The buffers are preallocated based on the predetermined sizes of meshes, and so I never considered allocation by the queue itself. In the way I see the semantics of this, this queue is meant as simply a conveyor between threads.

Quote:
Well I'd scrap the default constructor. Have a single constructor that takes the number of elements. It's more flexible and if you really, really need it, you still have the option to wrap it in a class that gets the size through a template parameter and feeds it to the constructor in its own default constructor. With your design, I cannot create a queue with a size that is decided at runtime.

Also, if you used a std::vector<AtomicQueue<T> >, rather than manually new[]-ing up your array you wouldn't have your proposed problem and you'd get a lot of extra benefits, too.

OK, I agree with this and will change it.

Quote:
As I said, it's pretty messy and I think a C-cast with a comment would be better.

Yes, it seems that way. I'll go with a C-cast.

Quote:
Now I will get a compiler error if I even attempt to copy an AtomicQueue.

Yes, I use this technique; I simply hadn't considered the issue of copyability until this discussion, which is why I hadn't included it yet.

Quote:
But only on your system(s). I have an AMD X2 processor on this machine. I *need* to run the tests on here before I can trust the code. You've gotta supply the tests. This is hard code to write and I applaud you for having a go at it. But since many people won't even be able to understand it or verify it by reading it, there has to be some other kind of reassurance. You may already be planning to include the tests, I just haven't heard you say it yet :)

I think I should do that once I have a more finalized version of the queue rather than just this prototype, since some functionality may change.
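To give an idea of the shape such a harness takes, here is a minimal checksum-style sketch against the current interface (illustrative only; this is not my actual test code, and the crude spin-join at the end is just to avoid pulling in windows.h alongside processor.h):

#include <cstdio>
#include <process.h>
#include "atomic_queue.h"

static AtomicQueue<unsigned long, 1024> queue;
static volatile unsigned long sums[4] = {0, 0, 0, 0};     // per-thread checksums
static volatile unsigned long finished[4] = {0, 0, 0, 0}; // per-thread completion flags

unsigned __stdcall Producer(void *arg)
{
	unsigned long const idx(reinterpret_cast<unsigned long>(arg));
	unsigned long sum(0);
	for (unsigned long v(1); v <= 50000; ++v) // payload must be nonzero: 0 marks an empty slot
	{
		while (!queue.Push(v)) _mm_pause(); // spin until a slot frees up
		sum += v;
	}
	sums[idx] = sum;
	finished[idx] = 1;
	return 0;
}

unsigned __stdcall Consumer(void *arg)
{
	unsigned long const idx(reinterpret_cast<unsigned long>(arg));
	unsigned long sum(0);
	for (unsigned long n(0); n < 50000; ++n)
	{
		unsigned long v;
		while (!queue.Pop(v)) _mm_pause(); // spin until an item arrives
		sum += v;
	}
	sums[idx] = sum;
	finished[idx] = 1;
	return 0;
}

int main()
{
	// Two producers and two consumers; thread handles leaked for brevity
	_beginthreadex(0, 0, Producer, reinterpret_cast<void *>(0), 0, 0);
	_beginthreadex(0, 0, Producer, reinterpret_cast<void *>(1), 0, 0);
	_beginthreadex(0, 0, Consumer, reinterpret_cast<void *>(2), 0, 0);
	_beginthreadex(0, 0, Consumer, reinterpret_cast<void *>(3), 0, 0);
	for (int i = 0; i < 4; ++i) while (!finished[i]) _mm_pause(); // crude spin-join
	unsigned long const pushed(sums[0] + sums[1]), popped(sums[2] + sums[3]);
	std::printf("pushed %lu, popped %lu\n", pushed, popped); // totals must match
	return pushed == popped ? 0 : 1;
}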

I think the only aspect that is not too clear in the code is that the head/tail gets incremented even if the DCAS inserting/removing an item fails. Without this, if thread A gets preempted after its DCAS succeeds but before it has incremented the position, causing thread B's DCAS to fail, B cannot continue until A resumes and completes that increment. So the idea is that B increments the lagging position on A's behalf.

I mentioned in a previous post that there's an alternative way to fix the Shann algorithm, so I'm posting that version of the code here. Notice that now the array is not read an extra time to verify the null. The assumption is that the first read of the array into a doesn't need to happen until after the full/empty check, since there shouldn't be a need for the DCAS to check that the array location is consistent from before that full/empty check; it only needs to make sure it's consistent from before the check of whether head/tail is still consistent. I have tested this and it seems to work as well as the first version, but of course there's no formal verification for this version, so for now put more stock in the original; the performance improvement is minor in any case. I'm waiting for responses from both sets of authors as to why they chose different approaches. One more important note about this code: though I used the different fix, I still retained the modification from the verification paper that adds a check for the possibility of a retry in case of full/empty. You can see that this doesn't interact with the other issue because the control flow for that is either a continue or a return.

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(tail), h(head);
		//_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86 _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (t == head + size)
		{
			if (buff[h % size] & 0xFFFFFFFFull && h == head) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		__declspec(align(8)) unsigned long long const a(buff[t % size]);
		if (t != tail) continue;
		// Item is stored in the lower four bytes since the interlocked intrinsics ignore the MSB
		if (a & 0xFFFFFFFFull) _InterlockedCompareExchange(&tail, t + 1, t);
		else if (_InterlockedCompareExchange64(buff + t % size, static_cast<unsigned long long>(*reinterpret_cast<unsigned long const *>(&item))
			| ((a & 0xFFFFFFFF00000000ull) + 0x100000000ull), a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&tail, t + 1, t);
			return true;
		}
	}
}

template<class T, unsigned long size>
inline bool AtomicQueue<T, size>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(head), t(tail);
		//_ReadBarrier(); // TODO: Test any time environment changed: on _MSC_VER/_M_IX86
		// _WriteBarrier() and _mm_lfence() also seem sufficient, but _mm_sfence() is not
		if (h == tail)
		{
			if (!(buff[t % size] & 0xFFFFFFFFull) && t == tail) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		__declspec(align(8)) unsigned long long const a(buff[h % size]);
		if (h != head) continue;
		if (!(a & 0xFFFFFFFFull)) _InterlockedCompareExchange(&head, h + 1, h);
		else if (_InterlockedCompareExchange64(buff + h % size, (a & 0xFFFFFFFF00000000ull) + 0x100000000ull, a) == static_cast<long long>(a))
		{
			_InterlockedCompareExchange(&head, h + 1, h);
			unsigned long dummy(a & 0xFFFFFFFFull);
			item = *reinterpret_cast<T *>(&dummy);
			return true;
		}
	}
}


I'd like to ask: should I expect a performance improvement if, instead of using a bitwise AND to get the lower 32 bits, I use a union of a long long with a struct of two longs? I guess the question reduces to whether the & is faster than a struct member access.

[Edited by - Prune on November 17, 2008 2:59:33 PM]

Quote:
I'd like to ask: should I expect a performance improvement if, instead of using a bitwise AND to get the lower 32 bits, I use a union of a long long with a struct of two longs? I guess the question reduces to whether the & is faster than a struct member access.


I'd use the bitwise and. The union will have endianness issues. I'd also expect most compilers to be able to do the right thing there. Check the disassembly in release mode to be certain.

You may also want to use the standard sized types. std::int32_t, etc.

Surely there's some way to choose the struct member order based on detected endianness of the system.
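Something like the following selection should work (a sketch; the detection macros are compiler-specific, so treat the names as assumptions: GCC provides __BYTE_ORDER__, and the MSVC targets here are all little-endian):

#if defined __BYTE_ORDER__ && defined __ORDER_BIG_ENDIAN__ && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
#define PACK_BIG_ENDIAN 1 // GCC-style detection; other compilers need their own test
#else
#define PACK_BIG_ENDIAN 0 // x86/AMD64 MSVC targets are all little-endian
#endif

template<typename S, typename T>
union Pack
{
	unsigned long long pack;
#if PACK_BIG_ENDIAN
	struct { T upper; S lower; }; // big-endian: high half comes first in memory
#else
	struct { S lower; T upper; }; // little-endian: low half comes first in memory
#endif
};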

With the union, I ended up with cleaner code and the casting issue is gone.

As for int32_t, MSVC at least is not a C99 compiler, so there's no stdint.h; I also want to stay independent of Boost.
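A few typedefs would cover that without Boost (a sketch relying on MSVC's __intN extension):

#if defined _MSC_VER
typedef __int32 int32_t;           // MSVC's sized-integer extension
typedef unsigned __int32 uint32_t;
typedef __int64 int64_t;
typedef unsigned __int64 uint64_t;
#else
#include <stdint.h> // C99 compilers provide the real thing
#endif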

Here's a version with some of the updates suggested thus far, including removal of volatile (well, it's still there in Acquire(), but no assumptions are made other than the standard C++ definition).

[Edit:] processor.h updated to compile without warnings on Intel Compiler for Windows.
#if !defined PROCESSOR_H
#define PROCESSOR_H

// Note: MSVC may require /Oi compiler option

// CPU load/store reordering barriers and fast yield
// TODO: For other than _MSC_VER and __ICL
#if defined _M_IA64 // TODO: Probably no IA64 support needed
extern "C" void __mf(void);
#pragma intrinsic(__mf)
#define MemoryBarrier __mf
extern "C" void __yield(void);
#pragma intrinsic(__yield)
#define YieldProcessor __yield
#else
#if !(defined __ICL || defined __ICC || defined __ECC)
extern "C" void _mm_mfence(void);
#pragma intrinsic(_mm_mfence)
extern "C" void _mm_lfence(void);
#pragma intrinsic(_mm_lfence)
extern "C" void _mm_sfence(void);
#pragma intrinsic(_mm_sfence)
extern "C" void _mm_pause(void);
#pragma intrinsic(_mm_pause)
#endif
#define YieldProcessor _mm_pause
#if defined _M_AMD64 // TODO: Check if this is OK if load barrier needed
extern "C" void __faststorefence(void);
#pragma intrinsic(__faststorefence)
#define MemoryBarrier __faststorefence
#elif defined _M_IX86
/*__forceinline void MemoryBarrier(void)
{
long Barrier;
__asm xchg Barrier, eax
}*/

#define MemoryBarrier _mm_mfence
#else
#error "Unsupported environment"
#endif
#endif

// Compiler load/store reordering barriers
// TODO: Check if these are implied by the CPU barriers; __ECC should use explicit ...with acquire/release instructions instead
#if defined __ICL || defined __ICC || defined __ECC
#define _ReadWriteBarrier __memory_barrier
#define _ReadBarrier __memory_barrier
#define _WriteBarrier __memory_barrier
#elif defined _MSC_VER
extern "C" void _ReadWriteBarrier(void);
#pragma intrinsic(_ReadWriteBarrier)
extern "C" void _ReadBarrier(void);
#pragma intrinsic(_ReadBarrier)
extern "C" void _WriteBarrier(void);
#pragma intrinsic(_WriteBarrier)
#elif defined __GNUC__ // TODO: Other options for read- or write-only barriers?
#define _ReadWriteBarrier() __asm__ __volatile__("" ::: "memory")
#define _ReadBarrier() __asm__ __volatile__("" ::: "memory")
#define _WriteBarrier() __asm__ __volatile__("" ::: "memory")
#else
#error "Unsupported environment"
#endif

template<typename T>
__forceinline T Acquire(T volatile const &p)
{
	T t = p;
	_ReadWriteBarrier();
	return t;
}

template<typename S, typename T>
__forceinline void Release(volatile S &p, T const t)
{
	_ReadWriteBarrier();
	p = t;
}

// Interlocked intrinsics
// TODO: For other than _MSC_VER and __ICL; note that __GNUC__ __sync_bool_compare_and_swap does the comparison but _Interlocked... doesn't
#if !(defined __ICL || defined __ICC || defined __ECC)
extern "C" long _InterlockedCompareExchange(volatile unsigned long *, unsigned long, unsigned long);
#pragma intrinsic(_InterlockedCompareExchange)
extern "C" long long _InterlockedCompareExchange64(volatile unsigned long long *, unsigned long long, unsigned long long);
#pragma intrinsic(_InterlockedCompareExchange64)
#endif

// Aligned memory allocator and deallocator
#if !(defined __ICL || defined __ICC || defined __ECC)
#if defined _MSC_VER
#include <malloc.h>
#elif defined __GNUC__
#include <mm_malloc.h>
#else
#error "Unsupported environment"
#endif
#endif

template<bool> struct CompileError;
template<> struct CompileError<true>
{
};
#define STATIC_ASSERT(X) (CompileError<(X)>())

template<typename S, typename T>
union Pack
{
	inline Pack(void);
	inline Pack(Pack const &);
	inline Pack(unsigned long long const &);
	inline Pack(S const &, T const &);
	unsigned long long pack;
	struct // TODO: Check if the below order is only correct for little-endian architectures
	{
		S lower;
		T upper;
	};
};

template<typename S, typename T>
inline Pack<S, T>::Pack(void)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
}

template<typename S, typename T>
inline Pack<S, T>::Pack(Pack const &other)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	pack = other.pack;
}

template<typename S, typename T>
inline Pack<S, T>::Pack(unsigned long long const &op)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	pack = op;
}

template<typename S, typename T>
inline Pack<S, T>::Pack(S const &nl, T const &nu)
{
	STATIC_ASSERT(sizeof(S) == 4 && sizeof(T) == 4);
	lower = nl;
	upper = nu;
}

#endif

// Copyright (C) 2008 Borislav Trifonov
// Based on algorithm from "A Practical Nonblocking Queue Algorithm Using Compare-And-Swap"

// TODO: For 64-bit system, if cannot adapt to _InterlockedCompare64Exchange128() or _InterlockedCompareExchange128(), use instead algorithm from Fig.5 in "Non-Blocking Concurrent FIFO Queues with Single Word Synchronization Primitives"

#if !defined ATOMIC_QUEUE
#define ATOMIC_QUEUE

#include <exception>
#include "processor.h"

template<typename T>
class AtomicQueue
{
public:
	class Exc : public std::exception
	{
	public:
		inline Exc(const char []);
	};
	inline AtomicQueue(unsigned long);
	inline ~AtomicQueue(void);
	inline bool Push(T const); // Returns false if full
	inline bool Pop(T &); // Returns false if empty
	inline T operator[](unsigned long); // No error checking
private:
	inline AtomicQueue(void);
	AtomicQueue(AtomicQueue const &);
	AtomicQueue &operator=(AtomicQueue const &);
	__declspec(align(4)) unsigned long head, tail;
	unsigned long size;
	typedef Pack<T, unsigned long> AQPack;
	AQPack *buff;
};

template<typename T>
inline AtomicQueue<T>::AtomicQueue(unsigned long sz) : head(0), tail(0), size(sz), buff(0)
{
	STATIC_ASSERT(sizeof(T) == 4);
	STATIC_ASSERT(sizeof(unsigned long) == 4);
	STATIC_ASSERT(sizeof(unsigned long long) == 8);
	buff = static_cast<AQPack *>(_mm_malloc(size * sizeof(AQPack), 8));
	if (!buff) throw Exc("Not enough memory");
	for (unsigned long i = 0; i < size; ++i) buff[i].pack = 0ull;
}

#include <iostream>
template<typename T>
inline AtomicQueue<T>::~AtomicQueue(void)
{
	_mm_free(buff);
}

template<typename T>
inline bool AtomicQueue<T>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(Acquire(tail)), h(Acquire(head));
		__declspec(align(8)) AQPack a(Acquire(buff[t % size].pack));
		if (t != Acquire(tail)) continue;
		if (t == Acquire(head) + size)
		{
			if (Acquire(buff[h % size].lower) && h == Acquire(head)) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		if (a.lower)
		{
			if (Acquire(buff[t % size].lower)) _InterlockedCompareExchange(&tail, t + 1, t);
		}
		else
		{
			__declspec(align(8)) AQPack b(item, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[t % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&tail, t + 1, t);
				return true;
			}
		}
	}
}

template<typename T>
inline bool AtomicQueue<T>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(Acquire(head)), t(Acquire(tail));
		__declspec(align(8)) AQPack a(Acquire(buff[h % size].pack));
		if (h != Acquire(head)) continue;
		if (h == Acquire(tail))
		{
			if (!Acquire(buff[t % size].lower) && t == Acquire(tail)) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		if (!a.lower)
		{
			if (!Acquire(buff[h % size].lower)) _InterlockedCompareExchange(&head, h + 1, h);
		}
		else
		{
			__declspec(align(8)) AQPack b(0, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[h % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&head, h + 1, h);
				item = a.lower;
				return true;
			}
		}
	}
}

template<typename T>
inline T AtomicQueue<T>::operator[](unsigned long i)
{
	return buff[(head + i) % size].lower;
}

template<typename T>
inline AtomicQueue<T>::Exc::Exc(const char msg[]) : std::exception(msg)
{
}

#endif
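Basic usage of the revised interface (a sketch; VertexData stands in for the preallocated buffers I described earlier):

struct VertexData { float pos[3]; }; // stand-in for a preallocated vertex buffer

void Example()
{
	AtomicQueue<VertexData *> q(256); // capacity is now a runtime constructor argument
	static VertexData vd;
	while (!q.Push(&vd)) _mm_pause(); // simulation thread hands off a buffer pointer
	VertexData *next(0);
	if (q.Pop(next))
	{
		// render thread would consume *next here
	}
}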

Alternate version of push/pop:
template<typename T>
inline bool AtomicQueue<T>::Push(T const item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const t(Acquire(tail)), h(Acquire(head));
		if (t == h + size)
		{
			if (Acquire(buff[h % size].lower) && h == Acquire(head)) return false;
			_InterlockedCompareExchange(&head, h + 1, h);
			continue;
		}
		__declspec(align(8)) AQPack a(Acquire(buff[t % size].pack));
		if (t != Acquire(tail)) continue;
		if (a.lower) _InterlockedCompareExchange(&tail, t + 1, t);
		else
		{
			__declspec(align(8)) AQPack b(item, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[t % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&tail, t + 1, t);
				return true;
			}
		}
	}
}

template<typename T>
inline bool AtomicQueue<T>::Pop(T &item)
{
	while (true)
	{
		__declspec(align(4)) unsigned long const h(Acquire(head)), t(Acquire(tail));
		if (h == t)
		{
			if (!Acquire(buff[t % size].lower) && t == Acquire(tail)) return false;
			_InterlockedCompareExchange(&tail, t + 1, t);
			continue;
		}
		__declspec(align(8)) AQPack a(Acquire(buff[h % size].pack));
		if (h != Acquire(head)) continue;
		if (!a.lower) _InterlockedCompareExchange(&head, h + 1, h);
		else
		{
			__declspec(align(8)) AQPack b(0, a.upper + 1);
			if (_InterlockedCompareExchange64(&(buff[h % size].pack), b.pack, a.pack) == static_cast<long long>(a.pack))
			{
				_InterlockedCompareExchange(&head, h + 1, h);
				item = a.lower;
				return true;
			}
		}
	}
}


[Edited by - Prune on November 17, 2008 4:57:15 PM]
