Task Scheduler

Started by
16 comments, last by void0 12 years, 6 months ago
Heya,

I'm working on a generic task scheduler that I can use in my engine to do all sorts of asynchronous jobs such as resource loading, and continuous jobs such as running update loops. I've got a basic working system so far but would like to get some constructive criticism on the design and help on a couple of things.

Current problems I'm aware of and would like to fix but need help on:

- Duplicate code in the Task classes. Would like to have Task as base class and TaskNotify as derived but kept getting template errors I couldn't decipher.
- I'd like to somehow make the callbacks more robust and allow them to accept the return value of the task. I couldn't quite figure out how to do this.



The scheduler and task classes below make heavy use of boost threads/futures/packages and threadpool.

TaskScheduler
[source lang=cpp]

#ifndef _TASK_SCHEDULER_H_
#define _TASK_SCHEDULER_H_

#include "boost/threadpool.hpp"
#include "boost/thread.hpp"
#include "boost/utility/result_of.hpp"
#include "boost/shared_ptr.hpp"

#include "Task.h"

// Front-end over a boost::threadpool pool. Callables are wrapped in Task /
// TaskNotify objects, scheduled on the pool, and the matching future is
// handed back to the caller.
class TaskScheduler : private boost::noncopyable
{
public:

    // n: number of worker threads for the pool. Defaults to
    // boost::thread::hardware_concurrency(), which is documented to return 0
    // when the core count is unavailable. A pool built with zero threads would
    // presumably never execute a submitted task, so 0 is raised to 1.
    TaskScheduler ( uint32 n = boost::thread::hardware_concurrency() ) : m_ThreadPool(n != 0 ? n : 1)
    {
    }

    // Schedules an already constructed task object and returns its future.
    // boost::bind stores t by value, so the pool runs a copy of t; T must
    // provide operator()(), future() and a nested Future type.
    template <typename T>
    typename T::Future Submit(T& t)
    {
        boost::threadpool::schedule(m_ThreadPool, boost::bind(&T::operator(), t));
        return t.future();
    }

    // Wraps a nullary callable f in a Task whose result type is deduced with
    // boost::result_of, then schedules it.
    template<typename F>
    typename Task<typename boost::result_of<F()>::type>::Future Submit (const F& f)
    {
        Task<typename boost::result_of<F()>::type> t(f);
        return Submit(t);
    }

    // Wraps a nullary callable f and a nullary callback c in a TaskNotify and
    // schedules it. The returned future is the one belonging to f.
    template<typename F, typename C>
    typename TaskNotify<typename boost::result_of<F()>::type, typename boost::result_of<C()>::type>::Future Submit (const F& f, const C& c)
    {
        TaskNotify<typename boost::result_of<F()>::type, typename boost::result_of<C()>::type> t(f,c);
        return Submit(t);
    }

private:

    boost::threadpool::pool m_ThreadPool;
};

#endif[/source]

Tasks
[source lang=cpp]


#ifndef _TASK_H_
#define _TASK_H_

template <typename R>
class Task
{
public:

typedef typename boost::unique_future<R> Future;

protected:

typedef boost::packaged_task<R> PackagedTask;
typedef boost::shared_ptr<PackagedTask> PackagedTaskPtr;

public:

template <typename F>
explicit Task(const F& f) : m_Task(new PackagedTask(f))
{
}

void operator()()
{
(*m_Task)();
}

Future future()
{
return m_Task->get_future();
}

protected:

PackagedTaskPtr m_Task;
};


template <typename R, typename Callback = void>
class TaskNotify
{
public:

typedef typename boost::unique_future<R> Future;

protected:

typedef boost::packaged_task<R> PackagedTask;
typedef boost::shared_ptr<PackagedTask> PackagedTaskPtr;

typedef boost::packaged_task<Callback> PackagedCallback;
typedef boost::shared_ptr<PackagedCallback> PackagedCallbackPtr;

public:

template <typename F, typename C>
explicit TaskNotify(const F& f, const C& c) : m_Task(new PackagedTask(f)), m_Callback(new PackagedCallback©)
{
}

void operator()()
{
(*m_Task)();
(*m_Callback)();
}

Future future()
{
return m_Task->get_future();
}

protected:

PackagedCallbackPtr m_Callback;
PackagedTaskPtr m_Task;
};

#endif
[/source]

Main/Tests
[source lang=cpp]

#include "stdafx.h"
#include <iostream>
#include <conio.h>
typedef unsigned int uint32;
#include "TaskScheduler.h"

// Guards the std::cout write performed by func.
static boost::mutex mutex;

// Prints "i = <i>" followed by a newline while holding the global mutex.
void func(int i)
{
    boost::mutex::scoped_lock guard(mutex);
    std::cout << "i = " << i << std::endl;
}

// Naive doubly-recursive Fibonacci: fib(0) == 0, fib(1) == 1.
// Deliberately exponential; it serves as a CPU-heavy workload for the tests.
int fib(int x)
{
    // Both base cases return their own argument.
    if (x == 0 || x == 1)
        return x;

    return fib(x-1)+fib(x-2);
}

// Completion callback for the first Fibonacci task: prints a fixed message.
void fib1Done()
{
    std::cout << "Fib 1 Done";
    std::cout << std::endl;
}

// Completion callback for the second Fibonacci task: prints a fixed message.
void fib2Done()
{
    std::cout << "Fib 2 Done";
    std::cout << std::endl;
}

int _tmain(int argc, _TCHAR* argv[])
{
TaskScheduler s;

// Simple test of ordered tasks
Task<void> task(boost::bind(&func, 1));
Task<void>::Future a = s.Submit(task);
Task<void>::Future b = s.Submit(boost::bind(&func, 2));
Task<void>::Future c = s.Submit(boost::bind(&func, 3));

a.wait();
b.wait();
c.wait();

// Callback test
TaskNotify<int> fibTask(boost::bind(&fib, 40), boost::bind(&fib1Done));
TaskNotify<int>::Future fibby1 = s.Submit(fibTask);

TaskNotify<int>::Future fibby2 = s.Submit(boost::bind(&fib, 30), boost::bind(&fib2Done));

_getch();
return 0;
}
[/source]
Advertisement
Here is my approach:


  • boost::asio::io_service for the thread pool instead of the (unofficial) boost::threadpool
  • boost::function for the callback type, just do callback(future.get()) and your issue #2 is solved.
  • Single templated T::future_type submit(T&) for both task and task_notify (or any class defining future_type and operator() really). Your compiler errors are probably due to not passing the template argument as boost::ref(t) (boost::unique_future<> is not copy-constructible).


task and task_notify

[source lang=cpp]
template <typename R>
class task {
public:
typedef boost::packaged_task<R> task_type;
typedef boost::shared_ptr<task_type> task_ptr;
typedef boost::unique_future<R> future_type;

template <typename F>
task(F f)
: ptask(boost::make_shared<task_type>(f)),
future(ptask->get_future())
{}

void operator()()
{ (*ptask)(); }

future_type& get_future()
{ return future; }

R get()
{ return future.get(); }

bool ready() const
{ return future.is_ready(); }

private:
task_ptr ptask;
future_type future;
};

// task<R> that additionally hands its result to a callback once the work
// item has run.
template <typename R>
class task_notify : public task<R> {
public:
    // Callback signature: receives the result of the work item.
    typedef boost::function<void (R)> callback_type;

    // f: the work item, cb: invoked with the result after f has run.
    // The base is named as task<R> explicitly: inside a class template the
    // bare name `task` does not denote the dependent base class in standard
    // C++ (only permissive compilers accept it).
    template <typename F>
    task_notify(F f, callback_type cb)
        : task<R>(f), callback(cb)
    {}

    // Runs the work item, then passes its result to the callback.
    void operator()()
    {
        task<R>::operator()();
        // `this->` makes get() a dependent name so that it is looked up in
        // the dependent base task<R> at instantiation time.
        callback(this->get());
    }

private:
    callback_type callback;
};
[/source]

task_scheduler

[source lang=cpp]
// Thread pool built on boost::asio::io_service: a group of threads all run
// io_service.run(), and submitted tasks are posted to the io_service.
class task_scheduler {
public:
    // Starts one worker thread per hardware thread. The `work` member is
    // constructed on io_service before any worker starts.
    task_scheduler()
        : work(io_service)
    {
        std::size_t ncores = boost::thread::hardware_concurrency();
        // hardware_concurrency() is documented to return 0 when the count is
        // not available; without at least one worker no task would ever run.
        if (ncores == 0) {
            ncores = 1;
        }
        for (std::size_t i = 0; i < ncores; ++i) {
            threads.create_thread(boost::bind(&task_scheduler::run, this));
        }
    }

    // Stops the io_service and joins every worker thread.
    ~task_scheduler()
    {
        io_service.stop();
        threads.join_all();
    }

    // Posts t for execution and returns a reference to its future.
    // t is bound through boost::ref, so the io_service holds a reference to
    // the caller's object: t must stay alive until it has been executed.
    template <typename T>
    typename T::future_type& submit(T& t)
    {
        io_service.post(boost::bind(&T::operator(), boost::ref(t)));
        return t.get_future();
    }

private:
    // Worker thread entry point.
    void run() {
        io_service.run();
    }

    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    boost::thread_group threads;
};
[/source]

Main/Tests

[source lang=cpp]

#if _WIN32
#pragma comment (linker, "/ENTRY:mainCRTStartup")
#pragma comment (linker, "/SUBSYSTEM:CONSOLE")
#endif

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/make_shared.hpp>

#include <iostream>

// include: task
// include: task_notify
// include: task_scheduler

// Naive doubly-recursive Fibonacci: fib(0) == 0, fib(1) == 1.
int fib(int x)
{
    switch (x) {
    case 0:
        return 0;
    case 1:
        return 1;
    default:
        return fib(x-1)+fib(x-2);
    }
}

// Callback for the fib(30) task: prints the result it is handed.
void fib30_done(int result)
{
    std::cout << "fib30_done, fib(30) = ";
    std::cout << result << std::endl;
}

int main(int argc, char *argv[])
{
task_scheduler ts;

task<int> t1(boost::bind(&fib, 10));
task<int> t2(boost::bind(&fib, 20));
task_notify<int> t3(boost::bind(&fib, 30), boost::bind(&fib30_done, _1));

ts.submit(t1);
ts.submit(t2);
ts.submit(t3);

std::cout << "fib(10) = " << t1.get() << std::endl;
std::cout << "fib(20) = " << t2.get() << std::endl;
std::cout << "fib(30) = " << t3.get() << std::endl;

getchar();
return 0;
}
[/source]
Oh nice! That's pretty elegant and the kind of where I wanted to head :)

I had thought about boost::asio but wasn't sure if it was overkill or the right tool for the job but it seems to fit in nicely.

Thanks!
You're welcome.

Although my code should be considered "proof-of-concept" as I omitted proper copy semantics and what not. Also my task<> and task_notify<> objects are not allowed to go out of scope. I suspect that with your shared_ptr<packaged_task> that the idea was so task objects could go out of scope and still complete asynchronously through the packaged_task?

You inspired me to try a similar design in my engine. I wonder if it's best to pass around the task scheduler to all objects that wish to create tasks, or just have the task scheduler static in the task objects. The latter approach sure is tempting, just put a task on the stack and forget about it (unless you need the return value, of course). Another idea is to just have a global 'schedule_task()' function that takes a function object and returns a future. No need to worry about the task scheduler or task objects. On the other hand, I want to avoid globals, but a task scheduler sure seems like a good candidate as you don't want to have more than one task scheduler (compare to Intel TBB).
Thoughts?
I haven't actually tested the case of the Task objects going out of scope yet, but my plan was to allow them to just be put on the stack and forgotten about, and if you needed the return value then you would use a task notifier. The idea of the task notify object was mostly to fit in with resource loading. I wanted to be able to ask the resource manager for a resource and have it return a dummy resource straight away, and then have it push a file load task into the scheduler and when the task was done it would use the callback to post the file data back to a resource loader which would set up the dummy resource with real data.

So I think the task objects definitely need to be able to go out of scope but still have something sharing them until the task is finished and then it can be destroyed.

I was recently thinking about how the scheduler should be managed and was thinking that there are probably only a few major places within the engine that would need to be able to create tasks and so I thought it'd be fine just passing a shared_ptr around. So far I can see only the engine core and resource manager requiring it. I did also think about whether it should be a Singleton and just have global access to it but I've been trying to steer away from Singletons as they seem to be frowned upon these days.

Here is my approach:

  • boost::asio::io_service for the thread pool instead of the (unofficial) boost::threadpool
  • boost::function for the callback type, just do callback(future.get()) and your issue #2 is solved.
  • Single templated T::future_type submit(T&) for both task and task_notify (or any class defining future_type and operator() really). Your compiler errors are probably due to not passing the template argument as boost::ref(t) (boost::unique_future<> is not copy-constructible).


Hi folks,

I agree with using ASIO in comparison to the original system. Given that ASIO is written against a generally very well scaling system on a per OS level (IOCompletions for Win32, KQueue bsd/osx, EPoll linux etc) it is a very good and well debugged starting point. I would suggest avoiding the futures system for "small" tasks and prefer the ASIO strands in this case, the performance seemed measurably higher with pretty much the same usable behavior.

The down side to ASIO for threading is that lots of anything "small" will suck in terms of OS overhead. Unfortunately all the variations of ASIO are, for all intents and purposes, central queues which means adding/removing tasks from a central location which means you hit Amdahl's law in a very big hurry if you run too much through it too fast. As far as the suggested usage goes, it should be completely fine, just don't push too much further as tempting as it may be. I use ASIO in systems which have "NOTHING" to do with IO just because it is a great way to create an event driven framework for any number of things. Unfortunately though, it is very tempting to push game related items through it and that will be a mistake. Been there, done that, kernel times go apeshit and performance drops off very quickly. Just don't go there. :)

/../ Unfortunately though, it is very tempting to push game related items through it and that will be a mistake. Been there, done that, kernel times go apeshit and performance drops off very quickly. Just don't go there. :)


Agreed. Use it for long running or computationally heavy tasks (like resource loading). Even not considering performance, using it for game related items is a mistake as those tasks would block waiting for the long running tasks to finish (only 2 tasks at once on a dual-core, for example). For game related items I'd probably use coroutines or similar to create an event driven system that runs from the main thread.
I added some code which allows you to create a task and forget about it by using shared pointers. It's technically not a "create on stack and forget about" but I don't really see any other way of doing it.

In the task scheduler I added this function:
[source lang=cpp]

// Overload for heap-allocated tasks: the shared_ptr is bound by value into
// the posted handler, so the handler holds its own reference to the task.
// Returns a reference to the future obtained from the task.
template <typename T>
typename T::future_type& submit(boost::shared_ptr<T>& t)
{
    io_service.post(boost::bind(&T::operator(), t));
    typename T::future_type& pending = t->get_future();
    return pending;
}
[/source]

The test is then simply:
[source lang=cpp]


// Submits a heap-allocated fib(35) task and returns immediately, letting the
// local shared_ptr go out of scope right after submission.
void outofscopetest(task_scheduler& ts)
{
    typedef task_notify<int> notify_task;

    boost::shared_ptr<notify_task> fibTask(
        new notify_task(boost::bind(&fib, 35), boost::bind(&fibDone, _1)));
    ts.submit(fibTask);
}
[/source]

In terms of design I'm not sure whether the other submit function should be removed and all tasks should be added as shared pointers or both should be kept to allow for stack and heap allocated tasks. It might be best to force shared pointers only to save confusion and user error.

Thoughts?
My question would be; why are you doing things via shared_ptr/pointers at all?

Why not accept a copy of the functor which removes any life time issues from the task itself...?
phantom is right, there should be no need for shared_ptr for the task itself. It is however required for packaged_task to be a shared_ptr because io_service completion handlers must be copyable.

I made the following changes to my code:


  1. task objects now allowed to go out of scope
  2. task_scheduler::submit receives copy of functor and posts copy to io_service::post
  3. Removed task_notify class. Reason for this is I got compilation errors (*) and couldn't quite figure out why. I think the end result is better anyway.
  4. Callback is now second constructor to task<>. boost::function::empty used to decide if we should callback.


* error C2664: 'void boost::detail::future_object<T>::mark_finished_with_result(const int &)' : cannot convert parameter 1 from 'void' to 'const int &' c:\boost\include\boost-1_45\boost\thread\future.hpp 1224



DISCLAIMER: Beware of bugs in the code below. Still proof of concept :)

Full source:

[source lang=cpp]
#if _WIN32
#pragma comment (linker, "/ENTRY:mainCRTStartup")
#pragma comment (linker, "/SUBSYSTEM:CONSOLE")
#endif

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/make_shared.hpp>

#include <iostream>

template <typename R>
class task {
public:
typedef boost::packaged_task<R> task_type;
typedef boost::shared_ptr<task_type> task_ptr;
typedef boost::shared_future<R> future_type;
typedef boost::function<void (R)> callback_type;

template <typename F>
task(F f)
: ptask(boost::make_shared<task_type>(f)),
future(ptask->get_future())
{}

template <typename F>
task(F f, callback_type cb)
: ptask(boost::make_shared<task_type>(f)),
future(ptask->get_future()),
callback(cb)
{}

void operator()()
{
(*ptask)();
if(!callback.empty()) callback(get());
}

future_type get_future()
{ return future; }

R get()
{ return future.get(); }

bool ready() const
{ return future.is_ready(); }

private:
task_ptr ptask;
future_type future;
callback_type callback;
};

// Thread pool built on boost::asio::io_service: a group of threads all run
// io_service::run(), and submitted task objects are posted by value.
class task_scheduler {
public:
    // Starts one worker thread per hardware thread. The `work` member is
    // constructed on io_service before any worker starts.
    task_scheduler()
        : work(io_service)
    {
        std::size_t ncores = boost::thread::hardware_concurrency();
        // hardware_concurrency() is documented to return 0 when the count is
        // not available; without at least one worker no task would ever run
        // and waiting on a returned future would block forever.
        if (ncores == 0) {
            ncores = 1;
        }
        for (std::size_t i = 0; i < ncores; ++i) {
            threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                              boost::ref(io_service)));
        }
    }

    // Stops the io_service and joins every worker thread.
    ~task_scheduler()
    {
        io_service.stop();
        threads.join_all();
    }

    // Posts a copy of t for execution and returns t's future by value.
    // T must be copyable, callable with no arguments, and provide
    // future_type and get_future().
    template <typename T>
    typename T::future_type submit(T t)
    {
        io_service.post(t);
        return t.get_future();
    }

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    boost::thread_group threads;
};

// Naive doubly-recursive Fibonacci: fib(0) == 0, fib(1) == 1.
int fib(int x)
{
    const bool isBaseCase = (x == 0) || (x == 1);
    if (isBaseCase) {
        return x;
    }
    return fib(x-1)+fib(x-2);
}

// Completion callback: prints "fib(<x>) = <result>" followed by a newline.
void async_fib_done(int x, int result)
{
    std::cout << "fib(" << x << ") = ";
    std::cout << result << std::endl;
}

// Submits fib(x) to the scheduler and returns the future for its result.
// The local task object goes out of scope on return; submit() took a copy.
task<int>::future_type async_fib(task_scheduler& ts, int x)
{
    task<int> job(boost::bind(&fib, x));
    return ts.submit(job);
}
// Like async_fib, but also attaches async_fib_done as the completion
// callback, with x bound as its first argument and the result as its second.
task<int>::future_type async_fib_notify(task_scheduler& ts, int x)
{
    task<int> job(boost::bind(&fib, x),
                  boost::bind(&async_fib_done, x, _1));
    return ts.submit(job);
}

int main(int argc, char *argv[])
{
task_scheduler ts;

task<int>::future_type f1 = async_fib(ts, 10);
task<int>::future_type f2 = async_fib_notify(ts, 30);

std::cout << "fib(10) = " << f1.get() << std::endl;
// .. fib(30) printed from callback ..

getchar();
return 0;
}
[/source]

This topic is closed to new replies.

Advertisement