# Threadpool with abortable jobs and then-function


Hello everybody!

I have written a multipurpose thread pool based on boost::asio to which I can submit jobs; for each job I receive a handle that can be used to abort it. When adding a job, I can (at the moment, must) provide a "then" function that is called when the job finishes. This function receives the handle (to identify which job has finished when several jobs share the same "then" function) and the return value of the job function.

My questions are:

1) Is this thread-safe?

2) Is there optimization potential? For example, do I copy the shared_ptr too often? I presume I could move it into the lambda that I pass to post, since h is copied into the lambda, but I am not sure.

3) Anything else?

This is the code:

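```cpp
#include <boost/asio.hpp>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

class Handle
{
public:
    using SharedPtrBool = std::shared_ptr<std::atomic<bool>>;

    Handle() : abort(std::make_shared<std::atomic<bool>>(false)) {}

    // checks whether both handles refer to the same job and that job is not aborted
    bool EqualAndUnaborted(const Handle& rhs) const
    {
        // aborted jobs never match
        if (*abort)
            return false;

        // same job <=> same shared abort flag
        return abort.get() == rhs.abort.get();
    }

    void Abort()
    {
        *abort = true;
    }

    // identifier derived from the flag's address; std::uintptr_t can hold a
    // pointer on every platform, whereas int is too small on 64-bit systems
    std::uintptr_t GetIdentifier() const
    {
        return reinterpret_cast<std::uintptr_t>(abort.get());
    }

private:
    friend class ThreadPool; // AddJob passes the flag to the job function
    SharedPtrBool abort;
};

class ThreadPool
{
public:
    using SharedPtrBool = Handle::SharedPtrBool;

    explicit ThreadPool(int threads)
        : work(boost::asio::make_work_guard(ioContext)) // keeps run() from returning while idle
    {
        for (int n = 0; n < threads; ++n)
            workers.emplace_back([this] { ioContext.run(); });
    }

    ~ThreadPool()
    {
        ioContext.stop();   // jobs that have not started yet are dropped
        for (auto& t : workers)
            t.join();       // wait for running jobs to return
    }

    // Queues func; when it returns, then(handle, result) is called on the same worker thread.
    template<typename T, typename U>
    Handle AddJob(T func, U then)
    {
        std::cout << "Job added" << std::endl;
        Handle h;
        boost::asio::post(ioContext, [h, func, then]() {
            then(h, func(h.abort));
        });

        return h;
    }

private:
    boost::asio::io_context ioContext; // called io_service in older Boost versions
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work;
    std::vector<std::thread> workers;
};
```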



Here is a test program (it compiles if you put it directly below the code above):

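```cpp
Handle h1, h2, h3;

// Caveat: done() runs on worker threads and reads h1..h3 while main() may
// still be assigning them below, which is a data race in this test.
void done(Handle h, int r)
{
    if (h.EqualAndUnaborted(h1))
        std::cout << "First with " << r << std::endl;
    else if (h.EqualAndUnaborted(h2))
        std::cout << "Second with " << r << std::endl;
    else if (h.EqualAndUnaborted(h3))
        std::cout << "Third with " << r << std::endl;
}

int main()
{
    ThreadPool pool(4); // thread count chosen for illustration

    h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort) {
        int m = 0;
        for (int n = 0; n < 999999999 && !*abort; ++n)
            if (n % 1500 == 0)
                ++m;
        return m;
    }, done);

    h2 = pool.AddJob([](ThreadPool::SharedPtrBool abort) {
        int m = 0;
        for (int n = 0; n < 9999999 && !*abort; ++n)
            if (n % 2000 == 0)
                ++m;
        return m;
    }, done);

    h3 = pool.AddJob([](ThreadPool::SharedPtrBool abort) {
        int m = 0;
        for (int n = 0; n < 9999999 && !*abort; ++n)
            if (n % 1900 == 0)
                ++m;
        return m;
    }, done);

    h1.Abort(); // the long first job stops early; done() then ignores its result

    std::cout << "aborted" << std::endl;
} // ~ThreadPool stops the io_context and joins the workers
```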
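On question 2, a minimal sketch assuming C++14 init-captures; `MakeAbortableJob` is a hypothetical helper, not part of the pool above. Taking the `shared_ptr` by value and moving it into the capture costs exactly one reference-count increment, at the call site. In `AddJob` itself, `h` has to be copied into the lambda because the same handle is also returned, but `func` and `then` could be moved in the same way.

```cpp
#include <atomic>
#include <memory>
#include <utility>

using SharedPtrBool = std::shared_ptr<std::atomic<bool>>;

// Hypothetical helper, not part of the pool above: builds a job closure that
// owns exactly one reference to the abort flag. `flag` is taken by value (one
// increment at the call site) and then moved into the capture, and `func` is
// moved as well, so building the closure adds no further copies.
template <typename Func>
auto MakeAbortableJob(SharedPtrBool flag, Func func)
{
    return [flag = std::move(flag), func = std::move(func)]() {
        // the job polls the flag through a reference, so calling it copies nothing
        return func(*flag);
    };
}
```

After `auto job = MakeAbortableJob(flag, ...)`, the caller's `flag` and the closure are the only two owners, so `flag.use_count()` is 2.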



Thank you very much in advance!

Edited by IceMichael

---

1) Probably. But your example code is non-deterministic (output ordering and whether the Abort call does anything).
2) Of course; you're using boost ;)
Instead of Handle having an abort member of type `std::shared_ptr<std::atomic<bool>>`, could it just be a `std::atomic<bool>`? A shared_ptr says that you don't know what the lifetime of your abort flag is, which seems like overkill. Instead of AddJob returning a Handle by value (and done taking a Handle by value), AddJob could take a Handle by reference, done could take it by const reference, and Handle could become non-copyable.
3) Design-wise, why do you need to abort jobs in the first place? Typically in games, jobs are very small (sub-millisecond), so I guess this is more general, such as for tools / non-game GUI stuff too?
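A minimal sketch of that suggestion, assuming the caller keeps the handle alive until the job has finished (the pool would then only ever see a reference). `IsAborted` and `CountMultiples` are hypothetical names; the job is modeled on the test program's loops.

```cpp
#include <atomic>

// Sketch of the suggested Handle: it owns its abort flag directly instead of
// through a shared_ptr, and it cannot be copied, so jobs must receive it by
// reference. The caller has to keep it alive until the job has finished.
class Handle
{
public:
    Handle() = default;
    Handle(const Handle&) = delete;
    Handle& operator=(const Handle&) = delete;

    void Abort() { abort.store(true); }
    bool IsAborted() const { return abort.load(); }

private:
    std::atomic<bool> abort{false};
};

// Hypothetical job modeled on the test program: counts multiples of `step`
// below `limit`, polling the caller-owned handle on every iteration.
int CountMultiples(const Handle& h, int limit, int step)
{
    int m = 0;
    for (int n = 0; n < limit && !h.IsAborted(); ++n)
        if (n % step == 0)
            ++m;
    return m;
}
```

The trade-off is that lifetime management moves from the shared_ptr to the caller.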

---

1)

What would you suggest here? If Abort does nothing because the job is already aborted, that is fine; this is why the handler checks whether the job was aborted and, if so, ignores its result.

Output ordering is not considered, you are right. Currently, I do not see why I would need it, though. The jobs are supposed to be independent of each other.

2)

The question is: how would my main thread store this handle then, and when should I release it? The then-function runs on a thread-pool thread, so I cannot destroy the handle there, because the main thread could simultaneously try to abort the job. At the very least, I would have to synchronize the access. Would you suggest this? I would love to remove the shared_ptr overhead.

3) Yeah, exactly, I would also like to use this pool for jobs that could take a longer time. The user should be able to abort jobs or start new ones.

---

But why do you need to abort jobs? Or do you mean a coroutine's yield?

---

What I have in mind is indeed a user-interface thing. The user can start jobs with specific parameters, but the parameters might change. For example, he might start a job, then realize "ahh, this value is wrong", change it and restart the job. Or he might decide that the result of the job is no longer interesting and abort it for good.

This is one use case I have in mind. In general, being able to abort jobs seems like something I want to have when a user controls a queue of jobs.

I do not like software that starts resource-hungry tasks and makes you wait until they are finally over, even though you have meanwhile decided that you are no longer interested in the result. So the user-interface thread should keep working in parallel, and I do not want to put calculations into the thread that is responsible for accepting and reacting to user input, so that the software stays fluid for the user even while some background jobs take time.

I have never really worked with coroutines, but as I understand it, a coroutine's yield is an interruption after which things can be continued later. In my case, it would be a complete cancellation of the job.

Edited by IceMichael

---

If you have such large tasks that the time to execute them is noticeable to the user, then you should probably just break those tasks into smaller units (e.g. <100ms to execute), so that if you do need to abort/restart a computation you can just remove the unexecuted task segments from the queue. You'll also get better parallelism by making your tasks smaller (but not too small), since the threads in the pool can be kept busy continuously.
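A minimal sketch of that idea, assuming each large job is pre-split into segments tagged with a job id; `SegmentQueue`, `Push`, `Cancel` and `RunOne` are hypothetical names, and worker threads would call `RunOne` in a loop. Cancelling removes only segments that have not started, so no job ever has to poll a flag.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical queue of small task segments, each tagged with the id of the
// large job it belongs to. Cancelling a job drops the segments that have not
// started yet; a segment that is already running simply finishes.
class SegmentQueue
{
public:
    void Push(int jobId, std::function<void()> segment)
    {
        std::lock_guard<std::mutex> lock(mutex);
        segments.push_back({jobId, std::move(segment)});
    }

    // Removes all queued segments of jobId and returns how many were removed.
    std::size_t Cancel(int jobId)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto first = std::remove_if(segments.begin(), segments.end(),
                                    [jobId](const Entry& e) { return e.jobId == jobId; });
        const std::size_t removed = static_cast<std::size_t>(segments.end() - first);
        segments.erase(first, segments.end());
        return removed;
    }

    // Pops and runs one segment; returns false if the queue was empty.
    bool RunOne()
    {
        std::function<void()> segment;
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (segments.empty())
                return false;
            segment = std::move(segments.front().segment);
            segments.pop_front();
        }
        segment(); // run outside the lock so other workers can proceed
        return true;
    }

private:
    struct Entry
    {
        int jobId;
        std::function<void()> segment;
    };

    std::mutex mutex;
    std::deque<Entry> segments;
};
```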

Aborting jobs will either be intrusive (job has to continually poll to see if it should stop), or destructive (killing the thread). Neither are great options.

When you talk about desiring a responsive user interface, this can be most easily achieved by having the UI run on a separate thread at a high update rate (e.g. 60Hz), so that it never has to wait on the completion of any complex tasks. The UI thread responds to events from the user and then quickly adds jobs to the thread pool as needed. When the jobs finish they can call a completion handler that locks a mutex and updates the UI display based on the new computation, so the waiting is minimal.

Edited by Aressera

---
I'm not sure that passing the handle is a good idea.
It should be an implementation detail.
You should go with the concept of promises like in JavaScript; I like how simple and easy to use they are.
Probably one of the best designs for an async lib.

---

I am not absolutely sure on how to solve the issue with removing the handle.

Aressera:

My tasks are often just really big loops that go through a large number of cases. Splitting them into smaller tasks would be possible, but it would also increase the effort of coordinating the tasks. At some point I would need to collect all the outputs so that their partial results are merged into the overall result I want, and only then would the "then" function be called. That seems like a lot of overhead to me. Passing an abort flag, on the other hand, seems quite fine. Of course, my thread function then needs to receive it. Then again, I would also like to offer an AddJob overload without an abort flag for jobs that should not be stoppable.
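For comparison, a minimal sketch of splitting one big loop and merging the partial results, using the test program's counting loop as the workload. `CountRange` and `CountParallel` are hypothetical names, `std::async` stands in for the thread pool, and the merge step here is a plain sum where a real job would combine its partial results. The abort flag still works: every chunk polls it.

```cpp
#include <algorithm>
#include <atomic>
#include <functional>
#include <future>
#include <vector>

// Counts n in [begin, end) with n % step == 0, stopping early if abort is set.
int CountRange(int begin, int end, int step, const std::atomic<bool>& abort)
{
    int m = 0;
    for (int n = begin; n < end && !abort; ++n)
        if (n % step == 0)
            ++m;
    return m;
}

// Splits [0, limit) into `parts` chunks (parts must be positive), runs each
// chunk as its own task and merges the partial counts once all have finished.
int CountParallel(int limit, int step, int parts, const std::atomic<bool>& abort)
{
    std::vector<std::future<int>> partial;
    const int chunk = (limit + parts - 1) / parts;
    for (int begin = 0; begin < limit; begin += chunk)
    {
        const int end = std::min(limit, begin + chunk);
        partial.push_back(std::async(std::launch::async, CountRange,
                                     begin, end, step, std::cref(abort)));
    }

    int total = 0;
    for (auto& f : partial)
        total += f.get(); // merge step
    return total;
}
```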

My experience with multithreading is limited, I have to say, but breaking down jobs into smaller tasks and coordinating the results seems to make my issue more complicated, the code harder to grasp and maintenance harder than just introducing some flag.

What you describe in your third paragraph is actually what I can already do with my solution. If I break tasks down, what would the architecture look like? At the moment, I would have a completion handler for each subtask. But maybe (most likely) I have not fully understood your idea yet.

Eddycharly:

I have just briefly looked into the promise concept. Here it is described ( https://blog.codecentric.de/en/2015/03/cancelable-async-operations-promises-javascript/ ) with the Blueprint library, where you can cancel any subordinate promise that is still pending. So I would need to split my task into, say, 100 tasks that would normally run sequentially, but here they would be 100 cascaded promises? Could you elaborate on whether this is what you have in mind, or what your rough implementation idea would be with something like a promise in my case?

Edited by IceMichael

---
Can't you have a sort of promise interface and an implementation wrapping the handle?
Of course you need to expose a cancellation flag; it is the user's responsibility to stop processing when requested. There's no way to do it from outside.

As a personal preference, I like the Bluebird implementation of JS promises.

I haven't written C++ code for a long time. Maybe I'll give it a try for fun, but I can't promise ;)

---

I like your pun. :)

Well, if it is only about the interface I do not really see the difference to promises. In Bluebird, you can call promise.cancel(); and in my case you can call handle.cancel();

Where do you see the advantage, or in what regard should I change my handle's interface in your opinion? I appreciate your offer to write a code example; I am just not sure it would clearly show what I should change in my existing code, so it might be even less effort for you to just point out the difference (since I do not know the difference yet, I may be wrong, of course).

Edited by IceMichael

---
I thought the handle came from the boost lib, which is why I said it should not be exposed, but I was mistaken, sorry. I realised my error when reading your code again.

Anyway, what I like about promises is how they can be chained. It would be nice if your handle could handle that ;)

---

Yes, this would be a nice extension. :) It's not my handle that does this, though; you can just do it with the then() function. I think in JS you attach then() to the promise, and in my case you pass it to the AddJob function... that does not make a whole lot of difference to me. But I am very open to being shown wrong!

---
The obvious question: if this isn't for learning purposes, why aren't you using an existing task library like Intel's Threading Building Blocks or Microsoft's Parallel Programming Library? Writing a safe, easy, and efficient task system is _scary hard_. Anything you write is almost certainly going to be worse than the existing solutions unless you have very particular uncommon constraints.

---

I had not looked into them much before your post, to be honest. TBB has no license that I can/am willing to use. PPL looks nice, but I want to stay OS-independent. I also do not need everything it offers, and I do want to boost my learning of multithreading. My questions above still stand.

Edited by IceMichael

---

> I have not had looked into them too much before your post, to be honest. TBB has no license that I can/am willing to use. PPL looks nice but I want to stay OS-independent. I also do not need everything from it and I indeed want to boost my learning with multithreading. My questions above remain

There is the COM license, but I can understand what you mean.

Anyways, you say -big- loops. What do you mean big loops exactly? If they are simply going through a large list of data and processing, then you can speed it up by dumping the data listing into multiple tasks. That one loop would just be something that would get called in the main thread.

If you're running into problems where a task needs information in order to complete, then you can use a directed graph to sort out your dependencies.
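A minimal sketch of ordering tasks by their dependencies with Kahn's topological sort, assuming tasks are numbered 0..N-1 and `deps[i]` lists the tasks that task i needs first; `ScheduleOrder` is a hypothetical name. A real scheduler would submit each task to the pool at the moment it becomes ready instead of appending it to a list.

```cpp
#include <cstddef>
#include <queue>
#include <vector>

// Returns an order in which tasks can run so that every task comes after the
// tasks it depends on. deps[i] lists the (valid) indices task i waits for.
// Returns an empty vector if the dependencies contain a cycle.
std::vector<int> ScheduleOrder(const std::vector<std::vector<int>>& deps)
{
    const std::size_t count = deps.size();
    std::vector<int> pending(count, 0);               // unfinished dependencies per task
    std::vector<std::vector<int>> dependents(count);  // reverse edges
    for (std::size_t i = 0; i < count; ++i)
        for (int d : deps[i])
        {
            ++pending[i];
            dependents[d].push_back(static_cast<int>(i));
        }

    std::queue<int> ready; // tasks whose dependencies are all done
    for (std::size_t i = 0; i < count; ++i)
        if (pending[i] == 0)
            ready.push(static_cast<int>(i));

    std::vector<int> order;
    while (!ready.empty())
    {
        const int t = ready.front();
        ready.pop();
        order.push_back(t); // a pool would run task t here
        for (int next : dependents[t])
            if (--pending[next] == 0)
                ready.push(next);
    }

    if (order.size() != count)
        order.clear(); // cycle: some tasks can never become ready
    return order;
}
```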

---

Actually, having had a second look, PPL's GPL has a runtime exception that allows dynamic linking, which (in my eyes) basically promotes it to an LGPL-like license.

> Anyways, you say -big- loops. What do you mean big loops exactly? If they are simply going through a large list of data and processing, then you can speed it up by dumping the data listing into multiple tasks. That one loop would just be something that would get called in the main thread.

Yes, a large array of data is walked through but the results of those tasks have to be evaluated again to form the real overall result. This evaluation is not time-consuming, though, and could be done in the main thread.

What do you mean by "this main loop would be called from the main thread"? It should not be blocking, so you probably just mean spawning many small tasks? I am not sure whether you are already assuming some task framework here (and for your directed-graph solution, I believe you are).

A more "meta" question: most of your suggestions here go in the direction of making smaller tasks and somehow merging their results. Do I understand correctly that this is because you would like to cancel tasks without having to pass an abort variable directly into the job, e.g. sharedAbort in

`AddJob([](shared_ptr<atomic<bool>> sharedAbort){});`

because this solution is intrusive and you would like to avoid it at all costs?

---
With the way your framework and example are written, notice that it's up to each individual job functor to be a good citizen and periodically check the abort flag. You can implement non-abortable jobs by simply choosing not to check and respond to that flag... However, maybe it should be the other way around.

I would use a layered approach, where your thread-pool / job system itself is not aware of 'aborting' as a concept. Doing so actually removes the need for the Handle class completely, making your job system a lot simpler.
Then, given this 'simple' job system that does one thing (and does it well), you can just as easily implement abortable jobs as another layer on top of it. Same goes for jobs with dependencies, jobs that need to report when they're complete, etc -- all these extra responsibilities can be layered on top of the simple/dumb/reusable job system.

e.g. the simplest example:
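```cpp
// Sketch against a simplified pool whose AddJob(job, then) knows nothing
// about aborting; done(int) is a callback that takes just the result.
std::atomic<bool> abort{false}, completed{false};
pool.AddJob([&abort]() {
    int m = 0;
    for (int n = 0; n < 9999999 && !abort; ++n)
        if (n % 1900 == 0)
            ++m;
    return abort ? -1 : m; // -1 signals "aborted"
}, [&completed](int m) { completed = true; done(m); });
abort = true;
while (!completed) {} // busy-wait until the job has reported back
```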
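A runnable sketch of that layering, assuming a hand-rolled queue with `std::thread` and `std::condition_variable` in place of boost::asio; `SimplePool`, `Submit` and `SubmitAbortable` are hypothetical names. The pool itself has no notion of aborting or results; the abort flag and the "then" callback live entirely in the second layer.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Layer 1: runs std::function<void()> jobs and knows nothing else.
class SimplePool
{
public:
    explicit SimplePool(unsigned threads)
    {
        for (unsigned i = 0; i < threads; ++i)
            workers.emplace_back([this] { Run(); });
    }

    ~SimplePool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            stopping = true;
        }
        wake.notify_all();
        for (auto& t : workers)
            t.join(); // workers drain the remaining queue before exiting
    }

    void Submit(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            jobs.push(std::move(job));
        }
        wake.notify_one();
    }

private:
    void Run()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                wake.wait(lock, [this] { return stopping || !jobs.empty(); });
                if (jobs.empty())
                    return; // stopping and nothing left to do
                job = std::move(jobs.front());
                jobs.pop();
            }
            job();
        }
    }

    std::mutex mutex;
    std::condition_variable wake;
    std::queue<std::function<void()>> jobs;
    bool stopping = false;
    std::vector<std::thread> workers; // declared last: started after the rest exists
};

// Layer 2: abortable jobs on top. The caller keeps the returned flag; the job
// polls it, and `then` only receives the result if the job was not aborted.
template <typename Func, typename Then>
std::shared_ptr<std::atomic<bool>> SubmitAbortable(SimplePool& pool, Func func, Then then)
{
    auto abort = std::make_shared<std::atomic<bool>>(false);
    pool.Submit([abort, func, then] {
        auto result = func(*abort);
        if (!*abort)
            then(result);
    });
    return abort;
}
```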

---

You mix an abort capability and a job-end event into the scheduler, although the scheduler itself uses neither.

```cpp
h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    int m = 0;
    for (int n = 0; n < 999999999 && !*abort; ++n)
        if (n % 1500 == 0)
            ++m;
    return m;
}, done);
```

You pass an abort parameter and use it only in the job itself, never in the scheduler, which means it's task-specific data, and you mess up your AddJob function by restricting it to this kind of function. I think it's better to use a void pointer or a parameter class that can be derived from to pass your task-specific data.

A void pointer has zero dependencies, but you can get caught by a wrong reinterpret_cast, and there is no possibility of automatic cleanup.

A base class introduces virtual function calls, but you can safely use static_cast to get the right data for your task, and you can move the data to the thread and let it be cleaned up after execution.

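```cpp
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>

class BaseTask
{
public:
    explicit BaseTask(std::function<void(BaseTask&)> callback = {})
        : m_Callback(std::move(callback)) {}
    virtual ~BaseTask() = default;

    // runs the task body on a worker thread
    virtual void Execute() { if (m_Callback) m_Callback(*this); }

    // tasks that do not understand a message simply ignore it
    virtual void SendMessage(int /*Id*/) {}

private:
    std::function<void(BaseTask&)> m_Callback;
};

class AbortableTask : public BaseTask
{
public:
    using BaseTask::BaseTask;

    enum { ABORT_MSG = 0 };

    void SendMessage(int Id) override { if (Id == ABORT_MSG) m_Abort = true; }
    bool GetAbort() const { return m_Abort; }

private:
    std::atomic<bool> m_Abort{false};
};

auto job = std::make_shared<AbortableTask>([](BaseTask& instance) {
    // safe: this callback is only ever attached to an AbortableTask
    auto& task = static_cast<AbortableTask&>(instance);
    int m = 0;
    int n = 0; // declared outside the loop so it can be checked afterwards
    for (; n < 9999999 && !task.GetAbort(); ++n)
        if (n % 1900 == 0) ++m;
    if (n < 9999999) // triggered abort
        std::cout << "First job abort!" << std::endl;

    // a task should be able to spawn new tasks to implement a divide-and-conquer
    // strategy, which scales really well with different numbers of logical processors
    auto jobIsComplete = std::make_shared<BaseTask>([](BaseTask&) { std::cout << "First job is complete" << std::endl; });
    // (how jobIsComplete is handed to the pool is not shown in the post)
});

// You should pass the job instance to the pool with a shared pointer, auto pointer or similar mechanism
auto h1 = pool.AddJob(job); // hypothetical pool.AddJob(std::shared_ptr<BaseTask>)
```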

Use messages instead of communicating with the job instance directly, because you get into trouble if the job is already gone and cleaned up.

Move the data for a job into the scope of the job, because otherwise you end up in a mess when you have to synchronize arrays and non-thread-safe classes. It's also cache-friendly, and on top of that you can free the data after the job is done (in the scheduler).

Your AddJob should not block, and it should return only a handle, not an object instance; the public API of your ThreadPool should only allow working with that handle.

If you hand out real object instances, people can mess things up with jobs that no longer exist, because only the scheduler knows whether they still exist, and you would need thread-safe code, which should be handled by the scheduler too.
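A minimal sketch of such a handle-only API, assuming integer handles and a single abort message; `JobRegistry`, `Add`, `Finish` and `SendMessage` are hypothetical names modeled on the post. The registry, not the caller, decides whether a job still exists, so a message for a job that has already finished is discarded instead of touching a destroyed object.

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical job registry: callers only ever hold an integer handle.
class JobRegistry
{
public:
    enum Message { ABORT_MSG = 0 };

    struct Job
    {
        std::atomic<bool> abort{false};
    };

    // Registers a job and returns its handle; the job object stays internal.
    int Add(std::shared_ptr<Job> job)
    {
        std::lock_guard<std::mutex> lock(mutex);
        const int id = nextId++;
        jobs[id] = std::move(job);
        return id;
    }

    // Called by the scheduler once the job has run.
    void Finish(int id)
    {
        std::lock_guard<std::mutex> lock(mutex);
        jobs.erase(id);
    }

    // Delivers a message; returns false if the job no longer exists.
    bool SendMessage(int id, Message msg)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = jobs.find(id);
        if (it == jobs.end())
            return false; // job already finished: discard the message
        if (msg == ABORT_MSG)
            it->second->abort = true;
        return true;
    }

private:
    std::mutex mutex;
    std::unordered_map<int, std::shared_ptr<Job>> jobs;
    int nextId = 0;
};
```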

---

Hodgman:

Yes, I ended up (for now, still very open to improvement ideas) using this approach. I still need a shared_ptr, since I do not join my job thread / wait for it to end and could cancel it at any time, which could be just after the thread has finished. But it seems a bit cleaner to keep my scheduler really thin. Frameworks like PPL, however, have a quite invasive abort token anyway. I think it even ends a thread with exceptions, which I am not sure I am a fan of in this context. Would you also suggest that they redo their concept a bit?

TAK2004:

I like your idea of moving the responsibility for aborting a task into a Task instead of having access to the abort flag. However, if the user has to cast in order to avoid using a virtual method, it seems even more intrusive than just passing an atomic bool into it. Also, it would be very easy to just call the virtual methods when a downcast is meant to avoid them. If we cast anyway, we could just remove the virtual keywords, could we not?

Also, I would be really interested in how you synchronize SendMessage in the ThreadPool. You have to make sure that the job still exists and otherwise discard the message. You say that the scheduler can clean up the thread after execution, but how? Would you do this in the scheduler's event loop, thus dropping the boost::asio way of scheduling that I currently use?

The main takeaway from your code for me is: Use messages instead of direct handlers and I like that a lot! Thank you!

Edited by IceMichael

---

> Hodgman:
>
> Yes, I ended up (for now, still very open to improvement ideas) utilizing this approach. I still need a shared_ptr since I do not join my job-thread / wait for it to end and could at any time cancel it, which could be just after the thread has finished. But it seems to be a bit more clean to have it the way that my scheduler is really very thin. Frameworks like PPL, however, have some quite invasive abort token, anyways. I think, the way it ends a thread is even with exceptions which I am not sure if I am a fan of in this context. Would you also suggest that they redo their concept a bit?
>
> TAK2004:
>
> I like your idea of moving the responsibility for aborting a task into a Task. However, if the user has to cast in order to prevent using a virtual method, it does seem even more intrusive than just passing an atomic bool into it.
>
> Also, I would really be interested in your synchronization of SendMessage in ThreadPool. You have to make sure there that the job still exists and otherwise discard the message. You say that the scheduler can clean up the thread after execution but how? Would you do this in the event loop of the scheduler, thus omitting the current boost::asio way of scheduling like I do?
>
> The main takeaway from your code for me is: Use messages instead of direct handlers and I like that a lot! Thank you!

You don't cast to avoid the virtual method; I just put the Abort mechanic in a special task because not every task has to be cancelable.

In game and tool programming, only a minority of tasks need this feature, and the base class should be as simple and slim as possible.

You can put the abort in the BaseTask if you prefer it that way.

In the future you will encounter tasks that need some data and/or have to give back results.

Then you can derive from BaseTask, put all the pointers, references and values in the task, and no longer have to take care of ownership, because the task takes it over.

A good example is processing the meshes of a model. You create a BoundingVolumeTask class that contains a pointer to the vertices, an offset and an element count.

You set the references in the task to the real min and max vectors you want to get back, then set the vertex pointer, offset and count, and add the job.

Your producer doesn't care about offset and count and doesn't need to hold the variables somewhere until the job is done, which simplifies your code.

```cpp
// vec3f, vec3f::INF/NEG_INF, AssignSmallest/AssignLargest and the model API
// are the poster's own types and helpers, not shown here
class BoundingVolumeTask : public BaseTask
{
public:
    void SetMesh(float* Data, size_t Start, size_t Count);
    void SetMinMaxResult(vec3f& Min, vec3f& Max);
    void Execute() override {
        if (m_Count > 33)
        {
            // split into two halves, each processed by a new task (creating and
            // queuing subTask/subTask2 is not shown); the second half takes the
            // remainder so an odd count loses no vertex
            subTask->SetMesh(m_Data, m_Start, m_Count >> 1);
            subTask2->SetMesh(m_Data, m_Start + (m_Count >> 1), m_Count - (m_Count >> 1));
        }
        else
        {
            vec3f min = vec3f::INF, max = vec3f::NEG_INF;
            for (size_t i = m_Start; i < m_Start + m_Count; ++i)
            {
                min.x = min.x > m_Data[i*3]   ? m_Data[i*3]   : min.x;
                max.x = max.x < m_Data[i*3]   ? m_Data[i*3]   : max.x;
                min.y = min.y > m_Data[i*3+1] ? m_Data[i*3+1] : min.y;
                max.y = max.y < m_Data[i*3+1] ? m_Data[i*3+1] : max.y;
                min.z = min.z > m_Data[i*3+2] ? m_Data[i*3+2] : min.z;
                max.z = max.z < m_Data[i*3+2] ? m_Data[i*3+2] : max.z;
            }
            m_Min->AssignSmallest(min); // spinlocks with atomic foo
            m_Max->AssignLargest(max);  // spinlocks with atomic foo
        }
    }
private:
    vec3f *m_Min, *m_Max; // need sync
    float* m_Data;        // need sync
    size_t m_Start;
    size_t m_Count;
};

vec3f min, max;
BoundingVolumeTask getAABB;
getAABB.SetMinMaxResult(min, max);
getAABB.SetMesh(model.GetVertices().GetPtr(), 0, model.GetVertices().Size());
```

This is a more complex (incomplete) example that splits the current task into 2 new tasks until a task is small enough, and then does the processing.

It makes no sense to use a lambda expression here, because this task is special and it's easier to override the Execute method.
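A runnable sketch of the same divide-and-conquer idea, with two deliberate swaps named plainly: `std::async` stands in for the thread pool, and each task returns its partial box to be merged by its parent instead of writing into shared min/max vectors under a spinlock. `Aabb`, `EmptyBox`, `Merge` and `ComputeAabb` are hypothetical names; the leaf size of 33 is taken from the post.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <limits>

struct Aabb
{
    float min[3];
    float max[3];
};

// Empty box: merging it with any other box yields that box.
Aabb EmptyBox()
{
    const float inf = std::numeric_limits<float>::infinity();
    return {{inf, inf, inf}, {-inf, -inf, -inf}};
}

Aabb Merge(const Aabb& a, const Aabb& b)
{
    Aabb r;
    for (int k = 0; k < 3; ++k)
    {
        r.min[k] = std::min(a.min[k], b.min[k]);
        r.max[k] = std::max(a.max[k], b.max[k]);
    }
    return r;
}

// Bounding box of vertices [start, start + count) in a packed xyz array.
// Ranges larger than `leaf` are split in two: the first half runs as a separate
// task, the second half on the current thread, and the results are merged.
Aabb ComputeAabb(const float* xyz, std::size_t start, std::size_t count, std::size_t leaf = 33)
{
    if (count > leaf)
    {
        const std::size_t half = count / 2;
        auto left = std::async(std::launch::async, ComputeAabb, xyz, start, half, leaf);
        const Aabb right = ComputeAabb(xyz, start + half, count - half, leaf);
        return Merge(left.get(), right);
    }

    Aabb box = EmptyBox();
    for (std::size_t i = start; i < start + count; ++i)
        for (int k = 0; k < 3; ++k)
        {
            box.min[k] = std::min(box.min[k], xyz[i * 3 + k]);
            box.max[k] = std::max(box.max[k], xyz[i * 3 + k]);
        }
    return box;
}
```

Because every task owns its partial result until its parent merges it, nothing shared needs locking. Launching one `std::async` per split can create many threads for large meshes; a pool would cap that.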

You can make it even more complicated by assuming that the model and the min/max instances can vanish during execution, or that you might use min/max too early.

Then you need a job counter (an atomic int) that is incremented for each created task and decremented when Execute() finishes, and you check the counter before accessing min/max.

Synchronizing the model with the task means sending the Abort message and/or waiting until the job counter reaches 0, blocking the removal of the model until then.

It seems I used the wrong words. You should abort a task, not a thread.

The thread pool should create one thread per logical processor when it is created and throw them away at process end.
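A small sketch of picking that thread count; `WorkerCount` is a hypothetical helper. `std::thread::hardware_concurrency()` may return 0 when the number of logical processors cannot be determined, so the sketch falls back to one thread.

```cpp
#include <thread>

// Number of worker threads for the pool: one per logical processor,
// or 1 if the platform cannot report the count.
unsigned WorkerCount()
{
    const unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : 1;
}
```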

I abort/cancel threads only in the internal API.

My public API only supports pausing the whole scheduler in different ways (gentle, forceful, blocking, disable).

https://github.com/tak2004/Organesson/blob/master/code/console/console.cpp

Aborting a task, on the other hand, is done in user space, because I rarely use it and it wasn't worth putting into a class.

To send a message to a task, you should create a message queue for each worker thread that can be processed by the task.

GetAbort, for example, can look into the queue and process all messages that are addressed to its instance.

This way you only increase the workload of the tasks that use this feature, because other tasks never process the messages, and the messages are thrown away after those tasks are done.
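A minimal sketch of such a per-worker mailbox, assuming messages are only posted to the worker currently running the target task; `WorkerMailbox`, `Post` and `AbortRequested` are hypothetical names. When the running task polls, it latches any abort addressed to it and discards everything else, since those messages target tasks that are no longer running on this worker.

```cpp
#include <mutex>
#include <vector>

// Hypothetical per-worker mailbox. Other threads post (taskId, message)
// pairs; only the task currently running on this worker reads them.
class WorkerMailbox
{
public:
    enum { ABORT_MSG = 0 };

    void Post(int taskId, int message)
    {
        std::lock_guard<std::mutex> lock(mutex);
        inbox.push_back({taskId, message});
    }

    // Polled by the running task: drains the mailbox, remembers an abort
    // addressed to taskId, and drops messages meant for other tasks.
    bool AbortRequested(int taskId)
    {
        std::lock_guard<std::mutex> lock(mutex);
        for (const Msg& m : inbox)
            if (m.taskId == taskId && m.message == ABORT_MSG)
                abortedTask = taskId; // stays set across later polls
        inbox.clear();
        return abortedTask == taskId;
    }

private:
    struct Msg
    {
        int taskId;
        int message;
    };

    std::mutex mutex;
    std::vector<Msg> inbox;
    int abortedTask = -1;
};
```

Tasks that never call `AbortRequested` pay nothing for the feature, which matches the point above.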

If you want to remove a job, you have to go through the job list, which can get pretty messy depending on your implementation of the job list and will slow down your scheduler in general.

---

> Actually, having had second look, PPL's GPL has a runtime exception that allows dynamic linking, which (in my eyes) basically promotes it to a LGPL-like license.

Have a third look.  Last I checked, the runtime exception allows static linking.  Would you use the C stdlib that comes with GCC?   Nearly all programs compiled with GCC do.  This is literally the same license.  I think GNU made their stdlib GPL because they're GNU --that's what they do.  But, they added a minimal "exception" that basically weakens the GPL to "If you modify specifically our code, you must publish specifically those modifications.  The end."  because they knew that making all programs compiled with GCC implicitly GPL would doom GCC.  TBB is the same deal.
