Hello everybody!
I have written a multipurpose-threadpool based on boost::asio where I can file jobs and receive a handle to abort jobs. When adding a job, I can (must, at the moment) provide a "then" function that is called upon finishing the job. This function gets as argument the handle (to check which job has been done if multiple jobs are added with the same "then" function) and the return value of the job function.
My questions are:
1) Is this thread-safe?
2) Is there optimization potential? For example, do I copy the shared_ptr too frequently? I presume I could move it within my lambda that I pass to post because h is copied within the lambda, but I am not sure.
3) Anything else?
This is the code:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <atomic>
class Handle
{
public:
using SharedPtrBool = std::shared_ptr<std::atomic<bool>>;
Handle() : abort(new std::atomic<bool>(false)) {}
// checks whether handles are equal and whether both are not aborted
bool EqualAndUnaborted(const Handle& rhs) const
{
// any abort flag equal to true?
if (*abort)
return false;
// do pointers equal?
return abort.get() == rhs.abort.get();
}
// aborts thread
void Abort()
{
*abort = true;
}
// get identifier
int GetIdentifier() const
{
static_assert(sizeof(int) == sizeof(decltype(abort.get())), "identifier type (int) size is not equal to pointer size");
return reinterpret_cast<int>(abort.get());
}
private:
SharedPtrBool abort;
friend class ThreadPool;
};
class ThreadPool
{
public:
using SharedPtrBool = Handle::SharedPtrBool;
ThreadPool(int threads)
: work(ioService)
{
for (int n = 0; n < threads; ++n)
threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
~ThreadPool()
{
ioService.stop();
threadPool.join_all();
}
template<typename T, typename U>
Handle AddJob(T func, U then)
{
std::cout << "Job added" << std::endl;
Handle h;
ioService.post([h, func, then](){
then(h, func(h.abort));
});
return h;
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
private:
boost::asio::io_service ioService;
boost::thread_group threadPool;
boost::asio::io_service::work work;
};
Here is some test program (compilable, if you just put it directly below the code above):
Handle h1, h2, h3;
void done(Handle h, int r)
{
if (h.EqualAndUnaborted(h1))
{
std::cout << "First with " << r << std::endl;
}
else if (h.EqualAndUnaborted(h2))
{
std::cout << "Second with " << r << std::endl;
}
else if (h.EqualAndUnaborted(h3))
{
std::cout << "Third with " << r << std::endl;
}
}
int main()
{
ThreadPool pool(4);
h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
int m = 0;
for (int n = 0; n < 999999999 && !*abort; ++n)
if (n % 1500 == 0)
++m;
return m;
}, done);
h2 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
int m = 0;
for (int n = 0; n < 9999999 && !*abort; ++n)
if (n % 2000 == 0)
++m;
return m;
}, done);
h3 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
int m = 0;
for (int n = 0; n < 9999999 && !*abort; ++n)
if (n % 1900 == 0)
++m;
return m;
}, done);
boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
h1.Abort();
std::cout << "aborted" << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
}
Thank you very much in advance!