A producer/consumer example using C++0x threads

Started by
3 comments, last by Sharlin 14 years, 11 months ago
I've been toying with GCC 4.4 and its experimental C++0x mode and the new threading library in particular. I ended up with a simple example of the producer/consumer pattern and decided to share it here. Comments and critique welcome :) There are a couple of places that could be made less UB prone with a little refactoring; that is left as an exercise to the reader. EDIT: Also available here.
//
// A simple implementation of the well-known Producer/Consumer pattern
// demonstrating the C++0x thread library and certain other new features.
//
// Written by Johannes Dahlström
// This file is in the public domain.
//

#include <thread>
#include <condition_variable>
#include <queue>
#include <chrono>
#include <iostream>
#include <cstdlib>
#include <functional>

using namespace std;


// Represents a pool of data with thread-safe producer/consumer semantics
class work_pool {
 public:
  typedef unique_lock<std::mutex> lock_type;

  work_pool(int max_size) : max_size(max_size) { 
  }

  // Acquires a lock on the pool and returns it
  // A thread MUST hold a lock on the pool 
  // before calling ANY other member functions. 
  lock_type lock() {
    return lock_type(mutex);
  }

  // Pushes a datum to the work pool
  void produce(int i) {
    queue.push(i);
  }

  // Pops a datum from the work pool
  int consume() {
    int res = queue.front();
    queue.pop();
    return res;
  }

  // Returns whether the pool is full
  bool is_full() {
    return queue.size() >= max_size;
  }

  // Returns whether the pool has data
  bool has_data() {
    return !queue.empty();
  }

  // Returns the number of items in the pool
  int size() {
    return queue.size();
  }

  // Atomically releases lock, blocks until there
  // is new data in the pool, then reacquires the lock
  // lock MUST hold the same mutex as the one returned by lock()
  void wait(lock_type& lock) {
    // Wait until being notified AND there is data in the pool
    cvar.wait(lock, bind(&work_pool::has_data, this));
  }

  // Notifies the consumer threads waiting on this pool
  void notify() {
    cvar.notify_all();
  }

 private:
  int max_size;
  std::queue<int> queue;
  std::condition_variable cvar;
  std::mutex mutex;
};


// Produces data into the given work pool.
void produce(int id, work_pool& pool) {
  static int i = 0;
  while(true) {
    // Simulate some heavy work being done
    this_thread::sleep_for(chrono::milliseconds(rand() % 3000));

    // Acquire lock on the work pool
    work_pool::lock_type lock(pool.lock());

    // Push data to pool if not full
    if(!pool.is_full()) {
      pool.produce(i++);
      cerr << " producer " << id << ": " << i << " size=" << pool.size() << '\n';
    }

    // Notify consumers
    pool.notify();
  }
}

// Consumes data from the given work pool.
void consume(int id, work_pool& pool) {

  // Acquire lock on the work pool
  work_pool::lock_type lock(pool.lock());
  
  while(1) {

    // Release lock, wait for new data
    pool.wait(lock);
    // wait reacquires the lock before returning

    // Consume data from pool
    int i = pool.consume();
    cerr << " consumer " << id << ": " << i << " size=" << pool.size() << "\n\n";
    
    // rand is not thread-safe so call it before releasing the lock
    int delay = rand() % 3000;
    
    // Release lock on pool, simulate work being done
    lock.unlock();
    this_thread::sleep_for(chrono::milliseconds(delay));
    lock.lock();
    
  }
}


int main() {

  srand(time(0));

  work_pool pool(10);

  // Spawn producer and consumer threads
  vector<thread> producers, consumers;
  for(int i = 0; i < 4; ++i) {
    // emplace_back constructs the object in-place using perfect forwarding
    // equivalent to push_back(thread(...)) but avoids a potential copy/move
    consumers.emplace_back(consume, i, ref(pool));
    producers.emplace_back(produce, i, ref(pool));
  }

  while(true);
}


Advertisement
One suggestion that comes to mind is using the RAII idiom to acquire and release locks, rather than expecting explicit function calls to do so; this makes your code much cleaner, safer (less chance of forgetting to lock/unlock), and more robust in the face of exceptions or other unusual control flow.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Move lock acquisition into member functions, and use it as RAII. There is no benefit to requiring user to explicitly claim and release locks, which protect internal state of the structure alone. Otherwise, the class serves no real purpose, and I would be better off just manually synchronizing on std::deque - it would be considerably less code.


Quote:if(!pool.is_full()) {
pool.produce(i++);
cerr << " producer " << id << ": " << i << " size=" << pool.size() << '\n';
}


Have produce return bool - if queue is full, return false. Having something being output to console isn't exactly useful for real implementation.


Optionally, provide timeout argument to each operation (may have default of 100ms or something), to allow user to specifiy how long to wait. Blocking indefinitely with no option to override can limit usefulness of such a class in some cases.

And while this is producer/consumer, the structure itself is a queue. Consider using standard C++ terminology of push and pop or similar.
It is unusual practice to have the users of a data structure hold locks on the data structure's internals. It is generally better to have the data structure acquire and release its own locks inside each member function, though you do have to make sure that the lock is held across the entire operation, which may require providing member functions for larger-grained operations.

I wrote an article on a Presentation on Designing Multithreaded Applications in C++0x at ACCU 2009. The sample code from that presentation is also available from my website.
--Author of C++ Concurrency in Action | http://www.manning.com/williamsjust::thread C++0x thread library | http://www.stdthread.co.ukJust Software Solutions Ltd | http://www.justsoftwaresolutions.co.uk
Thanks for the comments!

std::unique_lock already has RAII (with move) semantics (for example, notice that the producer doesn't manually release the lock), so the code should be exception safe as is. The manual locking/unlocking in the consumer was mostly to demonstrate that it's possible if such flexibility is required.

But as you all pointed out, freeing the user from the burden of explicitly holding the lock is a logical next step in improving the abstraction. I didn't have any better things to do, so I wrote an improved version, available here :)

This topic is closed to new replies.

Advertisement