[C++] Problem with Thread Pool implementation

This topic is 3040 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

Hi all, I'm working on a basic C++ thread pool implementation, based on the boost threading library. The idea is that several 'worker' threads share a list of work items which are executed in the order they were added. Here is my class so far:
class ThreadPool
		{
			// Typedefs
			typedef function<void(void)> ThreadStart;

			// Fields
		private:
			list<thread*> threads;
			queue<ThreadStart> workItems;
			mutex workItemsMutex;
			mutex signalMutex;

			// Constructors / Destructor
		public:
			ThreadPool(int numWorkers)
			{
				signalMutex.lock();
				for (int i = 0; i < numWorkers; ++i)
				{
					thread* thr = new thread(boost::bind(&ThreadPool::RunWorker, this));
					this->threads.push_back(thr);
				}
			}
			~ThreadPool()
			{
				mutex::scoped_lock lock(this->workItemsMutex);
				list<thread*>::iterator it = this->threads.begin();

				while (it != this->threads.end())
				{
					delete *it;
					it = this->threads.erase(it);
				}
			}

			// Methods
		public:
			void QueueWorkItem(const ThreadStart& start)
			{
				// Add the work item to the queue
				mutex::scoped_lock lock(this->workItemsMutex);
				this->workItems.push(start);

				// Unlock the work item signal
				signalMutex.unlock();
			}

		private:
			void RunWorker()
			{
				while (true)
				{
					// Wait for the signal from QueueWorkItem
					signalMutex.lock();
					signalMutex.unlock();

					// Attempt to grab a work item from the list
					ThreadStart workItem;
					bool hasWorkItem = false;
					{
						mutex::scoped_lock lock(this->workItemsMutex);
						if (!this->workItems.empty())
						{
							workItem = this->workItems.front();
							this->workItems.pop();
							hasWorkItem = true;
						}
						
						// If the work item list is now empty, lock the signal
						if (this->workItems.empty())
							signalMutex.lock();
					}

					// Run the work item in the context of this thread
					if (hasWorkItem)
						workItem();
				}
			}
		};

Most of it should be self-explanatory. The 'signalMutex' is an attempt to implement something along the lines of a 'wait handle', which tells the workers to keep checking for and processing work items. The problem is that the thread pool occasionally deadlocks. I say occasionally because sometimes the pool runs correctly, but other times each worker runs one loop iteration and then freezes. I'm guessing I have a synchronisation issue, but I can't figure out where! Any assistance? :) Any comments or advice on my implementation are also welcome.

Hello,

it's fine if you want to make your own, but I just want to let you know, in case you don't know it yet, that there is a threadpool library which uses boost.threads:

http://threadpool.sourceforge.net/

1) Don't use a mutex as a signal. Use a condition variable; that's what they're designed for.

2) QueueWorkItem presumably runs on a different thread from RunWorker. QueueWorkItem routinely unlocks signalMutex but never acquires ownership of it (i.e. the thread running QueueWorkItem never actually locks signalMutex). A precondition of the unlock function is that "The current thread owns *this." Since the thread QueueWorkItem runs on never locks signalMutex, it never owns the mutex, so you've violated that precondition.

3) Consider the case when there are no work items (i.e. workItems is empty). Take a look at the RunWorker function. Your code will reach the following lines:
if (this->workItems.empty())
signalMutex.lock();

signalMutex gets locked here. Assume signalMutex is contended (i.e. another thread already holds it), so RunWorker() blocks at that point. Later, on a different thread, QueueWorkItem() adds a work item and unlocks signalMutex (assuming this even works, since QueueWorkItem's thread doesn't own the mutex). RunWorker() then unblocks, now owning signalMutex, and continues on its merry way. Remember that at this point hasWorkItem is still false, so the following code doesn't execute:
if (hasWorkItem)
workItem();

Then it goes back to the top of the while loop and tries to lock the mutex again. But the thread already owns the mutex, and boost::mutex is not recursive, so it blocks on itself. Every worker ends up in the same state, deadlocked and waiting on signalMutex forever.

4) Threading is hard. Use the threadpool library. Seriously.
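
For what it's worth, point 1 could look roughly like the sketch below. It is a minimal illustration, not a drop-in replacement: it assumes a C++11 compiler and uses std::thread, std::mutex and std::condition_variable in place of the Boost types from the original post (boost::condition_variable offers a similar wait/notify_one interface). The names workAvailable, stopping and RunPoolDemo are made up for the example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Sketch only: workers sleep on a condition variable instead of a "signal" mutex.
class ThreadPool
{
public:
    typedef std::function<void()> ThreadStart;

    explicit ThreadPool(std::size_t numWorkers) : stopping(false)
    {
        assert(numWorkers > 0);
        for (std::size_t i = 0; i < numWorkers; ++i)
            threads.emplace_back(&ThreadPool::RunWorker, this);
    }

    // Lets the workers drain the queue, then joins every one of them.
    ~ThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(workItemsMutex);
            stopping = true;
        }
        workAvailable.notify_all();
        for (std::thread& t : threads)
            t.join();
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    void QueueWorkItem(ThreadStart start)
    {
        {
            std::lock_guard<std::mutex> lock(workItemsMutex);
            workItems.push(std::move(start));
        }
        workAvailable.notify_one(); // wake one sleeping worker
    }

private:
    void RunWorker()
    {
        for (;;)
        {
            ThreadStart workItem;
            {
                std::unique_lock<std::mutex> lock(workItemsMutex);
                // wait() releases the mutex while asleep and re-checks the
                // predicate on every wake-up, so spurious wake-ups are harmless.
                workAvailable.wait(lock, [this] { return stopping || !workItems.empty(); });
                if (workItems.empty())
                    return; // stopping, and nothing left to run
                workItem = std::move(workItems.front());
                workItems.pop();
            }
            workItem(); // run the item without holding the lock
        }
    }

    std::mutex workItemsMutex;
    std::condition_variable workAvailable;
    std::queue<ThreadStart> workItems;
    bool stopping;                    // guarded by workItemsMutex
    std::vector<std::thread> threads; // declared last so the state above exists first
};

// Hypothetical demo: queue 100 increments on 4 workers and return the total.
int RunPoolDemo()
{
    std::atomic<int> counter(0);
    {
        ThreadPool pool(4);
        for (int i = 0; i < 100; ++i)
            pool.QueueWorkItem([&counter] { ++counter; });
    } // the destructor returns only after every queued item has run
    return counter.load();
}
```

Every lock here is released by the thread that took it, which avoids the ownership problem from point 2. Unlike the original destructor, which deletes the thread objects without joining them, this one sets a flag, wakes every worker and joins.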

It's just an experiment, but thanks for your explanation.

One thing I may well require of a thread pool is the ability to schedule a task only if it will be executed right away - that is, not queued up and executed after another task completes. Do you know whether the threadpool supports this kind of use?

Thanks again.

Quote:
Original post by Barguast

One thing I may well require of a thread pool is the ability to schedule a task only if it will be executed right away - that is, not queued up and executed after another task completes. Do you know whether the threadpool supports this kind of use?


You might want to look at boost::asio; its io_service is a generic task executor. See dispatch() and post().

As for executing right away: that is only possible if you intend to execute it on the current thread, in which case you might as well run it directly. Otherwise a lot of problems occur, since you need to forcibly stop one of the other threads and potentially spawn a new one.

Thread pools are used for deferred processing. They can contain a priority queue to determine what to execute first.
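
As a rough illustration of the priority-queue idea, here is a single-threaded sketch using std::priority_queue. PrioritizedTask, LowerPriority and RunPriorityDemo are hypothetical names, and the convention that a larger number runs first is an assumption made for the example.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical work item: a larger `priority` value runs first.
struct PrioritizedTask
{
    int priority;
    std::function<void()> run;
};

// Comparator that keeps the highest priority on top of the heap.
struct LowerPriority
{
    bool operator()(const PrioritizedTask& a, const PrioritizedTask& b) const
    {
        return a.priority < b.priority;
    }
};

typedef std::priority_queue<PrioritizedTask,
                            std::vector<PrioritizedTask>,
                            LowerPriority> TaskQueue;

// Drains the queue on the calling thread and records the execution order.
std::string RunPriorityDemo()
{
    std::string order;
    TaskQueue tasks;
    tasks.push(PrioritizedTask{1, [&order] { order += "low "; }});
    tasks.push(PrioritizedTask{9, [&order] { order += "high "; }});
    tasks.push(PrioritizedTask{5, [&order] { order += "mid "; }});

    while (!tasks.empty())
    {
        PrioritizedTask next = tasks.top(); // copy: top() returns a const reference
        tasks.pop();
        next.run();
    }
    assert(tasks.empty());
    return order; // "high mid low "
}
```

In a real pool the same container would sit behind the queue mutex in place of a plain FIFO queue, and the workers would pop from it exactly as before.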

Another way to accomplish immediate execution is to spawn a new thread just for that request. Either way, trying to force immediate execution is not something typically done in this type of thread pool, since the definition of "immediate" is problematic.
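
If "right away" can be relaxed to "only when a worker is currently waiting for work", one possible approximation is sketched below. It assumes C++11 standard-library threading rather than Boost, and EagerPool, TryQueueWorkItem, idleWorkers and RunEagerDemo are invented for the illustration. An accepted task is handed to a sleeping worker but still starts whenever the scheduler wakes that worker, so this is best effort only.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool that accepts work only while a worker is waiting for it.
class EagerPool
{
public:
    typedef std::function<void()> ThreadStart;

    explicit EagerPool(std::size_t numWorkers) : idleWorkers(0), stopping(false)
    {
        assert(numWorkers > 0);
        for (std::size_t i = 0; i < numWorkers; ++i)
            threads.emplace_back(&EagerPool::RunWorker, this);
    }

    ~EagerPool()
    {
        {
            std::lock_guard<std::mutex> lock(stateMutex);
            stopping = true;
        }
        workAvailable.notify_all();
        for (std::thread& t : threads)
            t.join();
    }

    // Returns false rather than queueing the item behind other work.
    bool TryQueueWorkItem(ThreadStart start)
    {
        {
            std::lock_guard<std::mutex> lock(stateMutex);
            // Every pending item has already claimed one waiting worker.
            if (idleWorkers <= workItems.size())
                return false;
            workItems.push(std::move(start));
        }
        workAvailable.notify_one();
        return true;
    }

private:
    void RunWorker()
    {
        std::unique_lock<std::mutex> lock(stateMutex);
        for (;;)
        {
            ++idleWorkers;
            workAvailable.wait(lock, [this] { return stopping || !workItems.empty(); });
            --idleWorkers;
            if (workItems.empty())
                return; // stopping, and nothing left to run
            ThreadStart workItem = std::move(workItems.front());
            workItems.pop();
            lock.unlock();
            workItem(); // run the item without holding the lock
            lock.lock();
        }
    }

    std::mutex stateMutex;
    std::condition_variable workAvailable;
    std::queue<ThreadStart> workItems;
    std::size_t idleWorkers; // guarded by stateMutex
    bool stopping;           // guarded by stateMutex
    std::vector<std::thread> threads;
};

// Hypothetical demo with one worker: a second submission is refused
// while the first task still occupies the only worker.
bool RunEagerDemo()
{
    std::promise<void> release;
    std::shared_future<void> gate = release.get_future().share();
    EagerPool pool(1);

    // The worker starts asynchronously, so retry until it is idle.
    while (!pool.TryQueueWorkItem([gate] { gate.wait(); }))
        std::this_thread::yield();

    bool refused = !pool.TryQueueWorkItem([] {});
    release.set_value(); // let the first task finish
    return refused;
}
```

The caller decides what to do when TryQueueWorkItem returns false: run the task on the current thread, spawn a dedicated thread, or drop it.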

What are you really trying to do?

Quote:
Original post by reptor
Hello,

it's fine if you want to make your own, but I just want to let you know, in case you don't know it yet, that there is a threadpool library which uses boost.threads:

http://threadpool.sourceforge.net/


If you decide to go this route, you should know that boost.threadpool has been renamed to boost.task. It's in the Boost vault and, if I'm not mistaken, is scheduled to be included in the Boost libraries soon.
I'm using it for a few things, and it seems to be a very nice piece of software.

Thanks Nohup, I didn't know that.

I noticed some re-organisation had been done to that library so that it would slip into Boost nicely, but I didn't know it was actually going in already.


Edit: I think boost.task is not the same library as the one at threadpool.sourceforge.net, judging from this http://lists.boost.org/Archives/boost/2009/06/152134.php discussion. I'll have to download it and have a look.
