Threadpool with abortable jobs and then-function


Hello everybody!

I have written a multi-purpose thread pool based on boost::asio where I can submit jobs and receive a handle that lets me abort them. When adding a job, I can (must, at the moment) provide a "then" function that is called when the job finishes. This function receives as arguments the handle (to tell which job has finished if multiple jobs were added with the same "then" function) and the return value of the job function.

My questions are:

1) Is this thread-safe?

2) Is there optimization potential? For example, do I copy the shared_ptr too frequently? I presume I could move it inside the lambda that I pass to post, because h is copied into the lambda anyway, but I am not sure (see the sketch after the test program below).

3) Anything else?

This is the code:


    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/chrono.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <memory>
    #include <atomic>
    #include <cstdint>
    
    class Handle
    {
    public:
    	using SharedPtrBool = std::shared_ptr<std::atomic<bool>>;
    
    	Handle() : abort(new std::atomic<bool>(false)) {}
    
    	// checks whether handles are equal and whether both are not aborted
    	bool EqualAndUnaborted(const Handle& rhs) const
    	{
    		// already aborted? (if the pointers match, both handles share this flag)
    		if (*abort)
    			return false;
    		
    		// do both handles refer to the same flag?
    		return abort.get() == rhs.abort.get();
    	}
    
    	// requests the job to stop; the job has to poll the flag itself
    	void Abort()
    	{
    		*abort = true;
    	}
    
    	// get identifier (the address of the shared flag); uintptr_t is used
    	// because a pointer generally does not fit into an int on 64-bit platforms
    	std::uintptr_t GetIdentifier() const
    	{
    		return reinterpret_cast<std::uintptr_t>(abort.get());
    	}
    
    private:
    	SharedPtrBool abort;
    
    	friend class ThreadPool;
    };
    
    class ThreadPool
    {
    public:
    	using SharedPtrBool = Handle::SharedPtrBool;
    
    	ThreadPool(int threads)
    		: work(ioService)
    	{
    		for (int n = 0; n < threads; ++n)
    			threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    	}
    
    	~ThreadPool()
    	{
    		ioService.stop();
    		threadPool.join_all();
    	}
    
    	template<typename T, typename U>
    	Handle AddJob(T func, U then)
    	{
    		std::cout << "Job added" << std::endl;
    		Handle h;
    		ioService.post([h, func, then](){
    			then(h, func(h.abort));
    		});
    
    		return h;
    	}
    
    	ThreadPool(const ThreadPool&) = delete;
    	ThreadPool& operator=(const ThreadPool&) = delete;
    	ThreadPool(ThreadPool&&) = delete;
    	ThreadPool& operator=(ThreadPool&&) = delete;
    
    private:
    	boost::asio::io_service ioService;
    	boost::thread_group threadPool;
    	boost::asio::io_service::work work;
    };

Here is a test program (it compiles if you place it directly below the code above):


    Handle h1, h2, h3;
    
    void done(Handle h, int r)
    {
    	if (h.EqualAndUnaborted(h1))
    	{
    		std::cout << "First with " << r << std::endl;
    	}
    	else if (h.EqualAndUnaborted(h2))
    	{
    		std::cout << "Second with " << r << std::endl;
    	}
    	else if (h.EqualAndUnaborted(h3))
    	{
    		std::cout << "Third with " << r << std::endl;
    	}
    }
    
    int main()
    {
    	ThreadPool pool(4);
    
    	h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 999999999 && !*abort; ++n)
    			if (n % 1500 == 0)
    				++m;
    		return m;
    	}, done);
    
    	h2 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 9999999 && !*abort; ++n)
    			if (n % 2000 == 0)
    				++m;
    		return m;
    	}, done);
    
    	h3 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 9999999 && !*abort; ++n)
    			if (n % 1900 == 0)
    				++m;
    		return m;
    	}, done);
    
    	boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
    
    	h1.Abort();
    
    	std::cout << "aborted" << std::endl;
    
    	boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
    }
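
Regarding question 2: what I have in mind would look roughly like the sketch below (untested; it assumes the job takes the flag as a const std::atomic<bool>& instead of the shared_ptr, and AddJob2 is just a made-up name so it does not clash with the existing template). Moving h.abort out of the captured copy looks risky to me, because the Handle passed to the "then" function would then be left without a valid flag, so passing a reference to the flag seems like the safer way to save the copy:

    	// Sketch of an alternative AddJob inside ThreadPool: the lambda's copy
    	// of 'h' keeps the flag alive, and the job receives the atomic flag by
    	// reference, so no additional shared_ptr copy is made per invocation.
    	template<typename T, typename U>
    	Handle AddJob2(T func, U then)   // hypothetical name to avoid clashing with AddJob
    	{
    		Handle h;
    		ioService.post([h, func, then]() {
    			then(h, func(*h.abort));   // *h.abort is the std::atomic<bool> itself
    		});
    		return h;
    	}
    
    	// The jobs would then be written against a reference, e.g.:
    	//   pool.AddJob2([](const std::atomic<bool>& abort) {
    	//       int m = 0;
    	//       for (int n = 0; n < 9999999 && !abort; ++n) { /* ... */ }
    	//       return m;
    	//   }, done);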

Thank you very much in advance!


1) Probably. But your example code is non-deterministic (both the output ordering and whether the Abort call actually does anything).
2) Of course; you're using boost ;)
Instead of Handle having an abort member of type std::shared_ptr<std::atomic<bool>>, could it just be a plain std::atomic<bool>? A shared_ptr says that you don't know what the lifetime of your abort flag is, which seems like overkill. Instead of AddJob returning a Handle by value (and done taking a Handle by value), AddJob could take a Handle by reference, done could take it by const reference, and Handle could become non-copyable (see the sketch after this list).
3) Design-wise, why do you need to abort jobs in the first place? Typically in games, jobs are very small (sub-millisecond), so I guess this is more general, e.g. for tools / non-game GUI work too?
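
A minimal sketch of what I mean (illustration only, not tested): the Handle owns the flag directly and is non-copyable, so the caller decides its lifetime.

    #include <atomic>
    
    // Sketch: Handle owns the abort flag directly instead of sharing it.
    class Handle
    {
    public:
    	Handle() : abort(false) {}
    
    	Handle(const Handle&) = delete;
    	Handle& operator=(const Handle&) = delete;
    
    	void Abort()           { abort = true; }
    	bool IsAborted() const { return abort; }
    
    	// Identity is simply the object's address; no shared_ptr is needed.
    	const void* GetIdentifier() const { return this; }
    
    private:
    	std::atomic<bool> abort;
    };
    
    // AddJob would then take the handle by reference instead of returning one:
    //
    //   template<typename T, typename U>
    //   void AddJob(Handle& h, T func, U then)
    //   {
    //       ioService.post([&h, func, then]() { then(h, func(h)); });
    //   }
    //
    // The caller must keep 'h' alive until 'then' has run, which is exactly
    // the lifetime question discussed below.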

Thank you for your answer!

1)

What would you suggest here? If Abort ends up doing nothing because the job has already been aborted, that is fine; that is why the handler checks whether the handle is aborted and ignores the result in that case.

Output ordering is not considered, you are right. At the moment I do not see why I would need it, though; the jobs are supposed to be independent of each other.

2)

The question is: how would my main thread store the handle then, and when should I release it? The then-function runs on a thread-pool thread, so I cannot destroy the handle there, because the main thread could simultaneously be trying to abort it. At the very least I would have to synchronize that access. Is that what you would suggest? I would love to get rid of the shared_ptr overhead.

3) Yeah, exactly, I would also like to use this pool for jobs that could take a longer time. The user should be able to abort jobs or start new ones.

But why do you need to abort jobs? Or do you mean something like a coroutine's yield?

What I have in mind is indeed a user-interface thing. The user can start jobs with specific parameters, but the parameters might change. For example, he might start a job, then realize "ah, this value is wrong", change it, and restart the job. Or he might simply decide that the result of the job is no longer interesting to him and want to abort it for good.

This is one use case I have in mind. For me, aborting jobs just seems like something I would like to have when there is a queue of jobs that a user might control.

I do not like software where some resource-hungry task is started and you have to wait until it is finally over, even though you decided in the meantime that you are no longer interested in the result. So the user-interface thread should keep working at the same time, and I do not want to put calculations into the thread that is also responsible for accepting user input and reacting to it; that way the user experiences fluid software even though some background jobs may take their time.

I have never really worked with coroutines, but as I understand it, a coroutine's yield is an interruption after which things can be continued later. In my case it would be a complete abort of the job.

If you have such large tasks that the time to execute them is noticeable to the user, then you should probably just break those tasks into smaller units (e.g. <100ms to execute), so that if you do need to abort/restart a computation you can just remove the unexecuted task segments from the queue. You'll also get better parallelism by making your tasks smaller (but not too small), since the threads in the pool can be kept busy continuously.
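
To make that concrete with the classes posted above, here is a rough sketch (the names AddChunkedJob and cancelAll are made up, and the chunk size is arbitrary). Since io_service has no way to remove handlers that are already posted, "removing the unexecuted segments" is approximated here by each chunk checking a shared flag and returning almost immediately once it is set:

    // Sketch: split the big loop from the first test job into small chunks,
    // each posted as its own job. One shared 'cancelAll' flag covers the whole
    // batch; once it is set, the remaining chunks become near-instant no-ops.
    void AddChunkedJob(ThreadPool& pool, std::shared_ptr<std::atomic<bool>> cancelAll)
    {
    	auto result = std::make_shared<std::atomic<int>>(0);  // merged partial results
    	const int total     = 999999999;
    	const int chunkSize = 10000000;                       // small enough to finish quickly
    
    	for (int start = 0; start < total; start += chunkSize)
    	{
    		const int end = (start + chunkSize < total) ? start + chunkSize : total;
    		pool.AddJob(
    			[start, end, result, cancelAll](ThreadPool::SharedPtrBool /*perChunkFlag*/) {
    				int m = 0;
    				for (int n = start; n < end && !*cancelAll; ++n)
    					if (n % 1500 == 0)
    						++m;
    				*result += m;                             // merge this chunk's partial result
    				return m;
    			},
    			[](Handle, int) { /* per-chunk completion; a real version would detect the last chunk and report *result */ });
    	}
    }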

Aborting jobs will either be intrusive (the job has to continually poll to see if it should stop) or destructive (killing the thread). Neither is a great option.

When you talk about desiring a responsive user interface, this can be most easily achieved by having the UI run on a separate thread at a high update rate (e.g. 60Hz), so that it never has to wait on the completion of any complex tasks. The UI thread responds to events from the user and then quickly adds jobs to the thread pool as needed. When the jobs finish they can call a completion handler that locks a mutex and updates the UI display based on the new computation, so the waiting is minimal.
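
As a sketch of that hand-off (the UiState type and its fields are made up): the completion handler locks only long enough to publish the result, and the UI thread picks it up on its next update tick.

    #include <mutex>
    
    // Hypothetical state shared between pool threads and the UI thread.
    struct UiState
    {
    	std::mutex mutex;
    	bool       resultReady = false;
    	int        result      = 0;
    };
    
    // Completion handler, runs on a pool thread: publish the result quickly.
    void OnJobDone(UiState& state, int r)
    {
    	std::lock_guard<std::mutex> lock(state.mutex);
    	state.result      = r;
    	state.resultReady = true;
    }
    
    // Called from the UI thread every update (e.g. at 60 Hz).
    void UpdateUi(UiState& state)
    {
    	bool ready;
    	int  r;
    	{
    		std::lock_guard<std::mutex> lock(state.mutex);
    		ready = state.resultReady;
    		r     = state.result;
    		state.resultReady = false;
    	}
    	if (ready)
    	{
    		(void)r; // placeholder: refresh widgets/labels with 'r'; no long work under the lock
    	}
    }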

I'm not sure that passing the handle around is a good idea.
It should be an implementation detail.
You should go with the concept of promises like in JavaScript; I like how simple and easy to use they are.
Probably one of the best designs for an async lib.

Thanks for your answers, as well!

I am not absolutely sure how to solve the issue of removing the handle.

Aressara:

My tasks are often just really big loops that go through a large number of cases. Splitting them into smaller tasks would be possible, but it would also increase the coordination effort: I would need to coordinate all the outputs at some point so that the partial results are merged into the large result I actually want, and only then would the "then" function be called. That seems like a lot of overhead to me. Passing an abort flag, on the other hand, seems quite fine; of course my job function then needs to take it. Then again, I would also like to offer an AddJob overload without an abort flag for the case where the job should not be stoppable (see the sketch below).
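
What I picture for that is roughly the following (untested sketch; a true overload of AddJob with the same template signature would be a redefinition, so I use a separate, made-up name here):

    	// Sketch of an additional ThreadPool member for jobs that cannot be
    	// aborted: the job takes no arguments, and the Handle only serves to
    	// identify the job in the "then" function.
    	template<typename T, typename U>
    	Handle AddUnstoppableJob(T func, U then)
    	{
    		Handle h;
    		ioService.post([h, func, then]() {
    			then(h, func());   // func() instead of func(h.abort)
    		});
    		return h;
    	}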

My experience with multithreading is limited, I have to say, but breaking jobs down into smaller tasks and coordinating the results seems to make my problem more complicated, the code harder to grasp, and maintenance harder than just introducing a flag.

What you describe in your third paragraph is actually what I can already do with my solution. If I break the tasks down, what would the architecture look like? At the moment I would have completion handlers for each subtask. But maybe (most likely) I am not getting your idea completely yet.

Eddycharly:

I have just looked briefly into the promise concept. It is described here ( https://blog.codecentric.de/en/2015/03/cancelable-async-operations-promises-javascript/ ) with the Bluebird library, where you can cancel any subordinate promise that is still pending. So I would need to split my task into, say, 100 tasks that would normally run sequentially, and here this would become 100 cascaded promises? Could you elaborate on whether that is your idea, or what your rough implementation idea with something like a promise would be in my case?

Couldn't you have a sort of promise interface with an implementation wrapping the handle?
Of course you still need to expose a cancellation flag; it is the user's responsibility to abort processing when requested. There is no way to do it from the outside.
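
Roughly what I mean, just as an untested sketch (JobPromise and its member names are made up; it simply forwards to the Handle underneath):

    // Sketch: a thin promise-like wrapper so calling code sees cancel() and
    // never touches the Handle directly.
    class JobPromise
    {
    public:
    	explicit JobPromise(Handle h) : handle(h) {}
    
    	// Cancellation stays cooperative: the job still has to poll the flag.
    	void cancel() { handle.Abort(); }
    
    	const Handle& raw() const { return handle; }
    
    private:
    	Handle handle;   // the wrapped implementation detail
    };
    
    // Usage sketch:
    //   JobPromise p(pool.AddJob(job, done));
    //   ...
    //   p.cancel();   // same effect as Handle::Abort()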

As a personal preference, I like the Bluebird implementation of JS promises.

I haven't written C++ code in a long time. Maybe I'll give it a try for fun, but I can't promise ;)

I like your pun. :)

Well, if it is only about the interface, I do not really see the difference from promises. In Bluebird you can call promise.cancel(); in my case you can call handle.Abort().

Where is the perceived advantage, or in what respect should I change my handle's interface, in your opinion? I appreciate your offer to write a code example; I am just not sure whether it would clearly show what I should change in my existing code, so it might be even less effort for you to simply point out the difference (since I do not know the difference yet, I may be wrong, of course).

