Jump to content

  • Log In with Google      Sign In   
  • Create Account





A guide to getting started with boost::asio

Posted by Drew_Benton, 31 January 2011 · 190,048 views



3. Giving io_service some work to do

Now we can finally get to doing the real work! We will be reusing the previous example as our base, so our examples are multi-threaded ready. If the io_service is the brain and heart of the boost::asio library, the io_service member functions post and dispatch would be the arms and legs. The post function "is used to ask the io_service to execute the given handler, but without allowing the io_service to call the handler from inside this function." The dispatch function "guarantees that the handler will only be called in a thread in which the run(), run_one(), poll() or poll_one() member functions is currently being invoked. The handler may be executed inside this function if the guarantee can be met."

So the fundamental difference is that dispatch will execute the work right away if it can and queue it otherwise while post queues the work no matter what. Both of the functionality are really important as the function we will use will depend on the context that it is being used in. Remember earlier the remarks about how the internals of the work class worked? If the work class were to use dispatch over and over, it might be possible the work never finished for a poll call, but if the work called post, it could.

Let us get started! We will start out with a simple Fibonacci calculation. To make things more interesting we will add in some time delays to show the true nature of the power of boost::asio. We will also reduce the number of worker threads to just 2. The actual value that we will want to use in a multi-threaded program will vary depending on a number of factors, but that will be talked about later.

Example 3a
Spoiler


In this example, starting in main, we post 3 function objects to the io_service via the post function. In this particular case, since the current thread does no call the io_service run or poll function, dispatch would also call the post function and not execute the code right away. After we give the io_service work, through post, we reset the work object to signal once the work has been completed that we wish to exit. Finally, we wait on all the threads to finish as we have with the join_all function.

Our fib function simply calculates the sequence but we add in a time delay to slow things down to see our worker threads in action! We ultimate have to wrap the call with CalculateFib since we care about the return value and we want to see extra debugging information about when the function actually starts and completes.

Running the program, we should see the first two worker threads start on the first two units of work and once one worker thread has finished, it takes up the third unit of work. Once all work has been finished, the program exits.

Congratulations! We have now completed our first job! That was not so bad was it? Our program structure for working with boost::asio will be pretty generic overall. We can setup reusable worker threads to build up a pool of workers and when we send work to the io_service, it simply does it when it can. That is our basic example, let us consider another.

In this example, we show the difference between post and dispatch and how it can get us into trouble if we are not careful with what we do! We will use only one worker thread this time.

Example 3b
Spoiler


If we run the program, we should see the problem here. We wanted an in order display of events, but instead it was out of order. This is because dispatch was used for some events and post for others. Dispatched events can execute from the current worker thread even if there are other pending events queued up. The posted events have to wait until the handler completes before being allowed to be executed. Keep this in mind when programming we can easily code ourselves into serious bugs if we depend on the order of such events!

It should also be noted that if we had more than one worker thread, we would actually get the expected results because of the sleep call, but the problem remains still. If we removed the sleep, we might get any order of output depending on who grabbed the lock mutex first. For example, running the program without the sleep one example output received was 0, 2, 1, 4, 3, 5. We have to be aware of such things when programming at this level so we do not get fooled by "correct" output that was simply a result of having setup our program in such a way that it was possible. These types of bugs are the hardest to track down once they happen so it is imperative we fully understand the API that we are using first before diving too deep in.

That pretty much wraps up, no pun intended, how we will pass our work to the io_service object. We simply setup our program to process the work via poll or run how we need and then we can call dispatch or post as needed. There is a lot of cool stuff we can do using these concepts now! At this point, we can now get into the other useful aspects of the boost::asio library.





Attached Files






I have not read it all but it looks to be a great help. I was just starting to look into boost:asio for networking, in my current project, and this will give me a good start. Thanks!
Thank you for that article! I'm already using boost::asio (because it's awesome) but I wasn't aware of the "unordered vs ordered" issue regarding work being serialized through a strand. Please correct me if I'm wrong but as far as I understand it I can ensure ordering in example 4b when I change those lines...
io_service->post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );
...like this...
strand.post( boost::bind( &PrintNum, 1 ) );
..., right?!

Thank you for that article! I'm already using boost::asio (because it's awesome) but I wasn't aware of the "unordered vs ordered" issue regarding work being serialized through a strand. Please correct me if I'm wrong but as far as I understand it I can ensure ordering in example 4b when I change those lines...

io_service->post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );
...like this...
strand.post( boost::bind( &PrintNum, 1 ) );
..., right?!


Correct! You have to post through the strand itself to ensure explicit ordering whereas if you just wrap, you ensure serialization through that strand (with no guarantees to the actual order).

I too was not really aware of that issue until it popped up in one of my programs and boy was it a pain to track down. I mean the docs do explain this clearly, but it was something I just completely misinterpreted so I am making a point to everyone to be careful about it and closely re-read the docs!

Note that in the following case:

async_op_1(..., s.wrap(a));
async_op_2(..., s.wrap( b );
the completion of the first async operation will perform s.dispatch(a), and the second will perform s.dispatch( b ), but the order in which those are performed is unspecified. That is, you cannot state whether one happens-before the other. Therefore none of the above conditions are met and no ordering guarantee is made.




In example 4b, async_op_1 would be "io_service->post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );" and async_op_2 would be"io_service->post( strand.wrap( boost::bind( &PrintNum, 2 ) ) );". Hindsight is 20/20 but it's only after you make the mistake or someone brings it up that it really stands out, assuming you didn't understand it before (which I'm sure a lot of more veteran programmers and docs readers wouldn't make that mistake, but we are all human after all :)).
Hi !! Great article, very informative :)

I've previously written an IOCP networking engine using Windows IOCP api which implemented per-connection protocol-handling via a pluggable abstract eventsink class, your implementation reminds me a lot of my iocp framework, with the exception of the event dispatching...

I'm a little confused about the final example implementations, all of a sudden the worker thread pool is gone?
I'd like to see the next exciting episode implementing thread-pooling - am I correct to assume that would belong in the Hive class, or a derived MyHive ?? If you could clarify this, I would be grateful.

I'd also like to see a wrapper class which implements both Client and Server functionality under one hood, I can think of several situations where it is desirable for a server application to make outbound connections. A simple socksv4 proxy server would make an excellent example implementation ;)
I think I see a small problem:<br><br>When I examine the debug output, it states that 18 bytes were sent, but only 12 were echoed back - depite the hex output showing the correct amount of 18 bytes in both cases.<br>When I comment out the 'hex output' code in the OnSend and OnRecv handlers, both sides of the connection correctly report that 18 bytes were echoed.<br>This is despite the client and server being executed in separate applications, and despite the global lock on debug output, so it seems not to be a threading issue.<br><br>What could be causing the buffer vector's length to be manipulated in this way?<br><br>
Yeah ,&nbsp; in MyConnection.OnRecv and OnSend, I replaced this:<br>&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; std::cout &lt;&lt; "[" &lt;&lt; __FUNCTION__ &lt;&lt; "] " &lt;&lt; buffer.size() &lt;&lt; " bytes" &lt;&lt; std::endl;<br><br>with the following:<br>&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; char blah[50];<br>&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; sprintf_s(blah,"%d bytes\n",buffer.size());<br>&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; std::cout &lt;&lt; "[" &lt;&lt; __FUNCTION__ &lt;&lt; "] " &lt;&lt; blah;<br><br>and now it always correctly reports the 'packet size'.<br><br>I still don't understand why it was displaying incorrectly, any ideas?<br><br>

I'm a little confused about the final example implementations, all of a sudden the worker thread pool is gone?


Yes, simply because a client application such as was shown in 9b does not need really one. ;) In that case, I wanted a simple example that was able to be exited with a keypress (sorry, windows only example!) and did not use the thread pool because it was not needed. Sorry I didn't clarify this, but it was just personal preference for that example.

I'd like to see the next exciting episode implementing thread-pooling - am I correct to assume that would belong in the Hive class, or a derived MyHive ?? If you could clarify this, I would be grateful.


There's not much more to expand with the thread stuff. The custom Hive class would be for extending the object with your own methods as needed so it's all wrapped up into one object. You can then use boost::dynamic_pointer_cast to change the shared_ptr base type into the derived type.

I'd also like to see a wrapper class which implements both Client and Server functionality under one hood, I can think of several situations where it is desirable for a server application to make outbound connections. A simple socksv4 proxy server would make an excellent example implementation ;)


More examples are on the way with some fixes to the network wrapper code. I am actively using the code in quite a few different setups so I'll cover all the practical bases. The main use of my code right now is actually similar to what you are requesting and that is a proxy. Needless to say, I feel it works very nice overall in practice.
Nice stuff :). Although i already knew alot of the stuff discussed here I think it is a nice guide for people who are just starting. Nicely done :)
I would love to see a nicely written (i.e. latexified) PDF version of this article. It's really great.
First of all, I really like this tutorial. It discusses the basics of boost::asio very nicely.
But, the are several things I would like to change in your code. There are other opportunities which allows us to create more readable and more performant code. I've created an example and documentated the most changes I made:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>

#include <iostream>


void worker_thread(boost::asio::io_service&, boost::mutex&);


int main()
{
	boost::mutex              	mutex;
	boost::asio::io_service   	io_service;
	boost::asio::io_service::work work(io_service);

	// Please note: You don't need to lock access to `std::cout`, because no other thread is concurrently
	//          	trying to modifying the object
	std::cout << "thread [" << boost::this_thread::get_id() << "] press [any key] to exit." << std::endl;

	boost::thread_group worker_threads;
	for (int i = 0; i < 4; ++i)
	{
    	// Please note: You can pass references to the worker_thread() function even if you want to used it
    	//          	with an function object created by boost::bind(). The only thing you need to do is
    	//          	wrapping your object in a reference_wrapper by using boost::ref or boost::cref.
    	//          	However, you don't need to manage any global object any more and don't worry about
    	//          	copy constructibility.
    	worker_threads.create_thread(
        	boost::bind(worker_thread, boost::ref(io_service), boost::ref(mutex)));
	}

	// Now there are several threads running in the background and we would need to guard the access to
	// any global object used in our worker_thread() function.

	std::cin.get();

	io_service.stop();

	worker_threads.join_all();

	return 0;
}


void worker_thread(boost::asio::io_service& io_service, boost::mutex& mutex)
{
	{
    	// We can limit the scope of our lock_guard. So, it's much easier to see which code locations
    	// are guarded.
    	boost::lock_guard<boost::mutex> lock(mutex);
    	std::cout << "thread [" << boost::this_thread::get_id() << "] start" << std::endl;
	}

	io_service.run();

	{
    	boost::lock_guard<boost::mutex> lock(mutex);
    	std::cout << "thread [" << boost::this_thread::get_id() << "] finish" << std::endl;
	}
}
Great article!

You mentioned the fast delegates, is there a way to use them togheter with boost thread and the asio io_service? I've been trying to mess around with the last exapmle on the 4th page.. but none of the 3 fast delegate implementations seem to be able to bind like boost does so I'm not able to supply arguments when I post or dispatch jobs.. I'm able to use the fast delegates if I use the one that doesnt have arguments though..
Hi all,
is there a printable version of this guide?

Thanks
Little note. In example Example 2e you can make it more concise using std:ref. Then example will look like

void WorkerThread( boost::asio::io_service & io_service )
{
std::cout << "Thread Start\n";
io_service.run();
std::cout << "Thread Finish\n";
}
int main(int argc, char **argv)
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
std::cout << "Press [return] to exit." << std::endl;
boost::thread_group worker_threads;
for( int x = 0; x < 4; ++x )
{
  worker_threads.create_thread( boost::bind( &WorkerThread, std::ref(io_service) ) );
}
std::cin.get();
io_service.stop();
worker_threads.join_all();
return 0;
}
Also in sample 8a strand created but never used.
I was really enjoying this guide until you dropped that network wrapper on us as an abrupt end. Boost is best understood in small chunks because it's not readable in my opinion. The network wrapper doesn't even follow what you've been demonstrating up to this point. What the heck is a Hive? This is like a bad ending to a good anime.

It was the best post i've been found since I start studying boost.asio, considering I'm a newbie in this subject. I would like to congratulate you, my friend, and also thanks for the great post. I'm thinking about make a simpler version of this subject in portuguese, and I would like to know whether you let me show some samples from here - I'll include source, of course.

 

Thanks.

Okay, I will admit I am refreshing some rusty C++, some familiarity with boost and template development, but it's been a little while. That being said, not looking for the refresher course as has been mentioned in the early going. I do have a question, I am designing a "peripherals" network infrastructure, ultimately will go on an embedded device, but for now want to expose the peripherals of interest to a Windows host program.

 

Basically, we'll have one io_service (probably) per peripheral (could be two if we need for control and data (response) sockets to be different. That keeps the IO concerns neatly separated, if it doesn't get too busy in the Asio internals.

 

So... What is this concept of "work" and "strands" I am reading about? Writing is simple enough, but the asymetry of reading, blocking or polling, whether to go with asynchronous reading. I assume it's to parse through whatever response protocol we receive? Then do something with it, submit to an event broker or something like that, that a peripheral response has been received?

 

Need a little help grasping that magic if you will. Thanks!

I am a little confused by this one. What real work is being done here? In other words, so the thought crosses my mind, "so we bound the worker thread, and so we pass the io_service in as a paramter... so what?". In other words, what real work is being done here? Or when would it be appropriate to do so?

 

Taking a step back, there's really still the "simple" use case, right? Write some data, control, request, whatever, to a server. Read some data, control, response, whatever, from a client (or could be as a client to another server)?

 

Little note. In example Example 2e you can make it more concise using std:ref. Then example will look like
 

void WorkerThread( boost::asio::io_service & io_service )
{
std::cout << "Thread Start\n";
io_service.run();
std::cout << "Thread Finish\n";
}
int main(int argc, char **argv)
{
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
std::cout << "Press [return] to exit." << std::endl;
boost::thread_group worker_threads;
for( int x = 0; x < 4; ++x )
{
  worker_threads.create_thread( boost::bind( &WorkerThread, std::ref(io_service) ) );
}
std::cin.get();
io_service.stop();
worker_threads.join_all();
return 0;
}

Ah! Now I grok! Or I am starting to. The examples here are enlightening to me. So the coupling really has to do with setting the io_service up once, and apart from posting or dispatching as appropriate, the key is to run the io_service on the worker thread. That opens the whole thing up for seamless (hopefully) non-blocking operation.

On page 8, maybe I am missing something about boost::asio::ip::tcp::resolver::query? Example 7a has the host address as the web URL? Or IP address would work if we're connecting to a non-DHCP-mapped-address? Then a lexical cast from 80 (port?) to string? For what purpose? Is that what query is calling "host"? While the address is called "service"? I'm confused about that.
 

PARTNERS