// So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I rewrote your ThreadPool. It compiles and runs under VS 2017.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
// ----- ThreadPool -----
// Fixed-size pool of worker threads that consume range-tasks from a shared queue.
//
// Thread-safety: `run` and `queue` are guarded by `mutex`; `pending_count` is
// atomic so WaitUntilDone() can poll it without taking the lock; `threads` is
// only mutated in the constructor/destructor.
class ThreadPool {
private:
    // A unit of work: invokes func(start_index, end_index, delta_time)
    // over the half-open index range [start_index, end_index).
    struct ThreadPoolTask {
        size_t start_index;
        size_t end_index;
        float delta_time;
        // NOTE(review): the original had a manual `uint8_t padding0[4]` member
        // here, flagged "??" by its own author. The compiler inserts whatever
        // alignment padding the struct needs; the member was never read and
        // has been removed.
        std::function<void(const size_t, const size_t, const float)> func;
    };

    // Number of queued-but-not-yet-finished tasks. Atomic so that
    // WaitUntilDone() can spin on it without acquiring `mutex`.
    std::atomic_size_t pending_count;

    // --- state guarded by `mutex` ---
    std::mutex mutex;
    bool run; // false => workers drain the queue and then exit
    std::vector<std::thread> threads;
    std::deque<ThreadPoolTask> queue;
    std::condition_variable queue_signal;

    // Loop body executed by each worker in `threads`.
    void WorkerThreadProc();

public:
    // `explicit` so a bare size_t can't silently convert into a pool.
    explicit ThreadPool(const size_t thread_count = std::thread::hardware_concurrency());
    ~ThreadPool();

    // Non-copyable, non-movable: the workers capture `this`.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    // Blocks the calling thread until every queued task has finished.
    void WaitUntilDone();

    // Splits [0, item_count) into roughly thread-count chunks and queues one
    // task per chunk; each task invokes function(start, end, deltaTime).
    void CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
};
ThreadPool::ThreadPool(const size_t thread_count) {
    // std::thread::hardware_concurrency() is allowed to return 0 when the
    // core count can't be determined. A pool with zero workers would never
    // run anything (and CreateTasks divides by the thread count), so force
    // at least one worker.
    const size_t count = (thread_count > 0) ? thread_count : 1;

    // No workers exist yet, so no locking or ordering is needed here:
    // std::thread construction synchronizes-with the start of each new thread.
    pending_count.store(0, std::memory_order_relaxed);
    run = true;

    threads.reserve(count);
    for (size_t i = 0; i < count; ++i) {
        threads.emplace_back(&ThreadPool::WorkerThreadProc, this);
    }
}
ThreadPool::~ThreadPool() {
    // Tell the workers to stop once the queue is drained, then wake them all.
    {
        const std::lock_guard<std::mutex> guard(mutex);
        run = false;
        queue_signal.notify_all();
    }
    // Wait for every worker to finish its remaining tasks and exit.
    for (std::thread &worker : threads) {
        worker.join();
    }
}
// Spin until the pending-task counter reaches zero.
// No condition-variable wake is needed here: we only read the atomic counter.
void ThreadPool::WaitUntilDone() {
    // The load must be acquire (the original used relaxed): it has to
    // synchronize with the workers' decrement so the side effects of the
    // completed tasks are visible to the caller when this returns. This
    // pairing requires the counter decrement in the worker loop to be
    // release-or-stronger.
    while (pending_count.load(std::memory_order_acquire) > 0) {
        std::this_thread::yield(); // a short sleep would also work here
    }
}
// Worker loop: pops one task at a time and executes it. Exits only when
// `run` is false AND the queue is empty, so outstanding work is drained first.
void ThreadPool::WorkerThreadProc() {
    ThreadPoolTask group;
    std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
    while (true) {
        // attempt to load the next task
        lock.lock();
        while (true) {
            if (!queue.empty()) break; // finish all queued tasks before exiting
            if (!run) return;          // unique_lock unlocks on destruction
            queue_signal.wait(lock);
        }
        // Move instead of copy: avoids duplicating the std::function's state
        // (the original copied the whole task out of the queue).
        group = std::move(queue.front());
        queue.pop_front();
        lock.unlock();

        // Execute outside the lock so tasks run in parallel.
        group.func(group.start_index, group.end_index, group.delta_time);

        // Release (the original used relaxed, which does not publish the
        // task's side effects): pairs with an acquire load of this counter
        // in WaitUntilDone so the waiter observes the finished work.
        pending_count.fetch_sub(1, std::memory_order_release);
    }
}
// Splits [0, item_count) into roughly thread-count half-open chunks and
// queues one task per chunk. Offers strong exception safety: if queueing
// throws partway through, the already-added tasks are rolled back before
// rethrowing (std::deque has no reserve(), so pre-reserving to make the
// pushes non-throwing isn't an option; manual rollback it is).
void ThreadPool::CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float delta_time) {
    if (item_count == 0) return;

    // Guard against an empty pool: the original divided by threads.size()
    // unconditionally, which is undefined behavior when there are no workers.
    const size_t threads_size = std::max<size_t>(1, threads.size());
    const size_t items_per_task = std::max<size_t>(1, item_count / threads_size);

    // Build the task template before locking so the lock is held as
    // briefly as possible.
    ThreadPoolTask task = {};
    task.func = function;
    task.delta_time = delta_time;

    // Lock once for the whole batch instead of once per task.
    std::lock_guard<std::mutex> lock(mutex);

    size_t tasks_added = 0;
    try {
        for (size_t i = 0; i < item_count; i += items_per_task, ++tasks_added) {
            task.start_index = i;
            // Half-open interval [start, end); the last chunk absorbs any remainder.
            task.end_index = std::min(i + items_per_task, item_count);
            queue.push_back(task);
        }
        queue_signal.notify_all(); // usually more than one task was added
    }
    catch (...) {
        // Strong-exception rollback: pop exactly the tasks this call pushed
        // (safe — we still hold the lock, so no worker can have taken one).
        for (size_t i = 0; i < tasks_added; ++i) queue.pop_back();
        throw;
    }

    // Relaxed is sufficient here: workers woken by notify_all still block on
    // `mutex` until this function returns, so this increment is in the
    // counter's modification order before any worker can pop and decrement.
    pending_count.fetch_add(tasks_added, std::memory_order_relaxed);
}
// ----- test function ----
// Demo task: logs its range and sleeps to simulate work.
void TestTask1(const size_t start, const size_t end, const float f) {
    // Format the whole line first: writing field-by-field to std::cout from
    // concurrent workers interleaves fragments of different messages.
    std::ostringstream msg;
    msg << "TestTask1 start = " << start << ", end = " << end << ", f = " << f << '\n';
    std::cout << msg.str() << std::flush;
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
// Demo task: logs its range and sleeps to simulate work.
void TestTask2(const size_t start, const size_t end, const float f) {
    // Format the whole line first: writing field-by-field to std::cout from
    // concurrent workers interleaves fragments of different messages.
    std::ostringstream msg;
    msg << "TestTask2 start = " << start << ", end = " << end << ", f = " << f << '\n';
    std::cout << msg.str() << std::flush;
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
// Demo task: logs its range and sleeps to simulate work.
void TestTask3(const size_t start, const size_t end, const float f) {
    // Format the whole line first: writing field-by-field to std::cout from
    // concurrent workers interleaves fragments of different messages.
    std::ostringstream msg;
    msg << "TestTask3 start = " << start << ", end = " << end << ", f = " << f << '\n';
    std::cout << msg.str() << std::flush;
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
// ----- main -----
// Entry point: exercises the pool and relies on ~ThreadPool() draining the
// queue before the final message.
// NOTE: the original declared `void main()`, which is ill-formed C++ (a
// Microsoft extension); main must return int.
int main() {
    // test 1: explicit WaitUntilDone()
    /*
    ThreadPool pool;
    pool.CreateTasks(5, TestTask1, 1.1f);
    pool.CreateTasks(50, TestTask2, 2.2f);
    pool.CreateTasks(500, TestTask3, 3.3f);
    std::cout << "waiting..." << std::endl;
    pool.WaitUntilDone();
    */
    // test 2: the destructor waits for outstanding tasks
    {
        ThreadPool pool;
        pool.CreateTasks(5, TestTask1, 1.1f);
        pool.CreateTasks(50, TestTask2, 2.2f);
        pool.CreateTasks(500, TestTask3, 3.3f);
        std::cout << "waiting..." << std::endl;
        // test destructor waits for us
    }
    // done
    std::cout << "done" << std::endl;
    getchar(); // keep the console window open
    return 0;
}