What is wrong with my thread pool?

10 comments, last by Ryan_001 6 years, 9 months ago

I have written a mathy simulation and introduced parallel computing using the available CPU cores, based on a simple thread pool written in C++11.

At the very start it seemed to work, but then it freezes reproducibly after some frames, although sometimes it works just fine.

I have absolutely no clue why it won't work... Can someone look into that? It's not that much code.

I have been looking at the code for hours and cannot figure out what is wrong...

 

Here is the code:

 


#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <functional>
#include <algorithm>
#include <cstdint>

struct ThreadPoolTask {
	size_t startIndex;
	size_t endIndex;
	float deltaTime;
	uint8_t padding0[4];
	std::function<void(const size_t, const size_t, const float)> func;
};

struct ThreadPool {
	std::atomic<bool> stopped;
	std::vector<std::thread> threads;
	std::atomic<int> pendingCount;
	std::deque<ThreadPoolTask> queue;
	std::mutex queueMutex;
	std::mutex completeMutex;
	std::condition_variable queueSignal;
	std::condition_variable completeSignal;

	ThreadPool(const size_t threadCount = std::thread::hardware_concurrency());
	~ThreadPool();

	void AddTask(const ThreadPoolTask &task);
	void WaitUntilDone();
	void WorkerThreadProc();
	void CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
};

ThreadPool::ThreadPool(const size_t threadCount) :
	stopped(false),
	pendingCount(0) {
	for (size_t workerIndex = 0; workerIndex < threadCount; ++workerIndex) {
		threads.push_back(std::thread(&ThreadPool::WorkerThreadProc, this));
	}
}

ThreadPool::~ThreadPool() {
	stopped = true;
	queueSignal.notify_all();
	for (size_t workerIndex = 0; workerIndex < threads.size(); ++workerIndex)
		threads[workerIndex].join();
}

void ThreadPool::AddWork(const ThreadPoolTask &entry) {
	{
		std::unique_lock<std::mutex> lock(queueMutex);
		queue.push_back(entry);
	}
	pendingCount++;
}

void ThreadPool::WaitUntilDone() {
	queueSignal.notify_all();
	std::unique_lock<std::mutex> lock(completeMutex);
	while (pendingCount > 0) {
		completeSignal.wait(lock);
	}
}

void ThreadPool::WorkerThreadProc() {
	ThreadPoolTask group;
	while (!stopped) {
		{
			std::unique_lock<std::mutex> lock(queueMutex);
			while (queue.empty()) {
				queueSignal.wait(lock);
			}
			group = queue.front();
			queue.pop_front();
		}
		group.func(group.startIndex, group.endIndex, group.deltaTime);
		if (--pendingCount == 0) {
			completeSignal.notify_one();
		}
	}
}

void ThreadPool::CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime) {
	if (itemCount > 0) {
		const size_t itemsPerTask = std::max((size_t)1, itemCount / threads.size());
		ThreadPoolTask task = {};
		task.func = function;
		task.deltaTime = deltaTime;
		for (size_t itemIndex = 0; itemIndex < itemCount; itemIndex += itemsPerTask) {
			task.startIndex = itemIndex;
			task.endIndex = std::min(itemIndex + itemsPerTask - 1, itemCount - 1);
			AddTask(task);
		}
	}
}

void main() {
  workerPool.CreateTasks(itemCount, [=](const size_t startIndex, const size_t endIndex, const float deltaTime) {
      this->DoingSomeMathyWork(startIndex, endIndex, deltaTime);
  }, deltaTime);
  workerPool.WaitUntilDone();
}

 


First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

Second... if it freezes, can't you use a debugger to see where it's hanging?

Third: you have things like 'completeMutex' which is only referenced in one place. Either that's redundant, or you're using it elsewhere and that matters (see above about real code, perhaps?)

Finally... lines like this look a bit suspect to me:


if (--pendingCount == 0) 

...because what happens if something else increments pendingCount between you decrementing it and then comparing it? Sometimes it doesn't matter, sometimes it does, and I'm not sure in your case. If there's the chance of adding extra tasks then there's the possibility that pendingCount gets incremented at precisely the wrong moment and you never do the signal that was expected at that point.

Personally it looks to me like you have too many synchronisation primitives there for what should be a relatively simple operation. But I've not tried to do this with standard C++ so maybe I'm wrong.

1 hour ago, Kylotan said:

First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

Second... if it freezes, can't you use a debugger to see where it's hanging?

You guessed right: I renamed AddWork to AddTask but forgot to update the implementation.

Well, debugging threading code is mostly hard, but in this case it was easy. The main thread was waiting forever for the tasks to finish, but even when the tasks were finished (pendingCount == 0) it was not signaled properly.

I tried a lot, like always decrementing pendingCount and signaling every time (the wait is looped anyway), but that did not work either.

Now I use a spinlock instead and this works perfectly:


void ThreadPool::WaitUntilDone() {
	queueSignal.notify_all();
	while (pendingCount > 0) {
		std::this_thread::sleep_for(std::chrono::microseconds(100));
	}
}

void ThreadPool::WorkerThreadProc() {
	ThreadPoolTask task;
	while (!stopped) {
		{
			std::unique_lock<std::mutex> lock(queueMutex);
			while (queue.empty()) {
				queueSignal.wait(lock);
			}
			task = queue.front();
			queue.pop_front();
		}
		task.func(task.startIndex, task.endIndex, task.deltaTime);
		--pendingCount;
	}
}

 

21 minutes ago, Finalspace said:

Now I use a spinlock instead and this works perfectly:

Seems like you found a workaround instead of a comprehensive solution. If you are going to use atomic variables, then make sure they are being used effectively. With that said, and without checking the documentation of std::atomic, if the -- operator is not atomic then you'll have race conditions like Kylotan pointed out.

Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.
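
A minimal sketch of what keeping the counter under the lock could look like, assuming the waiter and the workers share one mutex. `PendingCounter`, `Add`, `Finish`, `Wait` and `RunPendingCounterDemo` are hypothetical names, not part of the posted pool. Because the decrement and the notify both happen while the mutex is held, the waiter cannot see a non-zero counter and then miss a notification that arrives just before it starts waiting.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not from the posted pool): a completion counter that is
// only ever touched while holding the mutex the waiter also uses.
struct PendingCounter {
	std::mutex mutex;
	std::condition_variable done;
	int pending = 0;	// plain int, every access happens under `mutex`

	void Add(int n) {
		std::lock_guard<std::mutex> lock(mutex);
		pending += n;
	}

	// Called by a worker after it has finished one task.
	void Finish() {
		std::lock_guard<std::mutex> lock(mutex);
		assert(pending > 0);
		if (--pending == 0) {
			done.notify_all();
		}
	}

	// Blocks until the counter reaches zero. The predicate is evaluated with
	// the mutex held, so a Finish() cannot slip in between check and wait.
	void Wait() {
		std::unique_lock<std::mutex> lock(mutex);
		done.wait(lock, [this] { return pending == 0; });
	}
};

// Starts taskCount threads that each finish one task, waits for all of them,
// and returns the number of tasks still pending afterwards.
int RunPendingCounterDemo(int taskCount) {
	PendingCounter counter;
	counter.Add(taskCount);

	std::vector<std::thread> workers;
	for (int i = 0; i < taskCount; ++i) {
		workers.emplace_back([&counter] { counter.Finish(); });
	}

	counter.Wait();
	for (auto &worker : workers) {
		worker.join();
	}
	return counter.pending;	// safe to read: all workers have been joined
}
```

In the posted pool this would roughly correspond to taking completeMutex around the decrement and the notify in WorkerThreadProc, and around the increment in AddTask. An atomic counter alone does not close that gap, since the condition variable only coordinates with threads that hold its mutex.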


Quote

if the -- operator is not atomic then you'll have race conditions like Kylotan pointed out.

pendingCount is atomic - look at the declaration!

Therefore it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected it to be a compare-and-exchange or just an atomic increment...

Or maybe it ended up on the same cache line and therefore we have a false sharing problem here?

 

Normally I would use the atomic operations directly and not fiddle around with the C++ threading library, but I wanted to give it a try.

 

Quote

Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.

In my case the spinlock works: it simply has to wait until all work is done, that's it. It also makes the code much simpler. There are no race conditions and it won't bite back later. The only downside of this method is that you may pay a penalty of sleeping for the minimum interval at which the OS scheduler checks the thread (scheduler granularity), but only if std::this_thread::sleep_for() uses the underlying "sleep" of the operating system. I have no idea what the C++ implementation is doing; I cannot look into it, since there is no source for Visual Studio.

So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.


#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <iostream>
#include <functional>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>


// ----- ThreadPool -----
class ThreadPool {

	private:
		// internal types
		struct ThreadPoolTask {
			size_t start_index;
			size_t end_index;
			float delta_time;
			uint8_t padding0[4];			// ??
			std::function<void(const size_t, const size_t, const float)> func;
			};

		// atomic variables
		// you could make this non-atomic and use the mutex for accessing this as well
		// the only real change would be in the WaitUntilDone() function you'd need to acquire/release
		// the lock every check, which isn't a bad thing, the performance will be near identical to an atomic
		// but for 'shits'n'giggles' why not leave this as atomic...
		std::atomic_size_t pending_count;

		// variables protected by the mutex
		std::mutex mutex;
		bool run;						// there's no point to having this atomic, all accesses are always checked while in a lock
		std::vector<std::thread> threads;
		std::deque<ThreadPoolTask> queue;
		std::condition_variable queue_signal;

		// internal functions
		void WorkerThreadProc();

	public:
		ThreadPool(const size_t thread_count = std::thread::hardware_concurrency());
		~ThreadPool();

		ThreadPool(const ThreadPool&) = delete;
		ThreadPool& operator=(const ThreadPool&) = delete;
		ThreadPool(ThreadPool&&) = delete;
		ThreadPool& operator=(ThreadPool&&) = delete;
  
		void WaitUntilDone();
		void CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float delta_time);
	};


ThreadPool::ThreadPool(const size_t thread_count) {

	// three cheers for std::memory_order_relaxed
	// really its not necessary, as there's no point in saving what amounts to maybe 2ns
	// in a constructor of all places...
	pending_count.store(0, std::memory_order_relaxed);

	// since the other threads haven't started yet, no need to lock
	// the thread creation will do whatever memory synchronization is required for us
	// which for x86/x64 is probably nothing anyways
	run = true;
	threads.reserve(thread_count);
	for (size_t i = 0; i < thread_count; ++i) {
		threads.push_back(std::thread(&ThreadPool::WorkerThreadProc, this));
		}
	}

ThreadPool::~ThreadPool() {

	std::unique_lock<std::mutex> lock(mutex);
	run = false;
	queue_signal.notify_all();
	lock.unlock();

	for (auto& thread : threads) {
		thread.join();
		}
	}

void ThreadPool::WaitUntilDone() {
	//queueSignal.notify_all();				// shouldn't be here, we don't need to wake threads
	// having multiple condition variables could in theory work, but unless you call WaitUntilDone() often
	// it's easier just to spin-loop on the atomic counter
	// properly synchronizing multiple condition variables is not for the faint of heart
	// acquire pairs with the release in WorkerThreadProc() so whatever the tasks wrote is visible after the wait
	while (pending_count.load(std::memory_order_acquire) > 0) {
		std::this_thread::yield();			// sleep_for also works here
		}
	}

void ThreadPool::WorkerThreadProc() {

	ThreadPoolTask group;
	std::unique_lock<std::mutex> lock(mutex, std::defer_lock);

	while (true) {
		
		// attempt to load next task
		lock.lock();
		while (true) {
			if (!queue.empty()) break;		// I'm assuming we want to finish all tasks prior to exiting
			if (!run) return;
			queue_signal.wait(lock);
			}
		group = queue.front();
		queue.pop_front();
		lock.unlock();

		// execute task
		group.func(group.start_index, group.end_index, group.delta_time);
		// release publishes the task's writes to the thread spinning in WaitUntilDone()
		pending_count.fetch_sub(1, std::memory_order_release);
		}
	}

void ThreadPool::CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float delta_time) {
	if (item_count == 0) return;

	// init variables
	size_t threads_size = threads.size();
	size_t items_per_task = std::max<size_t>(1, item_count / threads_size);

	// init task
	ThreadPoolTask task = {};
	task.func = function;
	task.delta_time = delta_time;

	// exception safety consideration...
	// normally I wouldn't use a std::deque for this sort of thing
	// I have a hand written circular queue container which has a Reserve() function
	// here I would do something like: queue.Reserve(queue.Count() + threads_size)
	// this would ensure all subsequent queue.Push() functions would not throw
	// which would give you strong exception safety
	// std::deque does not support reserve() so you only get weak (or have to manually roll back)
	// there's also the issue of whether std::vector<T>::reserve() even allows for strong exception safety
	// it really depends on how you read the spec, though as far as I know it works for all implementations
	// of vector even if it's a 'grey area' of the spec
	// in all containers I've personally written it's valid to Reserve() and know that you get no exceptions
	// provided of course the constructor doesn't throw
	// of course in this case since you're using std::function which can throw, you'll either have to live with
	// weak exception safety (which is good for 99.999% of stuff) or roll back the queue by hand
	// ...
	// it's 2:30 am so I'm not sure how much sense that makes

	// lock as late as possible so that calculations that don't need to be locked for, aren't in the lock
	std::lock_guard<std::mutex> lock(mutex);		// lock once and then add all items, don't lock for every single item added

	// add tasks to queue
	size_t tasks_added = 0;
	try {
		for (size_t i = 0; i < item_count; i += items_per_task, ++tasks_added) {
			task.start_index = i;
			task.end_index = std::min(i + items_per_task, item_count);			// when in doubt use a half-open interval as opposed to a closed interval
			queue.push_back(task);
			}
		queue_signal.notify_all();			// pretty good chance you're adding more than one item
		}
	catch (...) {
		// strong exception roll back
		// really, like I said above I hate having to do this and so a Reserve() and using something other than
		// std::function is actually my choice if I were to actually write this
		// that or just live with weak because if we go OOM here, do we really care about partial results?
		// but since we're here, for completeness, why not
		for (size_t i = 0; i < tasks_added; ++i) queue.pop_back();
		throw;
		}

	// done
	pending_count.fetch_add(tasks_added, std::memory_order_relaxed);
	}


// ----- test function ----
void TestTask1(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask1 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}

void TestTask2(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask2 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}

void TestTask3(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask3 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}


// ----- main -----
int main() {

	// test 1
	/*
	ThreadPool pool;
	pool.CreateTasks(5, TestTask1, 1.1f);
	pool.CreateTasks(50, TestTask2, 2.2f);
	pool.CreateTasks(500, TestTask3, 3.3f);

	std::cout << "waiting..." << std::endl;
	pool.WaitUntilDone();
	*/

	// test 2
	{
	ThreadPool pool;
	pool.CreateTasks(5, TestTask1, 1.1f);
	pool.CreateTasks(50, TestTask2, 2.2f);
	pool.CreateTasks(500, TestTask3, 3.3f);
	std::cout << "waiting..." << std::endl;
	// test destructor wait for us
	}

	// done
	std::cout << "done" << std::endl;
	getchar();
	}

 

3 hours ago, Finalspace said:

pendingCount is atomic - look at the declaration!

Therefore it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected it to be a compare-and-exchange or just an atomic increment...

There is a big difference between "I can decrement this atomically, and I can compare it atomically" and "I can decrement and compare this atomically". You are doing the first, which is fine providing that you don't expect the comparison to be equal to the exact result of the decrement. If you need to see the exact result of the decrement, you need the second operation, which you weren't using.
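
A short sketch of the second kind of operation, a decrement whose result is tested as part of the same atomic step, assuming a `std::atomic<int>` counter. `FinishOne` and `CountLastFinishers` are hypothetical names used only for illustration. `fetch_sub` returns the value the counter held immediately before the subtraction, so only the caller that moved it from 1 to 0 sees `1`.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper: returns true only for the caller whose decrement
// took the counter from 1 to 0.
inline bool FinishOne(std::atomic<int> &pending) {
	// fetch_sub is a single atomic read-modify-write. The value it returns
	// is the one this call replaced, so nothing can slip in between the
	// decrement and the value being tested.
	return pending.fetch_sub(1, std::memory_order_acq_rel) == 1;
}

// Single-threaded walk-through: counts how many calls saw the 1 -> 0 step.
inline int CountLastFinishers(int tasks) {
	std::atomic<int> pending(tasks);
	int lastFinishers = 0;
	for (int i = 0; i < tasks; ++i) {
		if (FinishOne(pending)) {
			++lastFinishers;
		}
	}
	assert(pending.load() == 0);
	return lastFinishers;
}
```

Knowing which thread finished last is only half of the problem in the original pool. The notification that follows still has to be coordinated with the mutex the waiting thread holds, otherwise it can be sent before the waiter has started to wait.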

5 hours ago, Ryan_001 said:

So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.

Awesome, that is a much better solution - thanks! It's much better to use the atomic methods directly, instead of relying on the overloaded operators.

Also, using yield removed the OS scheduler thread waiting penalty ;)

Now it is much, much faster.

You are still assuming that the decrement operators are actually atomic... like I mentioned, I did not look at the spec, but iirc before standard C++ supported atomics, the Boost C++ atomic template operators were not atomic, and one would have to use the compare and exchange methods to guarantee atomicity. This looks like the approach of the re-write too.
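
For reference, the C++11 standard specifies the pre-decrement operator of the integral `std::atomic` specializations as equivalent to `fetch_sub(1) - 1`, which is one atomic read-modify-write. The sketch below is a small stress check under that assumption: several threads decrement a shared counter both with `--` and with an explicit compare-and-exchange loop, and no update should be lost. `DecrementWithCas` and `RunDecrementStress` are hypothetical names, not code from the thread.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: a decrement written as an explicit compare-and-exchange loop.
inline int DecrementWithCas(std::atomic<int> &value) {
	int expected = value.load(std::memory_order_relaxed);
	// On failure compare_exchange_weak stores the current value into
	// `expected`, so the loop simply retries with fresh data.
	while (!value.compare_exchange_weak(expected, expected - 1)) {
	}
	return expected - 1;	// the value this call stored
}

// Each thread decrements perThread times with operator-- and perThread times
// with the CAS loop. Returns the final counter value.
inline int RunDecrementStress(int threadCount, int perThread) {
	assert(threadCount >= 0 && perThread >= 0);
	std::atomic<int> counter(threadCount * perThread * 2);

	std::vector<std::thread> threads;
	for (int t = 0; t < threadCount; ++t) {
		threads.emplace_back([&counter, perThread] {
			for (int i = 0; i < perThread; ++i) {
				--counter;					// atomic read-modify-write
				DecrementWithCas(counter);	// same effect, spelled out by hand
			}
		});
	}
	for (auto &thread : threads) {
		thread.join();
	}
	return counter.load();
}
```

If either form lost updates, the counter would end above zero. A passing run is of course not a proof, only a sanity check alongside what the standard specifies.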

This topic is closed to new replies.
