• Advertisement
Sign in to follow this  

Problem with boost::asio::io_service thread pool example

This topic is 981 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

If you intended to correct an error in the post then please contact us.

Recommended Posts

I found the boost::asio::io_service thread pool example below. I'm trying to figure out how to set up a simple thread pool job queue system.

#include <boost/shared_ptr.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/move/move.hpp>
#include <iostream>

// Held by sleep_print around each std::cout write so that a message is
// printed as a whole before another caller can start printing.
boost::mutex mutex;

int sleep_print(int seconds)
{
    mutex.lock();
    std::cout << "goint to sleep (" << seconds << ")" << std::endl;
    mutex.unlock();
    boost::this_thread::sleep(boost::posix_time::milliseconds(seconds));
    mutex.lock();
    std::cout << "wake up (" << seconds << ")" << std::endl;
    mutex.unlock();
    return 0;
}

// Packaged task with an int result; push_job wraps a bound sleep_print call in one.
typedef boost::packaged_task<int> task_t;
// Shared-ownership handle to a task_t, as created and posted by push_job.
typedef boost::shared_ptr<task_t> ptask_t;

void push_job(int seconds, boost::asio::io_service& io_service, std::vector<boost::shared_future<int> >& pending_data)
{
    ptask_t task = boost::make_shared<task_t>(boost::bind(&sleep_print, seconds));
    boost::shared_future<int> fut(task->get_future());
    pending_data.push_back(fut);
    io_service.post(boost::bind(&task_t::operator(), task));
}

/// Runs eleven sleep_print jobs (1000..1010) on a pool of threads that all
/// call io_service::run(), waits for every job's future, then shuts down.
///
/// @return 0 on normal completion.
int main()
{
    boost::asio::io_service io_service;
    boost::thread_group threads;
    // Outstanding work object: keeps run() from returning while no job is queued.
    boost::asio::io_service::work work(io_service);

    // hardware_concurrency() is unsigned and may report 0 when the value is
    // unknown; with zero workers wait_for_all below would block forever.
    unsigned worker_count = boost::thread::hardware_concurrency();
    if (worker_count == 0)
    {
        worker_count = 1;
    }

    for (unsigned i = 0; i < worker_count; ++i)
    {
        threads.create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
    }

    std::vector<boost::shared_future<int> > pending_data; // vector of futures

    for (int duration = 1000; duration <= 1010; ++duration)
    {
        push_job(duration, io_service, pending_data);
    }

    boost::wait_for_all(pending_data.begin(), pending_data.end());

    // Make run() return in every worker, then wait for the workers to exit.
    // Without the join, a worker can still be inside run() (or destroying a
    // finished handler) while io_service is destroyed at the end of main.
    io_service.stop();
    threads.join_all();
    pending_data.clear();

    system("PAUSE");

    return 0;
}

Note the line of code at the bottom. I added that.

io_service.stop();

The problem is that, without the stop command above, I randomly get the error below when the program quits.

Unhandled exception at 0x74b2e733 in consoleTestAp.exe: Microsoft C++ exception: 
boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> > at memory location 0x02a8f11c..

The problem seems to be that even though the jobs have finished, these boost::packaged_task(s), which are shared pointers, are still being referenced by io_service when it goes out of scope. I'm not sure why they are being referenced after the jobs have finished.

 

Could it be that the boost::packaged_task is being used incorrectly? Ideally, I would like to post a job and delete its task after it has completed, but that doesn't seem to be possible.

Share this post


Link to post
Share on other sites
Advertisement

After doing more research, I think I found the problem by finding a similar problem when using boost::thread_group. It looks like I needed to allocate the boost::thread_group dynamically so that I can explicitly trigger its destructor for cleanup. The thread group takes responsibility for destroying the thread objects, so letting its destructor run when the program ends doesn't ensure the shared pointer objects are still around for cleanup.

 

Below is the correct code example.

#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/move/move.hpp>
#include <iostream>

// Held by sleep_print around each std::cout write so that a message is
// printed as a whole before another caller can start printing.
boost::mutex mutex;

int sleep_print(int seconds)
{
    mutex.lock();
    std::cout << "goint to sleep (" << seconds << ")" << std::endl;
    mutex.unlock();
    boost::this_thread::sleep(boost::posix_time::milliseconds(seconds));
    mutex.lock();
    std::cout << "wake up (" << seconds << ")" << std::endl;
    mutex.unlock();
    return 0;
}

// Packaged task with an int result; push_job wraps a bound sleep_print call in one.
typedef boost::packaged_task<int> task_t;

void push_job(int seconds, boost::asio::io_service& io_service, std::vector<boost::shared_future<int> >& pending_data)
{
    boost::shared_ptr<task_t> task = boost::make_shared<task_t>(boost::bind(&sleep_print, seconds));
    boost::shared_future<int> fut(task->get_future());
    pending_data.push_back(fut);
    io_service.post(boost::bind(&task_t::operator(), task));
}

/// Runs eleven sleep_print jobs (1000..1010) on a pool of threads that all
/// call io_service::run(), waits for every job's future, then shuts down.
///
/// @return 0 on normal completion.
int main()
{
    boost::asio::io_service io_service;
    boost::scoped_ptr<boost::thread_group> threads( new boost::thread_group );
    // Outstanding work object: keeps run() from returning while no job is queued.
    boost::asio::io_service::work work(io_service);

    // hardware_concurrency() is unsigned and may report 0 when the value is
    // unknown; with zero workers wait_for_all below would block forever.
    unsigned worker_count = boost::thread::hardware_concurrency();
    if (worker_count == 0)
    {
        worker_count = 1;
    }

    for (unsigned i = 0; i < worker_count; ++i)
    {
        threads->create_thread(boost::bind(&boost::asio::io_service::run,
            &io_service));
    }

    std::vector<boost::shared_future<int> > pending_data; // vector of futures

    for (int duration = 1000; duration <= 1010; ++duration)
    {
        push_job(duration, io_service, pending_data);
    }

    boost::wait_for_all(pending_data.begin(), pending_data.end());

    // Stop all thread activity before deleting the thread group
    io_service.stop();

    // Destroying the thread group does not wait for its threads, so join them
    // explicitly; otherwise a worker may still be inside run() when
    // io_service is destroyed at the end of main.
    threads->join_all();

    // Every worker has exited; release the thread objects.
    threads.reset();

    system("PAUSE");

    return 0;
}
Edited by GameCodingNinja

Share this post


Link to post
Share on other sites
Sign in to follow this  

  • Advertisement