Threadpool with abortable jobs and then-function

Started by
19 comments, last by corysama 7 years, 7 months ago
I thought the handle came from the boost lib, which is why I said it should not be exposed, but I was mistaken, sorry. I realised my error when reading your code again.

Anyway, what I like about promises is how they can be chained. It would be nice if your handle could handle that ;)

Yes, this would be a nice extension. :) It's not my handle that does this, though; you can just do it with the then() function. I think in JS you attach the then() method to the promise, whereas in my case you pass it to the AddJob function... that does not make a whole lot of difference to me. But I am very open to being shown wrong!

The obvious question: if this isn't for learning purposes, why aren't you using an existing task library like Intel's Threading Building Blocks or Microsoft's Parallel Programming Library? Writing a safe, easy, and efficient task system is _scary hard_. Anything you write is almost certainly going to be worse than the existing solutions unless you have very particular uncommon constraints.

Sean Middleditch – Game Systems Engineer – Join my team!

I had not looked into them too much before your post, to be honest. TBB has no license that I can/am willing to use. PPL looks nice, but I want to stay OS-independent. I also do not need everything from it, and I indeed want to boost my learning with multithreading. My questions above remain.

There is the COM license, but I can understand what you mean.

Anyways, you say -big- loops. What exactly do you mean by big loops? If they are simply walking a large list of data and processing it, then you can speed that up by splitting the data into multiple tasks. The one loop that spawns those tasks would just be called from the main thread.

If you're running into problems where a task needs information in order to complete, then you can use a directed graph to sort out your dependencies.

However, if tasks are doing a back-and-forth sort of deal, then you will need to use co-routines, which are basically fibers on Windows, I think. They can be just as good as regular threads when used correctly, and they better fit the whole needing-to-abort bit.

That's because in most cases your tasks are so small that they complete very quickly.

Actually, having had a second look, TBB's GPL has a runtime exception that allows dynamic linking, which (in my eyes) basically promotes it to an LGPL-like license.

Anyways, you say -big- loops. What exactly do you mean by big loops? If they are simply walking a large list of data and processing it, then you can speed that up by splitting the data into multiple tasks. The one loop that spawns those tasks would just be called from the main thread.

Yes, a large array of data is walked through, but the results of those tasks have to be evaluated again to form the real overall result. This evaluation is not time-consuming, though, and could be done in the main thread.

What do you mean by this main loop being called from the main thread? It should not block, so you probably just mean spawning many small tasks? I am not sure whether you are already assuming some task framework here (and for your directed-graph solution, I believe you do).

A more "meta" question: most of your suggestions here go in the direction of making smaller tasks and somehow merging their results. Do I understand correctly that this is because you would like to cancel the tasks without having to introduce some abort variable directly into the job that is passed, e.g. sharedAbort in


AddJob([](shared_ptr<atomic<bool>> sharedAbort){});

because this solution is intrusive and you would like to prevent it by any means?

With the way your framework and example are written, notice that it's up to each individual job functor to be a good citizen and periodically check the abort flag. You can implement non-abortable jobs by simply choosing not to check and respond to that flag... However, maybe it should be the other way around.

I would use a layered approach, where your thread-pool / job system itself is not aware of 'aborting' as a concept. Doing so actually removes the need for the Handle class completely, making your job system a lot simpler.
Then, given this 'simple' job system that does one thing (and does it well), you can just as easily implement abortable jobs as another layer on top of it. Same goes for jobs with dependencies, jobs that need to report when they're complete, etc -- all these extra responsibilities can be layered on top of the simple/dumb/reusable job system.

e.g. the simplest example:
        std::atomic<bool> abort{false}, completed{false};
        pool.AddJob([&abort]() {
            int m = 0;
            for (int n = 0; n < 9999999 && !abort; ++n)
                if (n % 1900 == 0)
                    ++m;
            return abort ? -1 : m;
        }, [&completed](int m) { completed = true; done(m); });
        abort = true;
        while (!completed) {}

You mix the abort and a job-end event capability into the scheduler, even though the scheduler itself never uses them.

h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort){ int m = 0; for (int n = 0; n < 999999999 && !*abort; ++n) if (n % 1500 == 0) ++m; return m; }, done);

You pass an abort parameter and use it only in the job itself, never in the scheduler, which means it is task-specific data, and you clutter your AddJob function by restricting it to this kind of function. I think it's better to use a void pointer or a parameter class that can be derived from to pass your task-specific data.

A void pointer has zero dependencies, but you can get caught by a wrong reinterpret_cast, and there is no possibility for automatic cleanup.

A base class introduces virtual function calls, but you can safely use static_cast to get the right data for your task, and you can move the data into the thread and have it cleaned up after execution.

[source='cpp']class BaseTask
{
public:
    BaseTask(std::function<void(BaseTask&)> Callback);
    virtual ~BaseTask();
    virtual void Execute();

    virtual void SendMessage(int Id) {}

    virtual ThreadPool* GetAssignedThreadPool();
    virtual void SetThreadPool(ThreadPool& Instance);
private:
    std::function<void(BaseTask&)> m_Callback;

    ThreadPool* m_Pool;
};

class AbortableTask: public BaseTask
{
public:
    enum { ABORT_MSG = 0 };
    AbortableTask(std::function<void(BaseTask&)> Callback);
    virtual ~AbortableTask() override;

    virtual void SendMessage(int Id) override { if (Id == ABORT_MSG) m_Abort = true; }
    bool GetAbort() const;
private:
    std::atomic<bool> m_Abort;
};

AbortableTask* job = new AbortableTask([](BaseTask& Instance){
    AbortableTask& task = static_cast<AbortableTask&>(Instance);
    int m = 0;
    int n = 0;
    for (; n < 9999999 && !task.GetAbort(); ++n)
        if (n % 1900 == 0) ++m;
    if (n < 9999999) // loop left early: abort was triggered
        std::cout << "First job aborted!" << std::endl;

    // A task should be able to spawn new tasks, to implement a
    // divide-and-conquer strategy that scales well across different
    // numbers of logical processors.
    BaseTask* jobIsComplete = new BaseTask([](BaseTask&){
        std::cout << "First job is complete" << std::endl;
    });
    task.GetAssignedThreadPool()->AddJob(jobIsComplete);
});

// You should pass the job instance to the pool with a shared pointer,
// unique pointer or a similar mechanism.
auto h1 = pool.AddJob(job);

pool.SendMessage(h1, AbortableTask::ABORT_MSG);[/source]

Use messages instead of communicating with the job instance directly, because you get in trouble if the job is already gone and cleaned up.

Move the data for a job into the scope of the job: otherwise you end up having to synchronize arrays and non-thread-safe classes. It is also cache-friendly, and on top of that the data can be freed after the job is done (in the scheduler).

Your AddJob should not block, and you should return only a handle, not an object instance, and allow work only through the handle on the public API of your ThreadPool.

If you hand out real object instances, people can make a mess with jobs that no longer exist, because only the scheduler knows whether they still exist, and the thread-safe code for that should be handled by the scheduler too.

Hodgman:

Yes, I ended up (for now, still very open to improvement ideas) utilizing this approach. I still need a shared_ptr, since I do not join my job thread / wait for it to end and could cancel it at any time, which could be just after the thread has finished. But it seems a bit cleaner this way, with my scheduler being really very thin. Frameworks like PPL, however, have a quite invasive abort token anyway. I think it even ends a task with exceptions, which I am not sure I am a fan of in this context. Would you also suggest that they redo their concept a bit?

TAK2004:

I like your idea of moving the responsibility for aborting a task into a Task instead of handing out access to the abort flag. However, if the user has to cast in order to reach the non-virtual methods, it seems even more intrusive than just passing an atomic bool in. Also, it would be very easy to just call the virtual methods when a downcast would give us the non-virtual ones. If we cast anyway, could we not just remove the virtual keywords?

Also, I would really be interested in how you synchronize SendMessage in the ThreadPool. You have to make sure there that the job still exists, and otherwise discard the message. You say that the scheduler can clean up the task after execution, but how? Would you do this in the event loop of the scheduler, thus dropping the current boost::asio way of scheduling that I use?

The main takeaway from your code for me is: use messages instead of direct handles, and I like that a lot! Thank you!

You don't cast to avoid the virtual method; I just put the abort mechanic in a special task because not every task has to be cancellable.

In game and tool programming, only a minority of tasks need this feature, and the base class should be as simple and slim as possible.

You can put the abort in BaseTask if you prefer it that way.

In the future you will encounter tasks which need a bunch of data and/or have to give results back.

Then you can derive from BaseTask, put all the pointers, references and values into the task, and no longer take care of the ownership, because the task takes it.

A good example is processing the meshes of a model. You create a BoundingVolumeTask class which contains a pointer to the vertices, an offset and an element count.

You set the references in the task to the real min and max vectors you want to get back, then set the vertex pointer, offset and count, and add the job.

Your producer doesn't care about offset and count and doesn't need to hold these variables somewhere until the job is done, which simplifies your code.

[source='cpp']class BoundingVolumeTask: public BaseTask
{
public:
    void SetMesh(float* Data, size_t Start, size_t Count);
    void SetMinMaxResult(vec3f& Min, vec3f& Max);
    virtual void Execute() override {
        if (m_Count > 33)
        {
            BoundingVolumeTask* subTask = new BoundingVolumeTask();
            subTask->SetMesh(m_Data, m_Start, m_Count >> 1);
            subTask->SetMinMaxResult(*m_Min, *m_Max);
            GetAssignedThreadPool()->AddJob(subTask);
            BoundingVolumeTask* subTask2 = new BoundingVolumeTask();
            subTask2->SetMesh(m_Data, m_Start + (m_Count >> 1), m_Count - (m_Count >> 1));
            subTask2->SetMinMaxResult(*m_Min, *m_Max);
            GetAssignedThreadPool()->AddJob(subTask2);
        }
        else
        {
            vec3f min = vec3f::INF, max = vec3f::NEG_INF;
            for (size_t i = m_Start; i < m_Start + m_Count; ++i)
            {
                min.x = min.x > m_Data[i*3]   ? m_Data[i*3]   : min.x;
                max.x = max.x < m_Data[i*3]   ? m_Data[i*3]   : max.x;
                min.y = min.y > m_Data[i*3+1] ? m_Data[i*3+1] : min.y;
                max.y = max.y < m_Data[i*3+1] ? m_Data[i*3+1] : max.y;
                min.z = min.z > m_Data[i*3+2] ? m_Data[i*3+2] : min.z;
                max.z = max.z < m_Data[i*3+2] ? m_Data[i*3+2] : max.z;
            }
            m_Min->AssignSmallest(min); // spinlocks with atomic foo
            m_Max->AssignLargest(max);  // spinlocks with atomic foo
        }
    }
private:
    vec3f *m_Min, *m_Max; // need sync
    float* m_Data;        // need sync
    size_t m_Start;
    size_t m_Count;
};

vec3f min, max;
BoundingVolumeTask* getAABB = new BoundingVolumeTask();
getAABB->SetMinMaxResult(min, max);
getAABB->SetMesh(model.GetVertices().GetPtr(), 0, model.GetVertices().Size());
pool.AddJob(getAABB);[/source]

This is a more complex example (not complete) which splits the current task into 2 new tasks until the task is small enough, then jumps into the processing.

It makes no sense to use a lambda expression here, because this task is special and it's easier to override the Execute method.

You can make it even more complicated by expecting that the model and the min/max instances can vanish during execution, or that the min/max might be read too early.

Then you need a job counter (an atomic int) which is incremented for each created task and decremented at the scope end of Execute(), and you check the counter before accessing min/max.

Syncing the model with the tasks means sending the Abort message and/or waiting until the job counter reaches 0, and blocking the removal of the model in the meantime.

It seems I used the wrong words: you should abort a task, not a thread.

The thread pool should create one thread per logical processor at creation time and throw them away at process end.

I abort/cancel threads only in the internal API.

My public API only supports pausing the whole scheduler in different ways (gentle, forceful, blocking, disable).

https://github.com/tak2004/RadonFramework/blob/master/include/RadonFramework/Threading/ThreadPool.hpp

https://github.com/tak2004/Organesson/blob/master/code/console/console.cpp

Aborting a task, on the other hand, is done in userspace, because I rarely used it and it wasn't worth putting into a class.

To send a message to a task, you should create a message queue for each worker thread, which can be processed by the task.

GetAbort, for example, can look into the queue and process all messages which are assigned to its instance.

This way you only increase the workload for the tasks which use this feature, because other tasks will never process these messages and they are thrown away after the job is done.

If you instead want to remove a job, you have to go through the job list, which can get pretty messy depending on your implementation of the job list, and this will slow down your scheduler in general.

This topic is closed to new replies.
