Implementing asynchronous calls

I implemented a fairly basic system for asynchronous calls (without return values, so no futures are needed; basically just injecting code into another thread). The purpose is to be able to call a function from one thread and have it executed in another. The system works (it's very basic), but there might be a better way I don't know about (which is why I'm posting here).

AsyncCallable.hpp

#ifndef ASYNC_CALLABLE_HPP
#define ASYNC_CALLABLE_HPP

#include <vector>

#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>

class AsyncCallable
{
protected:
	void appendCall(boost::function<void (void)> call);
	void executeCalls();

private:
	typedef std::vector<boost::function<void (void)> > call_vec;
	call_vec callsFront;
	call_vec callsBack;

	boost::mutex callMutex;
};

#endif

AsyncCallable.cpp

#include "AsyncCallable.hpp"

void AsyncCallable::appendCall(boost::function<void (void)> call)
{
	// May be called from any thread; the scoped_lock releases the mutex
	// automatically, even if push_back throws.
	boost::mutex::scoped_lock lock(callMutex);
	callsBack.push_back(call);
}

void AsyncCallable::executeCalls()
{
	// Grab the pending calls while holding the lock, then run them without it.
	{
		boost::mutex::scoped_lock lock(callMutex);
		callsFront.swap(callsBack);
	}

	for(call_vec::iterator it = callsFront.begin(); it != callsFront.end(); ++it)
		(*it)();

	callsFront.clear();
}

A simple task using AsyncCallable:

#include <iostream>
#include <string>

#include <boost/bind.hpp>

#include <windows.h> // Sleep

#include "AsyncCallable.hpp"

class TestCallable : private AsyncCallable
{
public:
	TestCallable() : running(true) {}

	// Runs in the worker thread, executing queued calls between iterations.
	void run()
	{
		std::cout << "Howdy!\n";

		while(running)
		{
			executeCalls();

			std::cout << "Bla\n";
			Sleep(123); // Windows-specific; boost::this_thread::sleep would be portable
		}

		std::cout << "Bye!\n";
	}

	// Called from other threads: queue the real work for the worker thread.
	void saySomething(const std::string& sth)
	{
		appendCall(boost::bind(&TestCallable::_saySomething, this, sth));
	}

	void stop()
	{
		appendCall(boost::bind(&TestCallable::_stop, this));
	}

private:
	bool running;

	void _saySomething(const std::string& sth)
	{
		std::cout << sth << "\n";
	}

	void _stop()
	{
		running = false;
	}
};
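For completeness, a small driver showing the two threads involved; this main() is not from the original post, just a sketch assuming Boost.Thread and the TestCallable class above:

#include <boost/bind.hpp>
#include <boost/thread.hpp>

#include <windows.h> // Sleep

int main()
{
	TestCallable task;

	// run() loops in its own thread, draining the queued calls each iteration.
	boost::thread worker(boost::bind(&TestCallable::run, &task));

	// These calls are queued from the main thread but executed by the worker.
	task.saySomething("injected from the main thread");
	Sleep(500);
	task.stop();

	worker.join();
	return 0;
}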

You might want to look into boost's asio::io_service.

While it's associated with sockets, its core is an async dispatcher. It contains all the facilities for proper dispatching: custom functor allocators, the option to guarantee non-reentrant callback execution, as well as lock-less function dispatch.
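For illustration, a minimal sketch of dispatching bound functions through an io_service that is drained by a dedicated thread (assuming Boost.Asio and Boost.Thread; runService is only a helper so we don't have to take the address of the overloaded run member):

#include <iostream>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

void greet(int n)
{
	std::cout << "handler " << n << " runs in the worker thread\n";
}

// Plain wrapper to avoid binding the overloaded io_service::run.
void runService(boost::asio::io_service* service)
{
	service->run();
}

int main()
{
	boost::asio::io_service service;

	// post() may be called from any thread; the handlers execute in
	// whichever thread calls run() - here, a dedicated worker thread.
	service.post(boost::bind(&greet, 1));
	service.post(boost::bind(&greet, 2));

	boost::thread worker(boost::bind(&runService, &service));
	worker.join(); // run() returns once the pending handlers are done

	return 0;
}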
Using ASIO for this seems a little extreme imo.
Quote:Original post by l0calh05t
Using ASIO for this seems a little extreme imo.


Why? You're using boost already, and it does exactly the same thing, but covers all the bases.

The canonical example, however, is exactly the same as yours, except that instead of appendCall, you use service.post().
Well, it *is* one of the larger Boost libraries. Anyway, I had a look at the Boost.Asio documentation, and as far as I can tell, it doesn't do quite what I want. What I want is tasks (for example a render task) that can be told to execute a specific (member) function asynchronously, at a point that makes sense for that particular task (for example, calls to the renderer like render.resizeViewport() only really make sense between frames). Correct me if I'm wrong, but as far as I can tell this is not possible with Asio.
Quote:Original post by l0calh05t
Well, it *is* one of the larger Boost libraries. Anyway, I had a look at the Boost.Asio documentation, and as far as I can tell, it doesn't do quite what I want. What I want is tasks (for example a render task) that can be told to execute a specific (member) function asynchronously, at a point that makes sense for that particular task (for example, calls to the renderer like render.resizeViewport() only really make sense between frames). Correct me if I'm wrong, but as far as I can tell this is not possible with Asio.


Perhaps what you need is not true asynchronous execution, but simple deferred invocation.

Whether there are any benefits from having additional infrastructure in place is up to you; your original post perhaps sounded more elaborate than that.

The use I was referring to is the following:
#include <iostream>
#include <cstdlib>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <windows.h> // Sleep

using std::cout;
using std::endl;

void resizeViewport(int w, int h)
{
	cout << "Resizing between frames only" << endl;
}

struct Game {
	void updateLogic() { cout << "updateLogic()" << endl; }
	void renderFrame() { cout << "renderFrame()" << endl; }

	void updateInput() {
		cout << "updateInput()" << endl;
		if (rand() % 20 == 0) {
			service.post(boost::bind(&Game::stop, this));
		} else {
			service.post(boost::bind(&resizeViewport, 100, 200));
		}
		if (rand() & 1) service.post(boost::bind(&Game::waitABit, this));
	}

	void stop() {
		cout << "We're bored, let's quit" << endl;
		running = false;
	}

	void waitABit() {
		cout << "Wait a bit" << endl;
		Sleep(250);
	}

	void run() {
		running = true;
		while (running) {
			updateInput();
			updateLogic();
			renderFrame();

			// Process pending events
			// These could be running in their own thread,
			// it just depends on how run() is invoked
			size_t n = service.run();
			service.reset();
			cout << "Ran " << n << " deferred invocation(s)." << endl;
		}
	}

	boost::asio::io_service service;
	bool running;
};

int main(int argc, char* argv[])
{
	Game game;
	game.run();

	return 0;
}
No, I actually meant asynchronous execution (or perhaps asynchronous deferred invocation). The render task runs in a separate thread.
The Asio lib that has been mentioned supports assigning threads - they're called strands, and they operate as a policy that determines which thread(s) potentially get to call back to your function. I expect they would be flexible enough to work in the given scenario, but I have never used them so I can't really comment (threads frighten me).

Alternatively, if you want something more lightweight, model it as a message pump, with the only distinction being that the messages are bound functions. Create a dedicated thread, a container of queued messages (representing partially or fully applied functions), and a dispatch pump that just applies these messages. The Boost synchronisation wait primitive can be used to halt the dispatcher when the queue is exhausted, and the signal to wake it on newly enqueued messages. You should be able to arrange it so that these primitives are only needed when the queue is exhausted, although the pushing and popping of items will need synchronisation protection. You are then free to push these functions from whatever random thread and have them dispatched in the context of the pump thread as needed; a rough sketch of this follows.
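A minimal sketch of that message-pump idea, assuming Boost.Thread's mutex and condition (the class name MessagePump and its interface are just illustrative):

#include <deque>

#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

// A queue of bound functions drained by one dedicated thread.
class MessagePump
{
public:
	MessagePump() : stopped(false) {}

	// Called from any thread: enqueue a bound function and wake the pump.
	void post(boost::function<void (void)> fn)
	{
		boost::mutex::scoped_lock lock(mutex);
		queue.push_back(fn);
		condition.notify_one();
	}

	// Ask the pump thread to exit once the queue is drained.
	void stop()
	{
		boost::mutex::scoped_lock lock(mutex);
		stopped = true;
		condition.notify_one();
	}

	// Run in the dedicated thread: sleep while empty, dispatch otherwise.
	void run()
	{
		for (;;)
		{
			boost::function<void (void)> fn;
			{
				boost::mutex::scoped_lock lock(mutex);
				while (queue.empty() && !stopped)
					condition.wait(lock);
				if (queue.empty() && stopped)
					return;
				fn = queue.front();
				queue.pop_front();
			}
			fn(); // execute outside the lock
		}
	}

private:
	std::deque<boost::function<void (void)> > queue;
	boost::mutex mutex;
	boost::condition condition;
	bool stopped;
};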
On the Windows platform, IO Completion Ports are what you're looking for. Basically, it's a queue of work items that some number of threads are working on at any time. What is nice about this is that the OS's thread scheduler somehow knows to wake up the same thread over and over if it has completed its work. This limits your context switching.

On a Mac or Linux, I assume there would be a very similar kernel feature but am unsure.

Happy coding.
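As an illustration of the queue-of-work-items idea, here is a minimal sketch using the Win32 IOCP API; carrying a pointer to a boost::function in the completion key is just one possible convention, not the only way to do it:

#include <windows.h>
#include <iostream>

#include <boost/function.hpp>

// Each posted completion packet carries a pointer to a bound function
// in its completion key; worker threads pop packets and invoke them.
typedef boost::function<void (void)> Call;

DWORD WINAPI worker(LPVOID param)
{
	HANDLE port = static_cast<HANDLE>(param);
	DWORD bytes = 0;
	ULONG_PTR key = 0;
	LPOVERLAPPED overlapped = 0;

	while (GetQueuedCompletionStatus(port, &bytes, &key, &overlapped, INFINITE))
	{
		Call* call = reinterpret_cast<Call*>(key);
		if (!call)       // a null key is used here as the shutdown signal
			return 0;
		(*call)();
		delete call;
	}
	return 0;
}

void sayHello() { std::cout << "Hello from a pool thread\n"; }

int main()
{
	// One completion port, one worker thread for the example.
	HANDLE port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
	HANDLE thread = CreateThread(NULL, 0, &worker, port, 0, NULL);

	// Post a work item from the main thread.
	PostQueuedCompletionStatus(port, 0, reinterpret_cast<ULONG_PTR>(new Call(&sayHello)), NULL);

	// Post the shutdown signal and wait for the worker to exit.
	PostQueuedCompletionStatus(port, 0, 0, NULL);
	WaitForSingleObject(thread, INFINITE);

	CloseHandle(thread);
	CloseHandle(port);
	return 0;
}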
If you're interested, here's my version with a futures interface. It also deals nicely with propagating exceptions, passing arguments optimally, supporting reference return values and some other tricky stuff.

// ridiculous example:
double calculate_pi(int dp);

async::future<double> pi = async::call(exception_handler, calculate_pi, 10);

// put the kettle on...

double area = 2 * pi.value() * radius;
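For anyone curious what such a future looks like underneath, here is a minimal sketch built on Boost.Thread; this is only an illustration of the general mechanism (the simple_future name and interface are hypothetical), not the poster's actual library:

#include <stdexcept>
#include <string>

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

// The producer thread fills in either a value or an error message;
// the consumer blocks in value() until one of them is set.
template <typename T>
class simple_future
{
public:
	simple_future() : ready(false), failed(false) {}

	void set_value(const T& v)
	{
		boost::mutex::scoped_lock lock(mutex);
		result = v;
		ready = true;
		condition.notify_all();
	}

	void set_error(const std::string& what)
	{
		boost::mutex::scoped_lock lock(mutex);
		error = what;
		ready = failed = true;
		condition.notify_all();
	}

	// Blocks until the result is available; rethrows on error.
	T value()
	{
		boost::mutex::scoped_lock lock(mutex);
		while (!ready)
			condition.wait(lock);
		if (failed)
			throw std::runtime_error(error);
		return result;
	}

private:
	T result;
	std::string error;
	bool ready, failed;
	boost::mutex mutex;
	boost::condition condition;
};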
