# C++ What is wrong with my thread pool?

I have written a math-heavy simulation and parallelized it across the available CPU cores, using a simple thread pool written in C++11.

At first it seemed to work, but now it freezes reproducibly after a few frames, although sometimes it runs just fine.

I have absolutely no clue why it won't work... Can someone look into it? It's not that much code.

I have been staring at the code for hours and cannot figure out what is wrong...

Here is the code:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <functional>
#include <algorithm>

size_t startIndex;
size_t endIndex;
float deltaTime;
std::function<void(const size_t, const size_t, const float)> func;
};

std::atomic<bool> stopped;
std::atomic<int> pendingCount;
std::mutex queueMutex;
std::mutex completeMutex;
std::condition_variable queueSignal;
std::condition_variable completeSignal;

void WaitUntilDone();
void CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
};

stopped(false),
pendingCount(0) {
for (size_t workerIndex = 0; workerIndex < threadCount; ++workerIndex) {
}
}

stopped = true;
queueSignal.notify_all();
for (size_t workerIndex = 0; workerIndex < threads.size(); ++workerIndex)
}

{
std::unique_lock<std::mutex> lock(queueMutex);
queue.push_back(entry);
}
pendingCount++;
}

queueSignal.notify_all();
std::unique_lock<std::mutex> lock(completeMutex);
while (pendingCount > 0) {
completeSignal.wait(lock);
}
}

while (!stopped) {
{
std::unique_lock<std::mutex> lock(queueMutex);
while (queue.empty()) {
queueSignal.wait(lock);
}
group = queue.front();
queue.pop_front();
}
group.func(group.startIndex, group.endIndex, group.deltaTime);
if (--pendingCount == 0) {
completeSignal.notify_one();
}
}
}

void ThreadPool::CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime) {
if (itemCount > 0) {
const size_t itemsPerTask = std::max((size_t)1, itemCount / threads.size());
for (size_t itemIndex = 0; itemIndex < itemCount; itemIndex += itemsPerTask) {
task.endIndex = std::min(itemIndex + itemsPerTask - 1, itemCount - 1);
}
}
}

void main() {
workerPool.CreateTasks(itemCount, [=](const size_t startIndex, const size_t endIndex, const float deltaTime) {
this->DoingSomeMathyWork(startIndex, endIndex, deltaTime);
}, deltaTime);
workerPool.WaitUntilDone();
}

Edited by Finalspace

##### Share on other sites

First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

Second... if it freezes, can't you use a debugger to see where it's hanging?

Third: you have things like 'completeMutex' which is only referenced in one place. Either that's redundant, or you're using it elsewhere and that matters (see above about real code, perhaps?)

Finally... lines like this look a bit suspect to me:

if (--pendingCount == 0)

...because what happens if something else increments pendingCount between you decrementing it and then comparing it? Sometimes it doesn't matter, sometimes it does, and I'm not sure in your case. If there's the chance of adding extra tasks then there's the possibility that pendingCount gets incremented at precisely the wrong moment and you never do the signal that was expected at that point.

Personally it looks to me like you have too many synchronisation primitives there for what should be a relatively simple operation. But I've not tried to do this with standard C++ so maybe I'm wrong.

##### Share on other sites
1 hour ago, Kylotan said:

First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

You guessed right: I renamed AddWork to AddTask but forgot to update the implementation.

Debugging threading code is usually hard, but in this case it was easy. The main thread was waiting forever for the tasks to finish; even when all tasks were finished (pendingCount == 0), it was never signaled properly.

I tried a lot of things, such as always decrementing pendingCount and signaling unconditionally (the wait is looped anyway), but that did not work either.

Now I use a spinlock instead, and this works perfectly:

void ThreadPool::WaitUntilDone() {
queueSignal.notify_all();
while (pendingCount > 0) {
}
}

while (!stopped) {
{
std::unique_lock<std::mutex> lock(queueMutex);
while (queue.empty()) {
queueSignal.wait(lock);
}
queue.pop_front();
}
--pendingCount;
}
}

##### Share on other sites
21 minutes ago, Finalspace said:

Now I use a spinlock instead, and this works perfectly:

Seems like you found a workaround instead of a comprehensive solution. If you are going to use atomic variables, make sure they are being used effectively. With that said, and without having checked the documentation of std::atomic: if the -- operator is not atomic, then you'll have race conditions like Kylotan pointed out.

##### Share on other sites

Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.
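
One way to read the advice above: in the original `WaitUntilDone`, the waiter can test `pendingCount > 0` and, before it actually blocks in `completeSignal.wait`, a worker may decrement the count to zero and call `notify_one` without holding `completeMutex`. That notification is then lost and the waiter can sleep forever. The following is a minimal sketch of a counter that avoids this by changing and testing the count under one mutex. `PendingCounter` and `RunDemo` are hypothetical names for illustration and are not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not from the original post): the count is changed and
// tested under one mutex, so a notification cannot slip in between the
// waiter's check and its call to wait().
class PendingCounter {
public:
    // Registers n tasks that have not finished yet.
    void Add(std::size_t n) {
        std::lock_guard<std::mutex> lock(mutex_);
        count_ += n;
    }

    // Called by a worker after it has finished one task.
    void Done() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(count_ > 0);
        if (--count_ == 0) {
            signal_.notify_all();
        }
    }

    // Blocks until the count is zero; the predicate form re-checks the
    // count after every wake-up, including spurious ones.
    void Wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        signal_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable signal_;
    std::size_t count_ = 0;
};

// Hypothetical driver: runs taskCount trivial tasks, one thread each, and
// returns how many had finished by the time Wait() returned.
int RunDemo(int taskCount) {
    PendingCounter pending;
    std::mutex finishedMutex;
    int finished = 0;

    pending.Add(static_cast<std::size_t>(taskCount));
    std::vector<std::thread> workers;
    for (int i = 0; i < taskCount; ++i) {
        workers.emplace_back([&pending, &finishedMutex, &finished] {
            {
                std::lock_guard<std::mutex> lock(finishedMutex);
                ++finished;  // stands in for the real work
            }
            pending.Done();
        });
    }

    pending.Wait();
    int result = 0;
    {
        std::lock_guard<std::mutex> lock(finishedMutex);
        result = finished;
    }
    for (std::thread &worker : workers) {
        worker.join();
    }
    return result;
}
```

Because `Done()` takes the same mutex that `Wait()` holds while testing the count, the final decrement happens either before the waiter's check or after the waiter is already blocked, so the wake-up is not missed. The cost is one extra lock per finished task.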

##### Share on other sites
Quote

if the -- operator is not atomic, then you'll have race conditions like Kylotan pointed out.

pendingCount is atomic - look at the declaration!

So it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected it to be a compare-and-exchange or just an atomic increment...

Or maybe it ended up on the same cache line, so we have a false-sharing problem here?

Normally I would use the atomic operations directly and not fiddle around with the C++ threading library, but I wanted to give it a try.

Quote

Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.

In my case the spinlock works: it simply has to wait until all work is done, that's it. It also makes the code much simpler. There are no race conditions, and it won't bite back later. The only downside of this method is a possible penalty: the thread sleeps for at least the OS scheduler's granularity, but only if std::this_thread::sleep_for() uses the underlying "sleep" from the operating system. I have no idea what the C++ implementation is doing, and I cannot look into it because there is no source for Visual Studio.

Edited by Finalspace

##### Share on other sites

So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <iostream>
#include <functional>
#include <algorithm>

// ----- ThreadPool -----

private:
// internal types
size_t start_index;
size_t end_index;
float delta_time;
uint8_t padding0[4];			// ??
std::function<void(const size_t, const size_t, const float)> func;
};

// atomic variables
// you could make this non-atomic and use the mutex for accessing this as well
// the only real change would be in the WaitUntilDone() function you'd need to acquire/release
// the lock every check, which isn't a bad thing, the performance will be near identical to an atomic
// but for 'shits'n'giggles' why not leave this as atomic...
std::atomic_size_t pending_count;

// variables protected by the mutex
std::mutex mutex;
bool run;						// there's no point to having this atomic, all accesses are always checked while in a lock
std::condition_variable queue_signal;

// internal functions

public:

void WaitUntilDone();
void CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
};

// three cheers for std::memory_order_relaxed
// really its not necessary, as there's no point in saving what amounts to maybe 2ns
// in a constructor of all places...
pending_count.store(0, std::memory_order_relaxed);

// since the other threads haven't started yet, no need to lock
// the thread creation will do whatever memory synchronization is required for us
// which for x86/x64 is probably nothing anyways
run = true;
for (size_t i = 0; i < thread_count; ++i) {
}
}

std::unique_lock<std::mutex> lock(mutex);
run = false;
queue_signal.notify_all();
lock.unlock();

}
}

//queueSignal.notify_all();				// shouldn't be here, we don't need to wake threads
// having multiple condition variables could in theory work, but unless you call WaitUntilDone() often
// its easier just to spin-loop on the atomic counter
// properly synchronizing multiple condition variables is not for the faint of heart
// acquire pairs with the release decrement in the worker, so everything the
// tasks wrote is visible to the caller once the count reaches zero
while (pending_count.load(std::memory_order_acquire) > 0) {
std::this_thread::yield();			// sleep_for also works here
}
}

std::unique_lock<std::mutex> lock(mutex, std::defer_lock);

while (true) {

lock.lock();
while (true) {
if (!queue.empty()) break;		// I'm assuming we want to finish all threads prior to exiting
if (!run) return;
queue_signal.wait(lock);
}
group = queue.front();
queue.pop_front();
lock.unlock();

group.func(group.start_index, group.end_index, group.delta_time);
// release publishes the task's writes before the count drops
pending_count.fetch_sub(1, std::memory_order_release);
}
}

void ThreadPool::CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float delta_time) {
if (item_count == 0) return;

// init variables
size_t items_per_task = std::max<size_t>(1, item_count / threads_size);

// exception safety consideration...
// normally I wouldn't use a std::deque for this sort of thing
// I have a hand written circular queue container which has a Reserve() function
// here I would do something like: queue.Reserve(queue.Count() + threads_size)
// this would ensure all subsequent queue.Push() functions would not throw
// which would give you strong exception safety
// std::deque does not support reserve() so you only get weak (or have to manually roll back)
// there's also the issue of whether std::vector<T>::reserve() even allows for strong exception
// it really depends on how you read the spec, though as far as I know it works for all implementations
// of vector even if its a 'grey area' of the spec
// in all containers I've personally written its valid to Reserve() and know that you get no exceptions
// provided of course the constructor doesn't throw
// course in this case since you're using std::function which can throw, you'll either have to live with
// weak exception safety (which is good for 99.999% of stuff) or roll back the queue by hand
// ...
// its 2:30 am so I'm not sure how much sense that makes

// lock as late as possible so that calculations that don't need to be locked for, aren't in the lock
std::lock_guard<std::mutex> lock(mutex);		// lock once and then add all items, don't lock for every single item added

try {
for (size_t i = 0; i < item_count; i += items_per_task, ++tasks_added) {
task.end_index = std::min(i + items_per_task, item_count);			// when in doubt use a half-open interval as opposed to a closed interval
}
queue_signal.notify_all();			// pretty good chance you're adding more than one item
}
catch (...) {
// strong exception roll back
// really, like I said above I hate having to do this and so a Reserve() and using something other than
// std::function is actually my choice if I were to actually write this
// that or just live with weak because if we go OOM here, do we really care about partial results?
// but since we're here, for completeness, why not
for (size_t i = 0; i < tasks_added; ++i) queue.pop_back();
throw;
}

// done
}

// ----- test function ----
void TestTask1(const size_t start, const size_t end, const float f) {
std::cout << "TestTask1 start = " << start << ", end = " << end << ", f = " << f << std::endl;
}

void TestTask2(const size_t start, const size_t end, const float f) {
std::cout << "TestTask2 start = " << start << ", end = " << end << ", f = " << f << std::endl;
}

void TestTask3(const size_t start, const size_t end, const float f) {
std::cout << "TestTask3 start = " << start << ", end = " << end << ", f = " << f << std::endl;
}

// ----- main -----
int main() {

// test 1
/*

std::cout << "waiting..." << std::endl;
pool.WaitUntilDone();
*/

// test 2
{
std::cout << "waiting..." << std::endl;
// test destructor wait for us
}

// done
std::cout << "done" << std::endl;
getchar();
}

Edited by Ryan_001
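
The half-open interval advice in the rewrite's `CreateTasks` can be shown in isolation. This is a small sketch, assuming a hypothetical free function `SplitRange` that only computes the `[begin, end)` ranges and does not touch the queue; it uses the same chunk-size formula as the code above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: splits [0, itemCount) into half-open ranges
// [begin, end), using max(1, itemCount / workerCount) items per range.
std::vector<std::pair<std::size_t, std::size_t>>
SplitRange(std::size_t itemCount, std::size_t workerCount) {
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (itemCount == 0 || workerCount == 0) {
        return ranges;
    }
    const std::size_t itemsPerTask =
        std::max<std::size_t>(1, itemCount / workerCount);
    for (std::size_t begin = 0; begin < itemCount; begin += itemsPerTask) {
        // The end index is exclusive, so the last range is simply clamped
        // to itemCount and no "- 1" arithmetic is needed.
        ranges.emplace_back(begin, std::min(begin + itemsPerTask, itemCount));
    }
    // Postcondition: the ranges cover everything up to itemCount.
    assert(ranges.back().second == itemCount);
    return ranges;
}
```

With `SplitRange(10, 4)` the chunk size is 2, so the call yields five ranges, `[0,2)` through `[8,10)`. The integer division can therefore produce more tasks than workers, which is harmless for a queue-based pool.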

##### Share on other sites
3 hours ago, Finalspace said:

pendingCount is atomic - look at the declaration!

So it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected it to be a compare-and-exchange or just an atomic increment...

There is a big difference between "I can decrement this atomically, and I can compare it atomically" and "I can decrement and compare this atomically". You are doing the first, which is fine providing that you don't expect the comparison to be equal to the exact result of the decrement. If you need to see the exact result of the decrement, you need the second operation, which you weren't using.
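
For reference, a small check of what `--pendingCount == 0` compares, assuming `pendingCount` is a `std::atomic<int>` as declared in the original post. The pre-decrement operator on `std::atomic` integral types is specified as `fetch_sub(1) - 1`, so the comparison uses the value produced by that thread's own decrement. The sketch below uses a hypothetical function `CountZeroObservers` and deliberately has no concurrent increments, so it does not cover the case where tasks are added while others are finishing.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Each of threadCount threads decrements a shared counter exactly once and
// records whether its own decrement produced zero.
// Returns how many threads observed zero.
int CountZeroObservers(int threadCount) {
    assert(threadCount > 0);
    std::atomic<int> pending(threadCount);
    std::atomic<int> sawZero(0);

    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back([&pending, &sawZero] {
            // One atomic read-modify-write; the result is the new value,
            // i.e. the same as pending.fetch_sub(1) - 1.
            if (--pending == 0) {
                ++sawZero;
            }
        });
    }
    for (std::thread &t : threads) {
        t.join();
    }
    return sawZero.load();
}
```

Since every decrement returns a distinct value, exactly one thread sees zero. This only shows that the decrement-and-compare is exact; it says nothing about whether a notification sent at that moment reaches a waiter.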

##### Share on other sites
5 hours ago, Ryan_001 said:

So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.


Awesome, that is a much better solution - thanks! It's much better to use the atomic methods directly instead of relying on the overloaded operators.

Also, using yield removed the OS scheduler's thread-waiting penalty.

Now it is much much faster.

Edited by Finalspace

##### Share on other sites

You are still assuming that the decrement operators are actually atomic. Like I mentioned, I did not look at the spec, but IIRC, before standard C++ supported atomics, the Boost C++ atomic template operators were not atomic; one had to use the compare-and-exchange methods to guarantee atomicity. This looks like the approach the rewrite took, too.
