
C++ What is wrong with my thread pool?

Recommended Posts

I have written a math-heavy simulation and parallelized it across the available CPU cores, based on a simple thread pool written in C++11.

At first it seemed to work, but then it starts to freeze reproducibly after a few frames - though sometimes it runs just fine.

I have absolutely no clue why it won't work... Could someone take a look? It's not that much code.

I have been staring at the code for hours and cannot figure out what is wrong...

 

Here is the code:

 

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <functional> // std::function
#include <algorithm>  // std::min, std::max
#include <cstdint>    // uint8_t

struct ThreadPoolTask {
	size_t startIndex;
	size_t endIndex;
	float deltaTime;
	uint8_t padding0[4];
	std::function<void(const size_t, const size_t, const float)> func;
};

struct ThreadPool {
	std::atomic<bool> stopped;
	std::vector<std::thread> threads;
	std::atomic<int> pendingCount;
	std::deque<ThreadPoolTask> queue;
	std::mutex queueMutex;
	std::mutex completeMutex;
	std::condition_variable queueSignal;
	std::condition_variable completeSignal;

	ThreadPool(const size_t threadCount = std::thread::hardware_concurrency());
	~ThreadPool();

	void AddTask(const ThreadPoolTask &task);
	void WaitUntilDone();
	void WorkerThreadProc();
	void CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
};

ThreadPool::ThreadPool(const size_t threadCount) :
	stopped(false),
	pendingCount(0) {
	for (size_t workerIndex = 0; workerIndex < threadCount; ++workerIndex) {
		threads.push_back(std::thread(&ThreadPool::WorkerThreadProc, this));
	}
}

ThreadPool::~ThreadPool() {
	stopped = true;
	queueSignal.notify_all();
	for (size_t workerIndex = 0; workerIndex < threads.size(); ++workerIndex)
		threads[workerIndex].join();
}

void ThreadPool::AddWork(const ThreadPoolTask &entry) {
	{
		std::unique_lock<std::mutex> lock(queueMutex);
		queue.push_back(entry);
	}
	pendingCount++;
}

void ThreadPool::WaitUntilDone() {
	queueSignal.notify_all();
	std::unique_lock<std::mutex> lock(completeMutex);
	while (pendingCount > 0) {
		completeSignal.wait(lock);
	}
}

void ThreadPool::WorkerThreadProc() {
	ThreadPoolTask group;
	while (!stopped) {
		{
			std::unique_lock<std::mutex> lock(queueMutex);
			while (queue.empty()) {
				queueSignal.wait(lock);
			}
			group = queue.front();
			queue.pop_front();
		}
		group.func(group.startIndex, group.endIndex, group.deltaTime);
		if (--pendingCount == 0) {
			completeSignal.notify_one();
		}
	}
}

void ThreadPool::CreateTasks(const size_t itemCount, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime) {
	if (itemCount > 0) {
		const size_t itemsPerTask = std::max((size_t)1, itemCount / threads.size());
		ThreadPoolTask task = {};
		task.func = function;
		task.deltaTime = deltaTime;
		for (size_t itemIndex = 0; itemIndex < itemCount; itemIndex += itemsPerTask) {
			task.startIndex = itemIndex;
			task.endIndex = std::min(itemIndex + itemsPerTask - 1, itemCount - 1);
			AddTask(task);
		}
	}
}

int main() {
  // Usage sketch - workerPool, itemCount, deltaTime and DoingSomeMathyWork
  // are defined elsewhere in the real code.
  workerPool.CreateTasks(itemCount, [=](const size_t startIndex, const size_t endIndex, const float deltaTime) {
      this->DoingSomeMathyWork(startIndex, endIndex, deltaTime);
  }, deltaTime);
  workerPool.WaitUntilDone();
  return 0;
}

 

Edited by Finalspace


First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

Second... if it freezes, can't you use a debugger to see where it's hanging?

Third: you have things like 'completeMutex' which is only referenced in one place. Either that's redundant, or you're using it elsewhere and that matters (see above about real code, perhaps?)

Finally... lines like this look a bit suspect to me:

if (--pendingCount == 0) 

...because what happens if something else increments pendingCount between you decrementing it and then comparing it? Sometimes it doesn't matter, sometimes it does, and I'm not sure in your case. If there's the chance of adding extra tasks then there's the possibility that pendingCount gets incremented at precisely the wrong moment and you never do the signal that was expected at that point.

Personally it looks to me like you have too many synchronisation primitives there for what should be a relatively simple operation. But I've not tried to do this with standard C++ so maybe I'm wrong.
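To make the distinction concrete: on a std::atomic, `--pendingCount` is a single read-modify-write that returns the new value, so with a fixed number of decrements exactly one thread observes the transition to zero - unless something increments the counter concurrently, which is exactly the scenario described above. A minimal sketch (the `count_zero_observers` helper is hypothetical, not from the thread):

```cpp
#include <atomic>
#include <thread>
#include <vector>

// With no concurrent increments, the atomic decrement-and-test has
// exactly one "winner": operator-- is one indivisible RMW that yields
// the decremented value, so only one thread sees it hit zero.
int count_zero_observers(int n) {
    std::atomic<int> pending{n};
    std::atomic<int> saw_zero{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i)
        workers.emplace_back([&] {
            if (--pending == 0)   // atomic fetch_sub plus returned value
                ++saw_zero;
        });
    for (auto &t : workers) t.join();
    return saw_zero.load();
}
```

If AddTask can run concurrently with the workers, an increment can land between a worker's decrement and the waiter's check, and the expected signal never fires - the failure mode suspected here.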

1 hour ago, Kylotan said:

First up, I'm guessing that's not your real code, because you have a prototype of AddTask but a function body of AddWork...

You guessed right -> I renamed AddWork to AddTask, but forgot to update the implementation.

Well, debugging threading code is usually hard, but in this case it was easy. The main thread was waiting forever for the tasks to be finished, but even when the tasks were finished (pendingCount == 0), it never got signaled properly.

 

I tried a lot, like decrementing pendingCount and always signaling - the wait is in a loop anyway - but that did not work either.

 

Now I use a spinlock instead and this works perfectly:

void ThreadPool::WaitUntilDone() {
	queueSignal.notify_all();
	while (pendingCount > 0) {
		std::this_thread::sleep_for(std::chrono::microseconds(100));
	}
}

void ThreadPool::WorkerThreadProc() {
	ThreadPoolTask task;
	while (!stopped) {
		{
			std::unique_lock<std::mutex> lock(queueMutex);
			while (queue.empty()) {
				queueSignal.wait(lock);
			}
			task = queue.front();
			queue.pop_front();
		}
		task.func(task.startIndex, task.endIndex, task.deltaTime);
		--pendingCount;
	}
}

 

21 minutes ago, Finalspace said:

Now I use a spinlock instead and this works perfectly:

Seems like you found a workaround instead of a comprehensive solution. If you are going to use atomic variables, then make sure they are being used effectively. With that said, and without checking the documentation of std::atomic: if the -- operator is not atomic then you'll have race conditions like Kylotan pointed out.


Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.
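If you want to keep the condition variable rather than spin, the usual pattern is to publish the counter change while holding the same mutex the waiter sleeps on, so the notify cannot slip between the waiter's check and its wait(). A minimal sketch of that pattern - a standalone done-latch, not the original pool, and the names are made up:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// A done-latch: the worker decrements and notifies while holding the
// mutex the waiter sleeps on, so the wakeup cannot be lost between the
// waiter's predicate check and its wait().
struct DoneLatch {
    std::mutex m;
    std::condition_variable cv;
    int pending;
    explicit DoneLatch(int n) : pending(n) {}

    void task_done() {
        std::lock_guard<std::mutex> lk(m);
        if (--pending == 0) cv.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return pending == 0; });
    }
};

bool run_demo(int workers) {
    DoneLatch latch(workers);
    std::vector<std::thread> ts;
    for (int i = 0; i < workers; ++i)
        ts.emplace_back([&] { latch.task_done(); });
    latch.wait();   // returns only after every worker has decremented
    for (auto &t : ts) t.join();
    return true;
}
```

The price is taking the mutex once per finished task, which is normally negligible compared to the task itself.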

Quote

if the -- operator is not atomic then you'll have race conditions like Kylotan pointed out.

pendingCount is atomic - look at the declaration!

Therefore it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected that to be a compare-and-exchange or just an atomic increment...

Or maybe it ended up on the same cache line and we have a false sharing problem here?

Normally I would use the atomic operations directly and not fiddle around with the C++ threading library, but I wanted to give it a try.
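As for the false-sharing guess, one cheap way to rule it out is to give the hot counter its own cache line via alignas. A sketch - 64 bytes is an assumption for the line size; C++17's std::hardware_destructive_interference_size in <new> is the portable constant where the compiler provides it:

```cpp
#include <atomic>

// Padding a hot counter to a full cache line so neighbouring data
// cannot share (and repeatedly invalidate) the same line. The 64 is
// an assumed line size; std::hardware_destructive_interference_size
// (C++17, <new>) is the portable value where available.
struct alignas(64) PaddedCounter {
    std::atomic<int> value{0};
};

static_assert(alignof(PaddedCounter) == 64,
              "counter occupies its own cache line");
```

Note that false sharing only costs performance; it cannot by itself explain a deadlock.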

 

Quote

Your original code's problem is that you manipulate pendingCount outside of a lock. Spinning is NOT a fix for threading incorrectness bugs, it is a band-aid and it WILL bite back eventually.

In my case the spinlock works - it should simply wait until all work is done, that's it. It also makes the code much simpler. There are no race conditions, and it won't bite back later. The only downside of this method: you may pay a penalty of sleeping for at least the OS scheduler's granularity, if std::this_thread::sleep_for() uses the underlying "sleep" from the operating system - but I have no idea what the C++ implementation does -> I cannot look into it, there is no source for Visual Studio.

Edited by Finalspace


So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <atomic>
#include <iostream>
#include <functional>
#include <algorithm>
#include <cstdio>		// for getchar()


// ----- ThreadPool -----
class ThreadPool {

	private:
		// internal types
		struct ThreadPoolTask {
			size_t start_index;
			size_t end_index;
			float delta_time;
			uint8_t padding0[4];			// ??
			std::function<void(const size_t, const size_t, const float)> func;
			};

		// atomic variables
		// you could make this non-atomic and use the mutex for accessing this as well
		// the only real change would be in the WaitUntilDone() function you'd need to acquire/release
		// the lock every check, which isn't a bad thing, the performance will be near identical to an atomic
		// but for 'shits'n'giggles' why not leave this as atomic...
		std::atomic_size_t pending_count;

		// variables protected by the mutex
		std::mutex mutex;
		bool run;						// there's no point to having this atomic, all accesses are always checked while in a lock
		std::vector<std::thread> threads;
		std::deque<ThreadPoolTask> queue;
		std::condition_variable queue_signal;

		// internal functions
		void WorkerThreadProc();

	public:
		ThreadPool(const size_t thread_count = std::thread::hardware_concurrency());
		~ThreadPool();

		ThreadPool(const ThreadPool&) = delete;
		ThreadPool& operator=(const ThreadPool&) = delete;
		ThreadPool(ThreadPool&&) = delete;
		ThreadPool& operator=(ThreadPool&&) = delete;
  
		void WaitUntilDone();
		void CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float deltaTime);
	};


ThreadPool::ThreadPool(const size_t thread_count) {

	// three cheers for std::memory_order_relaxed
	// really its not necessary, as there's no point in saving what amounts to maybe 2ns
	// in a constructor of all places...
	pending_count.store(0, std::memory_order_relaxed);

	// since the other threads haven't started yet, no need to lock
	// the thread creation will do whatever memory synchronization is required for us
	// which for x86/x64 is probably nothing anyways
	run = true;
	threads.reserve(thread_count);
	for (size_t i = 0; i < thread_count; ++i) {
		threads.push_back(std::thread(&ThreadPool::WorkerThreadProc, this));
		}
	}

ThreadPool::~ThreadPool() {

	std::unique_lock<std::mutex> lock(mutex);
	run = false;
	queue_signal.notify_all();
	lock.unlock();

	for (auto& thread : threads) {
		thread.join();
		}
	}

void ThreadPool::WaitUntilDone() {
	//queueSignal.notify_all();				// shouldn't be here, we don't need to wake threads
	// having multiple condition variables could in theory work, but unless you call WaitUntilDone() often
	// its easier just to spin-loop on the atomic counter
	// properly synchronizing multiple condition variables is not for the faint of heart
	while (pending_count.load(std::memory_order_relaxed) > 0) {
		std::this_thread::yield();			// sleep for also works here
		}
	}

void ThreadPool::WorkerThreadProc() {

	ThreadPoolTask group;
	std::unique_lock<std::mutex> lock(mutex, std::defer_lock);

	while (true) {
		
		// attempt to load next task
		lock.lock();
		while (true) {
			if (!queue.empty()) break;		// I'm assuming we want to finish all threads prior to exiting
			if (!run) return;
			queue_signal.wait(lock);
			}
		group = queue.front();
		queue.pop_front();
		lock.unlock();

		// execute task
		group.func(group.start_index, group.end_index, group.delta_time);
		pending_count.fetch_sub(1, std::memory_order_relaxed);
		}
	}

void ThreadPool::CreateTasks(const size_t item_count, const std::function<void(const size_t, const size_t, const float)> &function, const float delta_time) {
	if (item_count == 0) return;

	// init variables
	size_t threads_size = threads.size();
	size_t items_per_task = std::max<size_t>(1, item_count / threads_size);

	// init task
	ThreadPoolTask task = {};
	task.func = function;
	task.delta_time = delta_time;

	// exception safety consideration...
	// normally I wouldn't use a std::deque for this sort of thing
	// I have a hand written circular queue container which has a Reserve() function
	// here I would do something like: queue.Reserve(queue.Count() + threads_size)
	// this would ensure all subsequent queue.Push() functions would not throw
	// which would give you strong exception safety
	// std::deque does not support reserve() so you only get weak (or have to manually roll back)
	// there's also the issue of whether std::vector<T>::reserve() even allows for strong exception
	// it really depends on how you read the spec, though as far as I know it works for all implementations
	// of vector even if its a 'grey area' of the spec
	// in all containers I've personally written its valid to Reserve() and know that you get no exceptions
	// provided of course the constructor doesn't throw
	// course in this case since you're using std::function which can throw, you'll either have to live with
	// weak exception safety (which is good for 99.999% of stuff) or roll back the queue by hand
	// ...
	// its 2:30 am so I'm not sure how much sense that makes

	// lock as late as possible so that calculations that don't need to be locked for, aren't in the lock
	std::lock_guard<std::mutex> lock(mutex);		// lock once and then add all items, don't lock for every single item added

	// add tasks to queue
	size_t tasks_added = 0;
	try {
		for (size_t i = 0; i < item_count; i += items_per_task, ++tasks_added) {
			task.start_index = i;
			task.end_index = std::min(i + items_per_task, item_count);			// when in doubt use a half open interval as opposed to a closed interval
			queue.push_back(task);
			}
		queue_signal.notify_all();			// pretty good chance you're adding more than one item
		}
	catch (...) {
		// strong exception roll back
		// really, like I said above I hate having to do this and so a Reserve() and using something other than
		// std::function is actually my choice if I were to actually write this
		// that or just live with weak because if we go OOM here, do we really care about partial results?
		// but since we're here, for completeness, why not
		for (size_t i = 0; i < tasks_added; ++i) queue.pop_back();
		throw;
		}

	// done
	pending_count.fetch_add(tasks_added, std::memory_order_relaxed);
	}


// ----- test function ----
void TestTask1(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask1 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}

void TestTask2(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask2 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}

void TestTask3(const size_t start, const size_t end, const float f) {
	std::cout << "TestTask3 start = " << start << ", end = " << end << ", f = " << f << std::endl;
	std::this_thread::sleep_for(std::chrono::milliseconds(2000));
	}


// ----- main -----
int main() {

	// test 1
	/*
	ThreadPool pool;
	pool.CreateTasks(5, TestTask1, 1.1f);
	pool.CreateTasks(50, TestTask2, 2.2f);
	pool.CreateTasks(500, TestTask3, 3.3f);

	std::cout << "waiting..." << std::endl;
	pool.WaitUntilDone();
	*/

	// test 2
	{
	ThreadPool pool;
	pool.CreateTasks(5, TestTask1, 1.1f);
	pool.CreateTasks(50, TestTask2, 2.2f);
	pool.CreateTasks(500, TestTask3, 3.3f);
	std::cout << "waiting..." << std::endl;
	// test destructor wait for us
	}

	// done
	std::cout << "done" << std::endl;
	getchar();
	}

 

Edited by Ryan_001

3 hours ago, Finalspace said:

pendingCount is atomic - look at the declaration!

Therefore it doesn't matter whether it's incremented inside a lock or not, does it?

Also, I would have expected that to be a compare-and-exchange or just an atomic increment...

There is a big difference between "I can decrement this atomically, and I can compare it atomically" and "I can decrement and compare this atomically". You are doing the first, which is fine providing that you don't expect the comparison to be equal to the exact result of the decrement. If you need to see the exact result of the decrement, you need the second operation, which you weren't using.
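The single operation described here exists on std::atomic: fetch_sub performs the subtraction and hands back the value from before it, so "decrement and see the exact result" is one indivisible step. A sketch with a hypothetical helper name:

```cpp
#include <atomic>

// fetch_sub returns the value *prior* to the subtraction, so comparing
// that result against 1 tells this thread, and only this thread, that
// it performed the transition to zero.
bool took_to_zero(std::atomic<int>& counter) {
    return counter.fetch_sub(1) == 1;  // previous value 1 -> now exactly 0
}
```

This is the same guarantee `--counter == 0` gives (operator-- is defined in terms of fetch_sub); the explicit form just makes the "decrement and compare atomically" intent impossible to misread.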

5 hours ago, Ryan_001 said:

So it seems Atlas Reactor couldn't find me any games tonight, and I was a little bored, so I re-wrote your ThreadPool.  It compiles and runs under VS 2017.



Awesome, that is a much better solution - thanks! It's much better to use the atomic methods directly instead of relying on the overloaded operators.

Also, using yield removed the OS scheduler's thread-waiting penalty ;)

Now it is much, much faster.

Edited by Finalspace


You are still assuming that the decrement operators are actually atomic... Like I mentioned, I did not look at the spec, but IIRC before standard C++ supported atomics, the Boost C++ atomic template operators were not atomic; one would have to use the compare-and-exchange methods to guarantee atomicity. This looks like the approach the re-write took as well.

