lock-free FIFO/LIFO implementation for Enqueue/Dequeue concurrent access

12 comments, last by LordShade 16 years, 2 months ago
Hi all, I am including the code for my first attempt at a lock-free LIFO/FIFO queue. It is not up to my performance standards yet, and there is a bug somewhere that sometimes causes it to fail when pushing data :( Right now all I am doing is having 3 threads concurrently push 1000 messages each (3000 in total) into this FIFO/LIFO. The idea is the following:
- Have a predetermined amount of descriptors pre-allocated, so memory allocation happens only at startup, in a thread-safe environment (avoiding lock-free allocation).
- Each thread must use a descriptor to state what it wants to do, in order to update the head and tail positions.
- Each descriptor holds a counter variable that can only be updated atomically. A thread can only take a descriptor if no thread at all is interested in it; one descriptor is always in use to hold the current head/tail/size information.
- Memory access is done through the CompleteWrite method, which applies the pending write recorded in a descriptor. If this method fails (which happens sometimes), it means that I somehow got an ABA problem and some thread is trying to update a location that has already been used (which should be impossible with my design, but clearly I have a bug somewhere).
- As long as the descriptor pool holds at least (number of threads + 1) descriptors, this should guarantee concurrent thread access for both enqueueing and dequeueing (or at least, that is what I assume would happen if it worked).
- Each thread atomically selects a descriptor, fills it in from the current descriptor (m_pDescriptor), retrying until no one else has changed m_pDescriptor in the meantime, and then atomically switches the global descriptor pointer to its updated descriptor. This should prevent ABA issues, because the reference counter in each descriptor stops other threads from reusing it until no thread is interested in it anymore (similar to hazard pointers).
- To compile, use GCC 4.2.1+ and add the -march=i686 flag.

My ideas and design are based on the following papers, which I include here as a basis for discussion:
-> http://www.research.att.com/~bs/lock-free-vector.pdf
-> http://www.research.ibm.com/people/m/michael/RC23089.pdf

Here is the code. It is FREE for everyone to use and adapt as you wish: no licenses, no nothing. Just, if you end up using it, fixing it or improving it, please send it back to me:

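#include <iostream>
#include <iomanip>
#include <stdexcept>
#include <cstring>     // memset, strerror
#include <cstdlib>     // exit
#include <pthread.h>
#include <sys/time.h>  // gettimeofday
#include <unistd.h>    // sleep

/** TO COMPILE:
 *  - Select gcc 4.2.1+
 *  #> g++ -g3 -march=i686 cas.C -lpthread -o cas
 *  - -march=i686 is for 32-bit x86 builds: GCC cannot inline the __sync
 *    builtins for the plain i386 target. It is not needed on x86_64.
 */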

namespace LockFree {
   
#define CAS(a,b,c) __sync_bool_compare_and_swap(a,b,c)

enum StackType
{
   FIFO, /// First In, First Out
   LIFO  /// Last In, First Out
};

class Descriptor
{
   public: ///< structors
      Descriptor() : size(0), head(0), tail(0), counter(0) {;}

   public: ///< methods
      void Reset()
      {
         size = 0;
         head = 0;
         tail = 0;
         wd.Reset();
      }

      void Dump()
      {
         std::cout << "this [" << this << "] : size [" << size << "] head [" << head << "] tail [" << tail << "] counter [" << counter << "]" << std::endl;
         wd.Dump();
      }

   public: ///< data
      std::size_t size;
      std::size_t head;
      std::size_t tail;
      std::size_t counter;
      class WriteDescriptor
      {
         public: ///< structors
            WriteDescriptor() : old_value(0), new_value(0), pos(0), pending(false) {;}

         public: ///< methods
            void Reset()
            {
               old_value = 0;
               new_value = 0;
               pos = 0;
               pending = false;
            }
            
            void Dump()
            {
               std::cout << "   |--WD--> OV [" << old_value << "] NV [" << new_value << "] pos [" << pos << "] pending [" << pending << "]" << std::endl;
            }

         public: ///< data
            void*  old_value;
            void*  new_value;
            std::size_t pos;
            bool   pending;
      } wd;
};

template <std::size_t POOL_SIZE>
class PointerStack
{ 
   public:
      PointerStack(StackType _stackType=FIFO, std::size_t _allocUnit=10)
      : m_pDescriptor(0),
        m_stackType(_stackType),
        m_pMemory(0),
        m_allocUnit(_allocUnit),
        m_totalUnits(0),
        m_dPoolSize(POOL_SIZE)
      {
         m_descriptorPool[0].counter++; ///< we fake that the first descriptor is in use so we can use it ;)
         m_pDescriptor = &(m_descriptorPool[0]);
         //DumpDescriptorPool();
      }

      ~PointerStack()
      {
         delete[] m_pMemory; ///< allocated with new[] in Reserve()
         m_pMemory = 0;
         m_pDescriptor = 0;
      }

      bool Push(void* _pVoid)
      {
         CompleteWrite(m_pDescriptor);

         if ( Length() == m_totalUnits )
         {
            std::cout << "Memory Overflow : [" << Length() << "]" << std::endl;
            return false;
         }

         Descriptor* next(0);
         do
         {
            ///< Finding an empty descriptor we could use
            for (std::size_t idx=0; idx<m_dPoolSize; ++idx)
            {
               if (CAS(&m_descriptorPool[idx].counter, 0, 1))
               { ///< we successfully got a descriptor for ourselves
                  next = &m_descriptorPool[idx];
                  next->Reset();
                  (next->wd).new_value = _pVoid;
                  (next->wd).pending = true;
                  break;
               }
            }
         } while (!next);

         Descriptor* current(0);
         std::size_t counter(0);
         while(1)
         { ///< loop until we can add this descriptor to the stack
            if (current)
            { ///< we will get a new current, must decrease our interest on previous current
               do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-1));
            }

            current = m_pDescriptor; ///< safe to point to m_pDescriptor as it won't be reused until we remove our interest on it

            do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter+1));

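            if ( (current->head == current->tail) && Length() >= m_totalUnits )
            { ///< there is no space available to write: give back our descriptor and our interest in current
               next->Reset();
               CAS(&(next->counter), 1, 0);
               do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-1));
               std::cout << "Memory Overflow : [" << Length() << "]" << std::endl;
               return false;
            }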

            /// Preparing next with the current values
            (next->wd).old_value = At(current->head);
            (next->wd).pos = current->head;
            next->size = current->size+1;
            next->head = current->head+1;
            next->tail = current->tail;

            if ( next->head > m_totalUnits )
            { ///< wrap around so we can write more from the beginning
               next->head = 0;
               (next->wd).old_value = At(next->head);
               (next->wd).pos = next->head;
               next->head++;
            }

            if ( CAS(&m_pDescriptor, current, next) ) break;
         }
         CompleteWrite(next);
         do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-2) );
         return true;
      };

      void* Pop(void)
      {
         Descriptor* current(0);
         Descriptor* next(0);
         void* ptr(0);

         do
         {
            ///< Finding an empty descriptor we could use
            for (std::size_t idx=0; idx<m_dPoolSize; ++idx)
            {
               if (CAS(&m_descriptorPool[idx].counter, 0, 1))
               { ///< we successfully got a descriptor for ourselves
                  next = &m_descriptorPool[idx];
                  next->Reset();
                  break;
               }
            }
         } while (!next);

         std::size_t counter(0);
         do
         { ///< loop until we can update the stack with the removed item
            if (current)
            { ///< we will get a new current, must decrease our interest on previous current
               do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-1));
            }
            current = m_pDescriptor; ///< read the shared pointer once, so the descriptor we mark is the one we use
            do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter+1)); ///< now we are interested in this current
            CompleteWrite(current); ///< finish any pending push into this descriptor before reading from memory

            if ( 0 == current->size )
            { ///< nothing to pop: give back our descriptor and our interest in current
               next->Reset();
               CAS(&(next->counter), 1, 0);
               do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-1));
               std::cout << "Pop : The queue is empty." << std::endl;
               return 0;
            }

            ///< Setting up next values based on current values
            next->size = current->size-1;
            next->head = current->head;
            next->tail = current->tail;

            switch(m_stackType) 
            {
            case FIFO:
               ptr = At( next->tail );
               next->tail++;
               if ( next->tail >= m_totalUnits )
               {
                  next->tail = 0;
               }
               break;
            case LIFO:
               next->head--;
               ptr = At(next->head);
               break;
            }
         } while (!CAS(&m_pDescriptor, current, next));
         do { counter = current->counter; } while (!CAS(&(current->counter), counter, counter-2)); ///< drop our interest and current's "main" reference, retrying as Push does
         CompleteWrite(next); ///< attempt to write next (which is the new current) If it fails, don't worry, let the next thread worry.
         return ptr;
      };

      std::size_t Length()
      {
         if ( (m_pDescriptor->wd).pending )
         {
            if ( m_pDescriptor->size > 0 )
               return (m_pDescriptor->size - 1);
            else
               return 0;
         }
         return m_pDescriptor->size;
      };

      void* At(std::size_t idx)
      {
         if ( idx >= m_totalUnits )
         { ///< an invalid index was given!
            return 0;
         }
         return m_pMemory[idx];
      }

      bool CompleteWrite(Descriptor* d)
      {
         if ( !(d->wd).pending ) return true;
         if ( CAS(&((d->wd).pending), true, false) )
         {
            if ( !CAS(&(m_pMemory[(d->wd).pos]), (d->wd).old_value, (d->wd).new_value) )
            {
               std::cout << "T[" << pthread_self() << "] Could not update Pos [" << (d->wd).pos << "] OV[" << m_pMemory[(d->wd).pos] << "] Expected OV[" << (d->wd).old_value << "] NV[" << (d->wd).new_value << "]" << std::endl;
               std::cout.flush();
               return false;
            }
         }
         return true;
      }

      /** Reserve can only be called once to create the memory.
       *  If we need to grow, the upper-layer would create a new stack of size(X+Y)
       *  and then all push would go to the new stack, all pop from the previous stack, until
       *  the previous stack is empty. Once the previous stack is empty, pop/unpop operations would
       *  use the new stack, and the previous stack would be deleted (or not).
       */
      void Reserve(std::size_t _numUnits)
      {
         m_totalUnits = _numUnits * m_allocUnit;
         m_pMemory = new void*[m_totalUnits];
         memset(m_pMemory, 0, m_totalUnits*sizeof(void*));
      }

      void DumpDescriptorPool()
      {
         std::cout << "Current [" << m_pDescriptor << "]" << std::endl;
         for (std::size_t idx=0; idx<m_dPoolSize; ++idx)
         {
            m_descriptorPool[idx].Dump();
         }
      }

      void DumpMemory(std::size_t begin=0)
      {
         using std::setfill;
         using std::setw;
         using std::endl;
   
         if ( !m_pMemory)
            return;
         
         Descriptor *current = m_pDescriptor;
         int size = m_totalUnits + 1;
         if ( size > 30 )
            size = 30;
   
         // new line
         std::cout << endl << endl << "PointerStack:[" << size-1 << "]," << " Available to Read:[" << Length() << "]," << " Tail:["
                  << current->tail << "]," << " Head:[" << current->head << "]" << endl << endl;
   
         //////////////////////////////////////////////////////////////////////
         // ptrs
         // TAIL
         for (int i=begin; i<begin+size; ++i)
         {
            if (current->tail == i)
            {
               std::cout << "|" << setw(5) << setfill(' ') << "T" << "|";
               break;
            }
            else
               std::cout << " " << setw(5) << setfill(' ') << " ";
         }
         std::cout << endl;
         // HEAD
         for (int i=begin; i<begin+size; ++i)
         {
            if (current->head == i)
            {
               std::cout << "|" << setw(5) << setfill(' ') << "H" << "|";
               break;
            }
            else
               std::cout << " " << setw(5) << setfill(' ') << " ";
         }
         std::cout << endl;
         //////////////////////////////////////////////////////////////////////
         // header
         for (int i=begin; i<begin+size; ++i)
            std::cout << "|" << setw(5) << setfill('0') << i;
         std::cout << "|" << endl;
         // line delimiter
         for (int i=0; i<size; ++i)
            std::cout << "+" << setw(5) << setfill('-') << "-";
         std::cout << "+" << endl;
   
         //////////////////////////////////////////////////////////////////////
         // body
         for (int i=begin; i<begin+size-1; ++i)
         {
            if (m_pMemory[i])
               std::cout << "|" << setw(5) << setfill('0') << *(std::size_t*)m_pMemory[i];
            else
               std::cout << "|" << setw(5) << setfill('0') << 0;
         }
         std::cout << "|" << endl;
         // line delimiter
         for (int i=begin; i<begin+size; ++i)
            std::cout << "+" << setw(5) << setfill('-') << "-";
         std::cout << "+" << endl << endl;
      }

   protected:
      Descriptor*  m_pDescriptor;                ///< used by the lock-free algorithm (ensures atomic operations on the stack)
      StackType    m_stackType;                  ///< the type of this stack (LIFO or FIFO)

      void**       m_pMemory;                    ///< internal memory buffer pointer
      std::size_t  m_allocUnit;                  ///< size of a memory unit
      std::size_t  m_totalUnits;                 ///< how many units we have available
      const std::size_t m_dPoolSize;             ///< size of our descriptor pool
      Descriptor   m_descriptorPool[POOL_SIZE];  ///< our pool of descriptors
};
} ///< namespace LockFree

LockFree::PointerStack<10> lockfreeStack(LockFree::FIFO,10);

extern "C"
{ 
   std::size_t count(0);

   void* Dequeue(void* data)
   {
      struct timeval tval_ini ;
      struct timeval tval_end ;
      struct timezone tzp ;

      const std::size_t numMsgs(*reinterpret_cast<std::size_t*>(data));
      pthread_t tid = pthread_self();

      std::size_t msgsPopped(1);

      gettimeofday (&tval_ini, &tzp) ;

      while (msgsPopped < numMsgs)
      {
         while (lockfreeStack.Length() == 0 )
         {
            sleep(1);
         }
         while(lockfreeStack.Length() > 0)
         {
            std::size_t* value = reinterpret_cast<std::size_t*>(lockfreeStack.Pop());
            if (value)
            {
               //std::cout << "[" << tid << "] : Popped [" << msgsPopped << "] = [" << *value << "]" << std::endl;
               //std::cout.flush();
               ++msgsPopped;
            }
            else
            {
               std::cout << "[" << tid << "] : Could not pop msg idx [" << msgsPopped << "]" << std::endl;
               std::cout.flush();
            }
         }
      }
      gettimeofday (&tval_end, &tzp) ;
      double delta = ((double)tval_end.tv_sec * 1000000 + tval_end.tv_usec) - ((double)tval_ini.tv_sec * 1000000 + tval_ini.tv_usec);
      std::cout << "Delta [" << delta/(msgsPopped-1) << "] (Msgs [" << (msgsPopped-1) << "])" << std::endl ;
      return 0;
   }

   void* Enqueue(void* data)
   {
      const std::size_t numMsgs(*reinterpret_cast<std::size_t*>(data));
      pthread_t tid = pthread_self();

      std::size_t* array = new std::size_t[numMsgs];
      for ( std::size_t i=1; i<=numMsgs; ++i )
      {
         std::size_t oldcount = count;
         while ( !CAS(&count, oldcount, oldcount+1) ){ oldcount = count; }
         array[i-1] = oldcount+1;
      }

      for (std::size_t i=0; i<numMsgs; ++i)
      {
         //std::cout << "[" << tid << "] : Pushing [" << array[i] << "]" << std::endl;
         //std::cout.flush();
         if ( !lockfreeStack.Push(&array[i]) )
            std::cout << "TID [" << tid << "] Pushing [" << array[i] << "] failed." << std::endl;
      }

      //delete [] array;
      return 0;
   }
}

int main(int argc, char *argv[])
{
   int rc(0);
   const std::size_t numEnqueuerThreads(3);
   lockfreeStack.Reserve(20000);
   std::size_t numMessagesPushed(1000);
   std::size_t numMessagesPopped(numMessagesPushed*numEnqueuerThreads);
   pthread_t enqueuerThreads[numEnqueuerThreads];
   pthread_t dequeuerThread;
   
   struct timeval tval_ini ;
   struct timeval tval_end ;
   struct timezone tzp ;

   gettimeofday (&tval_ini, &tzp);
   for (std::size_t i=0; i<numEnqueuerThreads; ++i)
   {
      /** Creating the enqueuer thread now */
      rc = pthread_create(&enqueuerThreads[i], 0, Enqueue, &numMessagesPushed);
      if (rc)
      {
         std::cout << "ERROR; return code from pthread_create() is [" << rc << "] = [" << strerror(rc) << "]" << std::endl;
         exit(-1);
      }
   }
   void* thread_result;
   for (std::size_t i=0; i<numEnqueuerThreads; ++i)
   {
      pthread_join(enqueuerThreads[i], &thread_result);
   }
   gettimeofday (&tval_end, &tzp);
   std::size_t length(0);
   length = lockfreeStack.Length();
   double delta = ((double)tval_end.tv_sec * 1000000 + tval_end.tv_usec) - ((double)tval_ini.tv_sec * 1000000 + tval_ini.tv_usec);
   std::cout << "Global Delta [" << delta/length << "] (Msgs [" << length << "])" << std::endl ;
   for ( std::size_t i=1; i<=length; ++i)
   {
      std::size_t* value = reinterpret_cast<std::size_t*>(lockfreeStack.Pop());
      /*
      if (value)
         std::cout << "Popped [" << i << "] = [" << *value << "]" << std::endl;
      */
      if (!value)
         std::cout << "ERROR: Popped [" << i <<"] = [0]" << std::endl; 
   }
   return 0;
}


[Edited by - fcavalcanti on January 30, 2008 8:58:49 AM]
I'm not going to read all that code right now, but have you looked into the CMPXCHG8B and CMPXCHG16B assembly instructions? They give you a mechanism for a commit atom. See Alex Iconescu's discussion. If you're using Windows you might not even need to write this; the Windows API should have what you're looking for.
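The point of a double-width CAS such as CMPXCHG8B/CMPXCHG16B is to swap a value together with a version tag, so that a descriptor which was released and reused no longer matches a stale expected value. A minimal sketch of the same idea with a single 64-bit CAS, assuming descriptors are addressed by their index in the pool rather than by pointer: the low 32 bits hold the index and the high 32 bits a tag bumped on every swap. It uses C++11 `std::atomic` (which postdates this thread), and `Pack`, `SwapCurrent` and `StaleSwapIsRejected` are hypothetical names, not part of the posted code. On 32-bit x86, a 64-bit `compare_exchange_strong` is typically compiled to CMPXCHG8B.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical "tagged index": low 32 bits = descriptor-pool slot,
// high 32 bits = version tag bumped on every successful swap.
inline std::uint64_t Pack(std::uint32_t index, std::uint32_t tag) {
    return (static_cast<std::uint64_t>(tag) << 32) | index;
}
inline std::uint32_t IndexOf(std::uint64_t v) { return static_cast<std::uint32_t>(v); }
inline std::uint32_t TagOf(std::uint64_t v) { return static_cast<std::uint32_t>(v >> 32); }

// Replace the current slot only if the whole (index, tag) word is unchanged
// since `expected` was read. A slot that was released and reused in the
// meantime carries a different tag, so a stale CAS fails (no ABA).
inline bool SwapCurrent(std::atomic<std::uint64_t>& current,
                        std::uint64_t expected, std::uint32_t newIndex) {
    const std::uint64_t desired = Pack(newIndex, TagOf(expected) + 1);
    return current.compare_exchange_strong(expected, desired);
}

// Replays the ABA sequence single-threaded: thread A reads slot 0, other
// threads move the current slot 0 -> 1 -> 0, then A tries its stale swap.
inline bool StaleSwapIsRejected() {
    std::atomic<std::uint64_t> current(Pack(0, 0));
    const std::uint64_t seenByA = current.load();              // (slot 0, tag 0)
    const bool moved = SwapCurrent(current, current.load(), 1)  // 0 -> 1
                    && SwapCurrent(current, current.load(), 0); // 1 -> 0
    assert(IndexOf(current.load()) == IndexOf(seenByA));      // same slot again...
    return moved && !SwapCurrent(current, seenByA, 2);        // ...but A's CAS fails
}
```

In the replay the index is back to 0, but the tag has moved from 0 to 2, so thread A's swap is rejected even though a pointer-only comparison would have accepted it.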
Firstly, you should really wrap all that in some SOURCE tags. Secondly, if you have it hosted as a zip file somewhere, I'm sure some people would appreciate the link.

Lock-free containers are something I've been thinking about lately, so I'll take a look when I get home from the office.


OK, I added the SOURCE tags; I apologize for not doing that before.

I cannot host it in a .zip file anywhere at this moment... sorry.

I am using Linux and GCC 4.2.1 to compile and test. My Linux kernel is 2.6.9-55.ELsmp (x86_64).

I know I am close to getting this thing to work properly... the problem is that I am all out of ideas. The design seems correct, but I know there is some small mistake somewhere that lets the ABA problem through... I will continue investigating whenever I have a chance, and if I ever get it right I will post the final code here.

Thanks for the suggestion to disassemble... I will try that as well, it might shed some light on the issue.
There's a troubling lack of the volatile keyword in your implementation.

I didn't go through the code in enough detail to know how you handled the problem, but there's also no mention of barriers or other memory synchronization primitives.

Passing size_t to CAS primitives is also somewhat pointless. The types stored will always be very platform-specific, and while they will likely match the size of size_t, that's not a guarantee.
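On the volatile and barrier points: C++11 (which postdates this thread) made `std::atomic` the standard tool here, providing both the atomic CAS and inter-thread ordering, neither of which `volatile` guarantees. A minimal sketch of publishing a descriptor that way, assuming a trimmed-down `Desc` type and hypothetical `Publish`/`ReadSize` helpers (not the posted classes): the owning thread fills the descriptor with plain writes, and the successful CAS (release) makes those writes visible to any thread that later loads the pointer with acquire ordering. It also sidesteps the size_t question, since the CAS operates on the pointer type itself.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical, trimmed-down descriptor (not the posted Descriptor class).
struct Desc {
    std::size_t size;
    std::size_t head;
    std::size_t tail;
};

// The shared "current descriptor" pointer, playing the role of m_pDescriptor.
std::atomic<Desc*> g_current(nullptr);

// Publish `next`, which the caller filled in privately with plain writes.
// On success, the release half of acq_rel makes those writes visible to any
// thread that then loads g_current with acquire ordering.
inline bool Publish(Desc* expected, Desc* next) {
    assert(next != nullptr);  // only real descriptors are ever published
    return g_current.compare_exchange_strong(expected, next,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
}

// Reader side: the acquire load pairs with the CAS in Publish.
inline std::size_t ReadSize() {
    Desc* d = g_current.load(std::memory_order_acquire);
    return d ? d->size : 0;
}
```

This only covers publication of the pointer; the reference-counting rules for when a descriptor may be reused still have to be correct on their own.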
Hummm... maybe volatile would make a difference (I don't think the compiler would be optimizing any of my critical path, though), but the whole design is based on the following:

1 - Have a pre-allocated amount of descriptors (at least N+1, where N is the total number of concurrent threads) that define where the head and tail are in the queue, for both insertion into and removal from the queue.

2 - Each thread can show interest in the main descriptor (which is "shared" by all) and replace it with a new descriptor (which atomically belongs to it) through a CAS instruction, which guarantees that the main descriptor always points to one of the pre-allocated descriptors.

3 - The previous main descriptor can only be reused when its reference count is zero. So if a thread shows interest in updating the main descriptor and it gets changed underneath it, no other thread can reuse that descriptor until the thread in question updates the reference count, which (or so I hope) effectively takes care of ABA issues. Example:

MD -> points to D1 -> Head: 1, Tail: 0
T1 -> Atomically increases MD's reference count and, only if successful, reads it. Atomically retrieves the next empty descriptor available, D2, but then gets interrupted by T2.
T2 -> Atomically increases MD's reference count and, only if successful, reads it. Atomically retrieves the next empty descriptor available, D3. Goes ahead and switches MD to point to D3 -> Head: 2, Tail: 0, and writes the data into memory. Releases its interest in D1 (the previous MD) by 2 counts, but D1 still has one count from T1.
T3 -> Atomically increases MD's reference count and, only if successful, reads it. Atomically retrieves the next empty descriptor available, D4. Goes ahead and switches MD to point to D4 -> Head: 3, Tail: 0, and writes the data into memory. Releases its interest in D3 (the previous MD) by 2 counts; D1 still has one count from T1, and D3 is now available to be reused by any other thread.
T1 -> Gets control back, atomically realizes that MD now points to D4 instead of D1, so it releases its interest in D1 (making it available for reuse) while showing interest in D4 (the current descriptor).
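One gap worth checking against this walkthrough: in the posted Push and Pop, "increase MD's reference count and read it" is two separate steps (read m_pDescriptor, then CAS its counter), so between them the descriptor can be swapped out, dropped to zero, and claimed by another thread. A minimal sketch of the usual hazard-pointer-style remedy, assuming C++11 `std::atomic` and hypothetical `RcDesc`, `AcquireCurrent`, `Release` and `ClaimFree` names: after bumping the counter, re-read the shared pointer and only trust the descriptor if it is still current; otherwise drop the count and retry.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical descriptor: a reference ("interest") counter plus queue state.
struct RcDesc {
    std::atomic<std::size_t> counter{0};
    std::size_t size = 0, head = 0, tail = 0;
};

// Take interest in whatever `shared` points to (must be non-null). The
// re-check after the increment closes the window between reading the pointer
// and bumping its counter: if the descriptor was swapped out (and possibly
// released and reused) in that window, back off and retry.
// Sequentially consistent operations are used throughout for simplicity.
inline RcDesc* AcquireCurrent(std::atomic<RcDesc*>& shared) {
    for (;;) {
        RcDesc* d = shared.load();
        d->counter.fetch_add(1);
        if (shared.load() == d)
            return d;             // still current: its fields are safe to read
        d->counter.fetch_sub(1);  // lost the race: undo and retry
    }
}

inline void Release(RcDesc* d, std::size_t n = 1) {
    std::size_t before = d->counter.fetch_sub(n);
    assert(before >= n);          // releasing more than we hold is a bug
    (void)before;
}

// Claim a free descriptor (counter 0 -> 1), like the pool scan in Push/Pop.
inline RcDesc* ClaimFree(RcDesc* pool, std::size_t n) {
    for (std::size_t i = 0; i < n; ++i) {
        std::size_t expected = 0;
        if (pool[i].counter.compare_exchange_strong(expected, 1))
            return &pool[i];
    }
    return nullptr;
}
```

Because the shared pointer always holds one reference to the current descriptor (the role of `m_descriptorPool[0].counter++` in the constructor), a descriptor that passes the re-check cannot reach zero until it has been swapped out and every interested thread has released it.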

The memory where the Push/Pop takes place is also pre-allocated, so we don't have to worry about concurrent memory allocation/deallocation (this queue does not 'grow').

The whole point is that this is a queue of void* pointers to data pre-allocated somewhere else. This is a Pointer queue which allows one to pass data from Thread 1 to Thread 2 via a pointer.
Quote:I don't think the compiler would be optimizing any of my critical path though


Have you tried marking shared members as volatile?

Have you verified the generated assembly to ensure that compiler didn't re-arrange the instructions?

Can you prove that CPU per-core caches and pipelines won't stomp on each other?


Lock-less and lock-free algorithms are a nightmare to debug. Don't make any assumption about anything, unless you have written document stating you can do it. Even then, test it rigorously. Assumptions are the worst thing you can make here.
Quote:Lock-less and lock-free algorithms are a nightmare to debug. Don't make any assumption about anything, unless you have written document stating you can do it.


Hummm... I guess I am assuming too much. I mean, I hoped the CAS instruction would be truly atomic, and as the whole thing depends on that then if it is not, I would be screwed.

However, I did test it a lot first (the CAS instruction) and there are a lot of papers out there that use it and I think at this point I can safely assume that it is atomic.
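A minimal version of that kind of check, assuming GCC or Clang (for the `__sync` builtins the posted code already uses) and C++11 `std::thread`: several threads bump one shared counter through a CAS retry loop, the same pattern Enqueue uses for `count`. If the CAS were not atomic, increments would be lost and the total would come up short. `CasIncrement` and `CasStress` are hypothetical helper names, and this only exercises the primitive itself, not the logic built around it.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// One increment through a CAS retry loop. __sync_val_compare_and_swap returns
// the value it found, so a failed attempt tells us what to retry with, and the
// shared counter is never read with a plain (non-atomic) load.
inline void CasIncrement(std::size_t* counter) {
    std::size_t expected = __sync_val_compare_and_swap(counter, 0, 0);  // atomic read
    for (;;) {
        std::size_t seen = __sync_val_compare_and_swap(counter, expected, expected + 1);
        if (seen == expected)
            return;               // our swap went in
        expected = seen;          // another thread won; retry from its value
    }
}

// Runs `threads` threads doing `perThread` increments each; returns the total.
inline std::size_t CasStress(unsigned threads, std::size_t perThread) {
    std::size_t counter = 0;
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < threads; ++t)
        workers.emplace_back([&counter, perThread] {
            for (std::size_t i = 0; i < perThread; ++i)
                CasIncrement(&counter);
        });
    for (std::thread& w : workers)
        w.join();
    assert(counter == threads * perThread);  // lost updates would trip this
    return counter;
}
```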

I was more curious to find out if my premise (my Push routine lets say) is truly correct or if I have a design flaw on it (assuming my CAS call is truly atomic).
Quote:Original post by fcavalcanti
Quote:Lock-less and lock-free algorithms are a nightmare to debug. Don't make any assumption about anything, unless you have written document stating you can do it.


Hummm... I guess I am assuming too much. I mean, I hoped the CAS instruction would be truly atomic, and as the whole thing depends on that then if it is not, I would be screwed.


CAS is atomic; that's the only reason it exists. But the rest of your code isn't. So the logic surrounding the CAS statements may not be consistent across processors.

The lock-free code is not linear. Suppose two threads grab the same entry and, somewhere half-way through, load its variables into registers. Then one thread gets ahead and pushes and pops thousands of elements. When the other thread gets a chance to run, the values it stored in registers are no longer correct.
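Whether or not registers are involved, the posted Push has a window of this kind between `current = m_pDescriptor` and the CAS that bumps `current->counter`. A minimal single-threaded replay of one such interleaving, using a hypothetical cut-down `SimDesc` and running each "thread" step in a fixed order (bookkeeping on the second descriptor is omitted): T1 ends up reading a head value from a descriptor that T3 has just reset, and because the final `CAS(&m_pDescriptor, current, next)` compares pointers only, it would still succeed once T3 republishes that same descriptor. That is one way two descriptors could end up targeting the same slot, which would be consistent with the "Could not update Pos" failures, though not necessarily the only cause.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical cut-down descriptor: only the reference counter and head.
struct SimDesc { std::size_t counter; std::size_t head; };

// Single-threaded replay of one interleaving of the posted Push. Returns true
// if T1's final pointer CAS would succeed; *t1Head is what T1 read, and
// *realHead is the head of the descriptor that is actually current.
inline bool ReplayAba(std::size_t* t1Head, std::size_t* realHead) {
    SimDesc pool[2] = {{1, 5}, {0, 0}};  // pool[0] current (main ref), head 5
    SimDesc* shared = &pool[0];          // stands in for m_pDescriptor

    SimDesc* t1Current = shared;         // T1: current = m_pDescriptor; preempted

    // T2: takes interest in pool[0], claims pool[1], publishes it (head 6),
    // then drops pool[0] by 2 (main ref + its interest): pool[0] is free.
    pool[0].counter += 1;
    pool[1].counter = 1;
    pool[1].head = 6;
    shared = &pool[1];
    pool[0].counter -= 2;
    assert(pool[0].counter == 0);

    // T3: claims the free pool[0] (CAS 0 -> 1) and Reset()s it.
    pool[0].counter = 1;
    pool[0].head = 0;

    // T1 resumes: bumps the counter and reads head while T3 is mid-preparation.
    t1Current->counter += 1;
    *t1Head = t1Current->head;

    // T3 finishes: fills pool[0] from pool[1] and publishes it again.
    pool[0].head = pool[1].head + 1;
    shared = &pool[0];
    *realHead = shared->head;

    // T1's CAS(&m_pDescriptor, current, next) compares pointers only.
    return shared == t1Current;
}
```

In this replay T1 works from head 0 while the real head is 7, and the pointer comparison still passes.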

This is why you mark shared variables as volatile. While the code you wrote is sequential and correct, the generated assembly might not be. Even the most basic compiler will rearrange the local and shared variables used, possibly keeping them only in registers.
So you are saying that m_pDescriptor (which is shared) should be volatile, even though it only changes within a CAS instruction?

I will go over the whole code and make all shared variables volatile... hopefully that will do the trick! :)

It is my first attempt at a lock-free algorithm, so I thought that by using CAS I would not need to make my shared data volatile, but I guess you are correct; I should have done that.

Thanks,

P.S. I will post the results soon :O
