A guide to getting started with boost::asio

Posted by Drew_Benton, 31 January 2011 · 173,884 views



4. Serializing our workload with strand

There will come a time when we will want to queue work to be done, but the order in which it is done is important. The strand class was created for such scenarios. The strand class "provides serialised handler execution." This means that if we post work1 -> work2 -> work3 through a strand, no matter how many worker threads we have, they will be executed in that order. Neat!
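As a quick sketch of that idea (just a minimal program in the same style as the examples coming up, not the example code itself), posting work1 -> work2 -> work3 through a strand running on a small worker-thread pool might look like this:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include <iostream>
#include <string>

void WorkerThread( boost::asio::io_service & io_service )
{
    io_service.run();
}

void DoWork( const std::string & name )
{
    // No lock around std::cout is needed only because every DoWork call
    // is serialized through the strand below.
    std::cout << "[" << boost::this_thread::get_id() << "] " << name << std::endl;
}

int main()
{
    boost::asio::io_service io_service;
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( io_service ) );
    boost::asio::io_service::strand strand( io_service );

    boost::thread_group worker_threads;
    for( int i = 0; i < 4; ++i )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, boost::ref( io_service ) ) );
    }

    // work1 -> work2 -> work3: always executed in this order, one at a time,
    // by whichever worker thread happens to be free.
    strand.post( boost::bind( &DoWork, "work1" ) );
    strand.post( boost::bind( &DoWork, "work2" ) );
    strand.post( boost::bind( &DoWork, "work3" ) );

    work.reset();
    worker_threads.join_all();
    return 0;
}

No matter which of the four threads ends up running each handler, the three lines always come out in the work1, work2, work3 order.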

With great power comes great responsibility though. We must understand the order of handler invocation for the strand class!

Order of handler invocation
Given:

  • a strand object s
  • an object a meeting completion handler requirements
  • an object a1 which is an arbitrary copy of a made by the implementation
  • an object b meeting completion handler requirements
  • an object b1 which is an arbitrary copy of b made by the implementation
if any of the following conditions are true:
  • s.post(a) happens-before s.post(b)
  • s.post(a) happens-before s.dispatch(b), where the latter is performed outside the strand
  • s.dispatch(a) happens-before s.post(b), where the former is performed outside the strand
  • s.dispatch(a) happens-before s.dispatch(b), where both are performed outside the strand
then asio_handler_invoke(a1, &a1) happens-before asio_handler_invoke(b1, &b1).

Note that in the following case:

async_op_1(..., s.wrap(a));
async_op_2(..., s.wrap(b));
the completion of the first async operation will perform s.dispatch(a), and the second will perform s.dispatch(b), but the order in which those are performed is unspecified. That is, you cannot state whether one happens-before the other. Therefore none of the above conditions are met and no ordering guarantee is made.
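To make that last case concrete, here is a small sketch (just an illustration, not one of this section's examples) using two deadline_timer objects: both completion handlers are wrapped through the same strand, so they never run at the same time, but nothing says which one runs first.

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>

#include <iostream>

// async_wait handlers receive an error_code; the extra int is bound in below.
void OnTimer( const boost::system::error_code & /*error*/, int id )
{
    std::cout << "timer " << id << " handler" << std::endl;
}

void WorkerThread( boost::asio::io_service & io_service )
{
    io_service.run();
}

int main()
{
    boost::asio::io_service io_service;
    boost::asio::io_service::strand strand( io_service );

    // Two timers that expire at (almost) the same moment.
    boost::asio::deadline_timer timer1( io_service, boost::posix_time::milliseconds( 100 ) );
    boost::asio::deadline_timer timer2( io_service, boost::posix_time::milliseconds( 100 ) );

    // Both handlers go through the same strand: serialized, but in no particular order.
    timer1.async_wait( strand.wrap( boost::bind( &OnTimer, _1, 1 ) ) );
    timer2.async_wait( strand.wrap( boost::bind( &OnTimer, _1, 2 ) ) );

    boost::thread_group worker_threads;
    for( int i = 0; i < 2; ++i )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, boost::ref( io_service ) ) );
    }

    worker_threads.join_all();
    return 0;
}

Run it a few times and the two lines can come out in either order, even though they never interleave.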


It is absolutely imperative that we understand these conditions when using the strand class. If we do not, we can code a solution that has undefined behavior that might work most of the time, but every once in a while, it breaks down and it is extremely hard to figure out why! I have done this myself and learned quite a lot from it as a result.

Now we can consider an example where we do not use strand. We will remove the output locks on the std::cout object.

Example 4a
Spoiler
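(The full source for Example 4a is in the spoiler above; what follows is only a rough sketch of the same idea, assuming the shared io_service, work object, and worker-thread pool setup from the earlier sections, with the strand.post calls left commented out for the next step.)

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include <iostream>

void WorkerThread( boost::asio::io_service & io_service )
{
    io_service.run();
}

void PrintNum( int x )
{
    // Deliberately no lock around std::cout - that is the point of this example.
    std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl;
}

int main()
{
    boost::asio::io_service io_service;
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( io_service ) );
    boost::asio::io_service::strand strand( io_service );

    boost::thread_group worker_threads;
    for( int i = 0; i < 4; ++i )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, boost::ref( io_service ) ) );
    }

    // Posted straight to the io_service: any worker thread may run any handler,
    // and several can run at the same time, so the output gets mixed together.
    io_service.post( boost::bind( &PrintNum, 1 ) );
    io_service.post( boost::bind( &PrintNum, 2 ) );
    io_service.post( boost::bind( &PrintNum, 3 ) );
    io_service.post( boost::bind( &PrintNum, 4 ) );

    // For the strand version, comment out the posts above and uncomment these:
    //strand.post( boost::bind( &PrintNum, 1 ) );
    //strand.post( boost::bind( &PrintNum, 2 ) );
    //strand.post( boost::bind( &PrintNum, 3 ) );
    //strand.post( boost::bind( &PrintNum, 4 ) );

    work.reset();
    worker_threads.join_all();
    return 0;
}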


The output on my PC was as follows:
Spoiler


This is pretty much expected. Since we no longer lock the std::cout object and have multiple threads writing to it, the final output gets jumbled together. Depending on how many worker threads and how many CPU cores we have, the output might look a little different and might even come out looking correct! Conceptually though, we know that correct-looking output does not mean anything here, since we are not properly synchronizing access to a shared global object!

Now, let us check out the next example: simply comment out all of the io_service->post calls and uncomment the strand.post calls. Here is one output of the strand program.

Spoiler


No matter how many times we run the program, we should see clean output each time for the x values. This is because the strand object is correctly serializing the event processing so that only one handler runs at a time. It is also very important to notice that the strand does not serialize work through only one thread. If we check the previous output once again, more than one thread was used. So work will still execute serially, but it will execute through whichever worker thread is available at the time. We cannot program with the incorrect assumption that the same thread will process all of the work! If we do, we will have bugs that will come back to bite us.

As mentioned before, in the past I had used strand the wrong way without realizing it, and it caused all sorts of hard-to-find problems. Let us now take a look at an example that is syntactically correct but logically incorrect with respect to our expectations.

Example 4b
Spoiler
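(Again, the full source is in the spoiler above; as a rough sketch, the important part is that the handlers are wrapped through the strand but posted through the io_service, along these lines.)

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include <iostream>

void WorkerThread( boost::asio::io_service & io_service )
{
    io_service.run();
}

void PrintNum( int x )
{
    std::cout << "[" << boost::this_thread::get_id() << "] x: " << x << std::endl;
}

int main()
{
    boost::asio::io_service io_service;
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( io_service ) );
    boost::asio::io_service::strand strand( io_service );

    boost::thread_group worker_threads;
    for( int i = 0; i < 4; ++i )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, boost::ref( io_service ) ) );
    }

    // Serialized through the strand (so the output stays clean), but posted
    // through the io_service, which gives no guarantee about the order in
    // which the wrapped handlers are dispatched.
    io_service.post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );
    io_service.post( strand.wrap( boost::bind( &PrintNum, 2 ) ) );
    io_service.post( strand.wrap( boost::bind( &PrintNum, 3 ) ) );
    io_service.post( strand.wrap( boost::bind( &PrintNum, 4 ) ) );
    io_service.post( strand.wrap( boost::bind( &PrintNum, 5 ) ) );
    io_service.post( strand.wrap( boost::bind( &PrintNum, 6 ) ) );

    work.reset();
    worker_threads.join_all();
    return 0;
}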


If we run this program quite a few times, we should see the expected 1, 2, 3, 4, 5, 6 output. However, every so often, we might see 2, 1, 3, 4, 5, 6 or some other variation where the events are switched. Sometimes we have to run it a lot to get this to happen, while other times it might happen more frequently. The output remains clean, but the order is just not as expected. This is because the work we are passing is guaranteed to be executed serially, but there is no guarantee about the order in which the work actually takes place, as a result of the API functions we are using!

So if order is important, we have to go through the strand object API itself. If order is not important, then we can post across the io_service object and wrap our handler through the strand. It might seem obvious now, but if we were just getting started with this stuff on our own, it would be easy to misunderstand these basic concepts. The type of work we are posting will ultimately determine which interface we want to use as both are really useful. We will see more examples of the strand wrap member function being used in the future.

That pretty much covers the strand object. It is very powerful as it allows us to have synchronization without explicit locking. This is absolutely a must have feature when working with multi-threaded systems and maintaining efficiency across the board.

We almost have enough core concepts covered to move on to the networking aspect of boost::asio. The library is huge, with a ton of awesome features!











I have not read it all but it looks to be a great help. I was just starting to look into boost::asio for networking in my current project, and this will give me a good start. Thanks!
Thank you for that article! I'm already using boost::asio (because it's awesome) but I wasn't aware of the "unordered vs ordered" issue regarding work being serialized through a strand. Please correct me if I'm wrong but as far as I understand it I can ensure ordering in example 4b when I change those lines...
io_service->post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );
...like this...
strand.post( boost::bind( &PrintNum, 1 ) );
..., right?!



Correct! You have to post through the strand itself to ensure explicit ordering, whereas if you just wrap, you only ensure serialization through that strand (with no guarantees about the actual order).

I too was not really aware of that issue until it popped up in one of my programs and boy was it a pain to track down. I mean the docs do explain this clearly, but it was something I just completely misinterpreted so I am making a point to everyone to be careful about it and closely re-read the docs!

Note that in the following case:

async_op_1(..., s.wrap(a));
async_op_2(..., s.wrap(b));
the completion of the first async operation will perform s.dispatch(a), and the second will perform s.dispatch(b), but the order in which those are performed is unspecified. That is, you cannot state whether one happens-before the other. Therefore none of the above conditions are met and no ordering guarantee is made.




In example 4b, async_op_1 would be "io_service->post( strand.wrap( boost::bind( &PrintNum, 1 ) ) );" and async_op_2 would be "io_service->post( strand.wrap( boost::bind( &PrintNum, 2 ) ) );". Hindsight is 20/20, but it's only after you make the mistake or someone brings it up that it really stands out, assuming you didn't understand it before (I'm sure a lot of the more veteran programmers and docs readers wouldn't make that mistake, but we are all human after all :)).
Hi !! Great article, very informative :)

I've previously written an IOCP networking engine using the Windows IOCP API which implemented per-connection protocol handling via a pluggable abstract eventsink class. Your implementation reminds me a lot of my IOCP framework, with the exception of the event dispatching...

I'm a little confused about the final example implementations, all of a sudden the worker thread pool is gone?
I'd like to see the next exciting episode implementing thread-pooling - am I correct to assume that would belong in the Hive class, or a derived MyHive ?? If you could clarify this, I would be grateful.

I'd also like to see a wrapper class which implements both Client and Server functionality under one hood, I can think of several situations where it is desirable for a server application to make outbound connections. A simple socksv4 proxy server would make an excellent example implementation ;)
I think I see a small problem:

When I examine the debug output, it states that 18 bytes were sent, but only 12 were echoed back - despite the hex output showing the correct amount of 18 bytes in both cases.
When I comment out the 'hex output' code in the OnSend and OnRecv handlers, both sides of the connection correctly report that 18 bytes were echoed.
This is despite the client and server being executed in separate applications, and despite the global lock on debug output, so it seems not to be a threading issue.

What could be causing the buffer vector's length to be manipulated in this way?
Yeah, in MyConnection.OnRecv and OnSend, I replaced this:

std::cout << "[" << __FUNCTION__ << "] " << buffer.size() << " bytes" << std::endl;

with the following:

char blah[50];
sprintf_s(blah, "%d bytes\n", buffer.size());
std::cout << "[" << __FUNCTION__ << "] " << blah;

and now it always correctly reports the 'packet size'.

I still don't understand why it was displaying incorrectly, any ideas?

I'm a little confused about the final example implementations, all of a sudden the worker thread pool is gone?


Yes, simply because a client application such as the one shown in 9b does not really need one. ;) In that case, I wanted a simple example that could be exited with a keypress (sorry, Windows-only example!) and did not use the thread pool because it was not needed. Sorry I didn't clarify this, but it was just personal preference for that example.

I'd like to see the next exciting episode implementing thread-pooling - am I correct to assume that would belong in the Hive class, or a derived MyHive ?? If you could clarify this, I would be grateful.


There's not much more to expand with the thread stuff. The custom Hive class would be for extending the object with your own methods as needed so it's all wrapped up into one object. You can then use boost::dynamic_pointer_cast to change the shared_ptr base type into the derived type.
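As a rough illustration of that last point, with a stand-in Hive base class here (the real Hive lives in the network wrapper code) and a hypothetical MyHive derived from it, the cast looks like this:

#include <boost/shared_ptr.hpp>

#include <iostream>

// Stand-in for the wrapper's Hive class, only to illustrate the cast.
class Hive
{
public:
    virtual ~Hive() {}
};

// Hypothetical user class that extends Hive with its own methods.
class MyHive : public Hive
{
public:
    void MyExtraMethod() { std::cout << "custom behaviour" << std::endl; }
};

int main()
{
    // Code that only knows about the base type stores a shared_ptr< Hive >.
    boost::shared_ptr< Hive > hive( new MyHive() );

    // Downcast to reach the derived interface.
    boost::shared_ptr< MyHive > my_hive = boost::dynamic_pointer_cast< MyHive >( hive );
    if( my_hive )
    {
        my_hive->MyExtraMethod();
    }
    return 0;
}

The cast returns an empty shared_ptr if the object is not actually a MyHive, so the if check is worth keeping.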

I'd also like to see a wrapper class which implements both Client and Server functionality under one hood, I can think of several situations where it is desirable for a server application to make outbound connections. A simple socksv4 proxy server would make an excellent example implementation ;)


More examples are on the way with some fixes to the network wrapper code. I am actively using the code in quite a few different setups, so I'll cover all the practical bases. The main use of my code right now is actually similar to what you are requesting, and that is a proxy. Needless to say, I feel it works very nicely overall in practice.
Nice stuff :). Although I already knew a lot of the stuff discussed here, I think it is a nice guide for people who are just starting. Nicely done :)
I would love to see a nicely written (i.e. latexified) PDF version of this article. It's really great.
First of all, I really like this tutorial. It discusses the basics of boost::asio very nicely.
However, there are several things I would like to change in your code. There are other approaches that allow us to create more readable and more performant code. I've created an example and documented most of the changes I made:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>

#include <iostream>


void worker_thread(boost::asio::io_service&, boost::mutex&);


int main()
{
	boost::mutex              	mutex;
	boost::asio::io_service   	io_service;
	boost::asio::io_service::work work(io_service);

	// Please note: You don't need to lock access to `std::cout` here, because no other thread is
	//              concurrently trying to modify the object.
	std::cout << "thread [" << boost::this_thread::get_id() << "] press [any key] to exit." << std::endl;

	boost::thread_group worker_threads;
	for (int i = 0; i < 4; ++i)
	{
    	// Please note: You can pass references to the worker_thread() function even if you want to use it
    	//              with a function object created by boost::bind(). The only thing you need to do is
    	//              wrap your object in a reference_wrapper by using boost::ref or boost::cref.
    	//              However, you no longer need to manage any global objects and don't have to worry
    	//              about copy constructibility.
    	worker_threads.create_thread(
        	boost::bind(worker_thread, boost::ref(io_service), boost::ref(mutex)));
	}

	// Now there are several threads running in the background and we would need to guard the access to
	// any global object used in our worker_thread() function.

	std::cin.get();

	io_service.stop();

	worker_threads.join_all();

	return 0;
}


void worker_thread(boost::asio::io_service& io_service, boost::mutex& mutex)
{
	{
    	// We can limit the scope of our lock_guard. So, it's much easier to see which code locations
    	// are guarded.
    	boost::lock_guard<boost::mutex> lock(mutex);
    	std::cout << "thread [" << boost::this_thread::get_id() << "] start" << std::endl;
	}

	io_service.run();

	{
    	boost::lock_guard<boost::mutex> lock(mutex);
    	std::cout << "thread [" << boost::this_thread::get_id() << "] finish" << std::endl;
	}
}
Great article!

You mentioned the fast delegates; is there a way to use them together with boost thread and the asio io_service? I've been trying to mess around with the last example on the 4th page, but none of the 3 fast delegate implementations seem to be able to bind like boost does, so I'm not able to supply arguments when I post or dispatch jobs. I'm able to use the fast delegates if I use the one that doesn't have arguments, though.
Hi all,
is there a printable version of this guide?

Thanks
Little note. In Example 2e you can make it more concise using std::ref. Then the example will look like:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

#include <functional>
#include <iostream>

void WorkerThread( boost::asio::io_service & io_service )
{
    std::cout << "Thread Start\n";
    io_service.run();
    std::cout << "Thread Finish\n";
}

int main( int argc, char ** argv )
{
    boost::asio::io_service io_service;
    boost::asio::io_service::work work( io_service );

    std::cout << "Press [return] to exit." << std::endl;

    boost::thread_group worker_threads;
    for( int x = 0; x < 4; ++x )
    {
        // std::ref wraps the io_service in a reference_wrapper so boost::bind
        // can pass it by reference.
        worker_threads.create_thread( boost::bind( &WorkerThread, std::ref( io_service ) ) );
    }

    std::cin.get();

    io_service.stop();

    worker_threads.join_all();

    return 0;
}
Also, in sample 8a a strand is created but never used.
I was really enjoying this guide until you dropped that network wrapper on us as an abrupt end. Boost is best understood in small chunks because it's not readable in my opinion. The network wrapper doesn't even follow what you've been demonstrating up to this point. What the heck is a Hive? This is like a bad ending to a good anime.

It was the best post I've found since I started studying boost.asio, considering I'm a newbie in this subject. I would like to congratulate you, my friend, and also thank you for the great post. I'm thinking about making a simpler version of this subject in Portuguese, and I would like to know whether you would let me show some samples from here - I'll include the source, of course.

 

Thanks.

Okay, I will admit I am refreshing some rusty C++; I have some familiarity with boost and template development, but it's been a little while. That being said, I'm not looking for the refresher course, as has been mentioned in the early going. I do have a question: I am designing a "peripherals" network infrastructure that will ultimately go on an embedded device, but for now I want to expose the peripherals of interest to a Windows host program.

 

Basically, we'll have one io_service (probably) per peripheral (could be two if we need the control and data (response) sockets to be different). That keeps the IO concerns neatly separated, if it doesn't get too busy in the Asio internals.

 

So... what is this concept of "work" and "strands" I am reading about? Writing is simple enough, but reading is less symmetric: blocking or polling, or whether to go with asynchronous reading. I assume it's to parse through whatever response protocol we receive? Then do something with it, like submit it to an event broker or something like that to signal that a peripheral response has been received?

 

Need a little help grasping that magic if you will. Thanks!

I am a little confused by this one. What real work is being done here? The thought crosses my mind: "so we bound the worker thread, and we pass the io_service in as a parameter... so what?" In other words, what real work is being done here? Or when would it be appropriate to do so?

 

Taking a step back, there's really still the "simple" use case, right? Write some data, control, request, whatever, to a server. Read some data, control, response, whatever, from a client (or could be as a client to another server)?

 

Little note. In Example 2e you can make it more concise using std::ref.

Ah! Now I grok! Or I am starting to. The examples here are enlightening to me. So the coupling really has to do with setting the io_service up once, and apart from posting or dispatching as appropriate, the key is to run the io_service on the worker thread. That opens the whole thing up for seamless (hopefully) non-blocking operation.

On page 8, maybe I am missing something about boost::asio::ip::tcp::resolver::query? Example 7a has the host address as the web URL? Or IP address would work if we're connecting to a non-DHCP-mapped-address? Then a lexical cast from 80 (port?) to string? For what purpose? Is that what query is calling "host"? While the address is called "service"? I'm confused about that.
 
