_moagstar_

boost (asio, iostreams, thread) and strange memory leaks

I am having difficulty tracking down the source of a memory leak, and was wondering if anyone has any insight into why it is happening. First, a bit of background on the class I am writing.

I am using boost.asio for network code, and in particular the socket_stream class. The operations on this class are blocking, which is not a problem in itself. However, I am also using boost.thread, and when a thread is interrupted I want all of its blocking operations to be interrupted too. I couldn't see a way to do this using the asio library, so I decided to implement a class similar to socket_stream that uses asynchronous I/O under the hood and provides interruption points in the send and receive functions. To client code it just looks like a normal blocking send or receive. I am using the boost.iostreams library to make this task a bit easier for myself.

However, in the test that I am running, I sporadically get memory leaks:

1>Detected memory leaks!
1>Dumping objects ->
1>{275} normal block at 0x003DB3D0, 16 bytes long.
1> Data: < = = > D0 91 3D 00 D0 91 3D 00 00 00 00 00 00 00 00 00
1>{263} normal block at 0x003D91D0, 16 bytes long.
1> Data: < = = > D0 B3 3D 00 D0 B3 3D 00 CD CD CD CD CD CD CD CD
1>{262} normal block at 0x003D9190, 16 bytes long.
1> Data: < = = > 90 91 3D 00 90 91 3D 00 CD CD CD CD CD CD CD CD
1>{261} normal block at 0x003D9150, 16 bytes long.
1> Data: <P = P = > 50 91 3D 00 50 91 3D 00 CD CD CD CD CD CD CD CD
1>{260} normal block at 0x003D9110, 16 bytes long.
1> Data: < = = > 10 91 3D 00 10 91 3D 00 CD CD CD CD CD CD CD CD
1>{259} normal block at 0x01410068, 73808 bytes long.
1> Data: < u = `$| > DC E2 75 00 90 B4 3D 00 60 24 7C 00 00 00 00 00
1>{222} normal block at 0x003DB840, 24 bytes long.
1> Data: < u = | > 08 DF 75 00 90 B4 3D 00 08 1F 7C 00 00 00 00 00
1>{220} normal block at 0x003DB6F0, 56 bytes long.
1> Data: <l u = "| > 6C E1 75 00 90 B4 3D 00 E8 22 7C 00 00 00 00 00
1>{219} normal block at 0x003DB6A8, 24 bytes long.
1> Data: < u = | > F8 DE 75 00 90 B4 3D 00 90 1F 7C 00 00 00 00 00
1>{217} normal block at 0x003DB520, 112 bytes long.
1> Data: < u = | > 90 CB 75 00 90 B4 3D 00 00 1B 7C 00 00 00 00 00
1>{216} normal block at 0x003DB4D0, 32 bytes long.
1> Data: < > 98 A4 15 00 FF FF FF FF 00 00 00 00 00 00 00 00
1>{215} normal block at 0x003DB490, 16 bytes long.
1> Data: < 3= 4= = = > D0 33 3D 00 00 34 3D 00 D0 B4 3D 00 20 B5 3D 00
1>{206} normal block at 0x003DA068, 4100 bytes long.
1> Data: < > CD CD CD CD CD CD CD CD CD CD CD CD CD CD CD CD
1>{97} normal block at 0x003D3400, 16 bytes long.
1> Data: < u 3= > 18 DF 75 00 01 00 00 00 01 00 00 00 D0 33 3D 00
1>{96} normal block at 0x003D33D0, 4 bytes long.
1> Data: < > 00 00 00 00
1>Object dump complete.

Does anyone have any idea what this problem could be? I can't see anything obviously wrong with the code, but I have been looking at this problem for a while now and can't make sense of these leaks, so there is more than likely a blatantly obvious bug that I am missing. Any help would be greatly appreciated, thanks in advance. Here is the code:

// SocketStream.h
class SocketDevice
{
public:

    typedef char char_type;

    typedef boost::iostreams::seekable_device_tag category;
    typedef boost::iostreams::stream_offset StreamOffset;

    typedef boost::asio::ip::tcp::endpoint Endpoint;
    typedef boost::asio::io_service IoService;

    SocketDevice();
    SocketDevice(const SocketDevice& source);
    SocketDevice& operator = (const SocketDevice& source);    

    void connect(const Endpoint& endpoint);
    void accept(const Endpoint& endpoint);

    std::streamsize read(char_type* data, std::streamsize count);
    std::streamsize write(const char* data, std::streamsize count);   
    StreamOffset seek(StreamOffset offset, std::ios_base::seekdir direction);

private:

    void sleep();

    void asyncAcceptHandler(const boost::system::error_code& error);
    void asyncConnectHandler(const boost::system::error_code& error);
    void asyncReceiveHandler(const boost::system::error_code& error, std::size_t bytes_transferred);
    void asyncSendHandler(const boost::system::error_code& error, std::size_t bytes_transferred);

    typedef boost::asio::ip::tcp::socket Socket;
    typedef boost::asio::ip::tcp::acceptor Acceptor;

    boost::scoped_ptr<IoService>    m_service;
    boost::scoped_ptr<Socket>       m_socket;
    boost::scoped_ptr<Acceptor>     m_acceptor;

    bool                            m_connected;
    std::streamsize                 m_bytesSent;
    std::streamsize                 m_bytesReceived;

};

typedef boost::iostreams::stream<SocketDevice> SocketStream;


// SocketStream.cpp
SocketDevice::SocketDevice() :
    m_service(new IoService),
    m_socket(new Socket(*m_service)),
    m_acceptor(new Acceptor(*m_service)),
    m_connected(false),
    m_bytesSent(0),
    m_bytesReceived(0)
{    
}

SocketDevice::SocketDevice(
    const SocketDevice& source    ///< The source device to copy from
    ) :
    m_service(new IoService),
    m_socket(new Socket(*m_service)),
    m_acceptor(new Acceptor(*m_service)),
    m_connected(false),
    m_bytesSent(0),
    m_bytesReceived(0)
{
}

SocketDevice& SocketDevice::operator = (const SocketDevice& source)
{
    return *this;
}

void SocketDevice::sleep()
{
    boost::xtime xt;
    boost::xtime_get(&xt, boost::TIME_UTC);
    xt.nsec += 1000;
    boost::thread::sleep(xt);
}

void SocketDevice::connect(const Endpoint& endpoint)
{
    m_socket->async_connect(endpoint, 
        boost::bind(&SocketDevice::asyncConnectHandler, this, boost::asio::placeholders::error));

    while (!m_connected)
    {
        m_service->poll();

        // this sleep ensures an interruption point is defined so that a thread
        // performing an asynchronous connect can never be blocked when an
        // interrupt is attempted.
        sleep();
    }
}

void SocketDevice::accept(const Endpoint& endpoint)
{
    m_acceptor->async_accept(*m_socket, const_cast<Endpoint&>(endpoint),
        boost::bind(&SocketDevice::asyncAcceptHandler, this, boost::asio::placeholders::error));

    while (!m_connected)
    {
        m_service->poll();

        // this sleep ensures an interruption point is defined so that a thread
        // performing an asynchronous accept can never be blocked when an
        // interrupt is attempted.
        sleep();
    }
}

std::streamsize SocketDevice::read(char_type* data, std::streamsize count)
{
    using namespace boost::asio::placeholders;

    boost::asio::async_read(*m_socket, boost::asio::buffer(data, count), 
        boost::bind(&SocketDevice::asyncReceiveHandler, this, error, bytes_transferred));

    m_bytesReceived = 0;

    while (m_bytesReceived < count)
    {
        m_service->poll();

        // this sleep ensures an interruption point is defined so that a thread
        // performing an asynchronous receive can never be blocked when an
        // interrupt is attempted.
        sleep();
    }

    return count;
}

std::streamsize SocketDevice::write(const char* data, std::streamsize count)
{
    using namespace boost::asio::placeholders;

    m_bytesSent = 0;

    boost::asio::async_write(*m_socket, boost::asio::buffer(data, count), 
        boost::bind(&SocketDevice::asyncSendHandler, this, error, bytes_transferred));

    while (m_bytesSent < count)
    {
        m_service->poll();

        // this sleep ensures an interruption point is defined so that a thread
        // performing an asynchronous send can never be blocked when an
        // interrupt is attempted.
        sleep();
    }

    return count;
}

SocketDevice::StreamOffset SocketDevice::seek(StreamOffset offset, std::ios_base::seekdir direction)
{
    ASSERT(0);    // unsupported for socket streams
    return 0;
}

void SocketDevice::asyncConnectHandler(const boost::system::error_code& error)
{
    m_connected = true;
}

void SocketDevice::asyncAcceptHandler(const boost::system::error_code& error)
{
    m_connected = true;
}

void SocketDevice::asyncReceiveHandler(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    m_bytesReceived += bytes_transferred;
}

void SocketDevice::asyncSendHandler(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    m_bytesSent += bytes_transferred;
}

// SocketStreamTest.cpp
void server()
{
    using namespace boost::asio::ip;

    SocketStream            stream;
    SocketDevice::Endpoint  endpoint(tcp::v4(), 12345);

    stream.open(SocketDevice());
    stream->accept(endpoint);

    Sleep(2000);
}

void client()
{
    using namespace boost::asio::ip;

    SocketStream            stream;
    SocketDevice::Endpoint  endpoint(address::from_string("127.0.0.1"), 12345);

    stream.open(SocketDevice());
    stream->connect(endpoint);

    // perform an operation that will block so that the interruption
    // of a blocking operation can be tested.

    std::string str;
    stream >> str;
}

BOOST_AUTO_TEST_CASE(TestBlockingInterruption)
{
    boost::thread clientThread(client);
    boost::thread serverThread(server);

    Sleep(5000);

    serverThread.interrupt();
    clientThread.interrupt();
}
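
The connect, accept, read and write loops above all follow the same shape: poll until some condition holds, and give an interrupt a chance to land on every pass. A minimal sketch of that shape with Boost removed, so it stands alone, might look like the following. The names `WaitResult` and `pollUntil` are hypothetical, and the explicit `interrupted` predicate stands in for the interruption point that the posted code gets implicitly from `boost::thread::sleep`.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Outcome of a polled wait. Illustrative names, not part of Boost.
enum class WaitResult { Completed, Interrupted };

// Polls `done` until it returns true, checking `interrupted` on every pass so
// that a call which looks blocking to the caller can still be abandoned.
template <class DonePredicate, class InterruptPredicate>
WaitResult pollUntil(DonePredicate done,
                     InterruptPredicate interrupted,
                     std::chrono::microseconds pause)
{
    assert(pause.count() >= 0);
    while (!done())
    {
        if (interrupted())
            return WaitResult::Interrupted;
        std::this_thread::sleep_for(pause);  // yield rather than spin
    }
    return WaitResult::Completed;
}
```

In the posted class, `done` would correspond to checks such as `m_connected` or `m_bytesReceived >= count`. Note that this sketch reports the interruption as a return value, whereas a Boost interruption point throws `thread_interrupted`.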

Actually, I have found a way of making the asio stream socket non-blocking so that when the thread is interrupted it doesn't hang:


boost::asio::socket_base::non_blocking_io command(true);
m_stream.rdbuf()->io_control(command);


So I don't have to implement my own socket class, phew. However I'm still curious as to the cause of the leaks.

The leaks are reported at program shutdown. I assume that the boost.test library is setting the _CRTDBG_LEAK_CHECK_DF flag, because I'm not doing it myself. The leaks only occur if I interrupt the thread, which suggests that some memory isn't being freed when the thread_interrupted exception is thrown. I was hoping that the code I am calling was exception safe; perhaps I should take another look at the exception safety guarantees of the iostreams and asio library functions I'm using.
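
To make that suspicion concrete, here is a small sketch of how an exception thrown at an interruption point can skip manual cleanup while RAII cleanup still runs. It says nothing about whether asio or iostreams actually behave this way. `Interrupted` is a hypothetical stand-in for `boost::thread_interrupted`, and a plain counter stands in for allocated memory.

```cpp
#include <cassert>

// Stand-in for boost::thread_interrupted (hypothetical, for illustration).
struct Interrupted {};

// Throws when an interrupt has been requested, like an interruption point.
inline void interruptionPoint(bool interruptRequested)
{
    if (interruptRequested)
        throw Interrupted();
}

// Counts a resource as open for exactly the guard's lifetime.
class OpenGuard
{
public:
    explicit OpenGuard(int& openCount) : m_openCount(openCount) { ++m_openCount; }
    ~OpenGuard() { assert(m_openCount > 0); --m_openCount; }
    OpenGuard(const OpenGuard&) = delete;
    OpenGuard& operator=(const OpenGuard&) = delete;
private:
    int& m_openCount;
};

// Manual cleanup: the decrement is skipped if the interruption point throws.
inline void manualCleanup(int& openCount, bool interruptRequested)
{
    ++openCount;
    interruptionPoint(interruptRequested);
    --openCount;
}

// RAII cleanup: stack unwinding runs the guard's destructor.
inline void raiiCleanup(int& openCount, bool interruptRequested)
{
    OpenGuard guard(openCount);
    interruptionPoint(interruptRequested);
}

// Runs `work` with an interrupt pending and returns how many resources stay open.
template <class Work>
int openAfterInterrupt(Work work)
{
    int openCount = 0;
    try { work(openCount, true); }
    catch (const Interrupted&) {}
    return openCount;
}
```

With an interrupt pending, `openAfterInterrupt(manualCleanup)` leaves one resource open and `openAfterInterrupt(raiiCleanup)` leaves none.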

Because it's at shutdown, there is a chance that your leak detector is reporting these blocks BEFORE that memory is actually freed, in which case I wouldn't worry about it.
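
As a rough illustration of that ordering effect, the sketch below simulates a report taken while a long-lived object is still alive, then again after its destructor has run. `LeakTracker`, `TrackedBlock` and `simulateShutdown` are hypothetical names. This is a simulation with ordinary scopes, not the real CRT debug heap or real static destruction.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a debug heap that counts live blocks.
class LeakTracker
{
public:
    void onAlloc() { ++m_live; }
    void onFree() { assert(m_live > 0); --m_live; }
    std::size_t liveBlocks() const { return m_live; }
private:
    std::size_t m_live = 0;
};

// A block registered with the tracker for as long as it exists.
class TrackedBlock
{
public:
    explicit TrackedBlock(LeakTracker& tracker) : m_tracker(tracker) { m_tracker.onAlloc(); }
    ~TrackedBlock() { m_tracker.onFree(); }
    TrackedBlock(const TrackedBlock&) = delete;
    TrackedBlock& operator=(const TrackedBlock&) = delete;
private:
    LeakTracker& m_tracker;
};

struct ShutdownReport
{
    std::size_t beforeCleanup;  // what an early leak report would print
    std::size_t afterCleanup;   // what is really left once destructors ran
};

// Takes one reading while a long-lived block is still alive and one after.
inline ShutdownReport simulateShutdown()
{
    LeakTracker tracker;
    ShutdownReport report{};
    {
        TrackedBlock longLived(tracker);  // e.g. something owned by a library
        report.beforeCleanup = tracker.liveBlocks();
    }
    report.afterCleanup = tracker.liveBlocks();
    return report;
}
```

The early reading counts one live block even though nothing is left afterwards, which is the kind of false positive described above.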

Quote:
SocketDevice::SocketDevice(
    const SocketDevice& source    ///< The source device to copy from
    ) :
    m_service(new IoService),
    m_socket(new Socket(*m_service)),
    m_acceptor(new Acceptor(*m_service)),
    m_connected(false),
    m_bytesSent(0),
    m_bytesReceived(0)
{
}

SocketDevice& SocketDevice::operator = (const SocketDevice& source)
{
    return *this;
}


These are not implemented correctly. Since you are using only managed pointers, there is no need to implement either a copy constructor or an assignment operator. If you do, they need to be implemented to behave as expected.

A copy operation like this doesn't make any sense either. You need one instance of io_service. You also only need a single acceptor per server socket. Copying sockets by recreating them isn't possible, since you can't bind them to the same endpoint, and if you reconnect them, then it's not a copy operation.
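
A stripped-down sketch of why that copy constructor surprises callers is given below. `FreshCopyDevice` is a hypothetical class that keeps only the `m_connected` flag from the posted code. Like the quoted constructor, it ignores its source and starts from scratch.

```cpp
#include <cassert>

// Hypothetical reduction of the posted class to a single piece of state.
class FreshCopyDevice
{
public:
    FreshCopyDevice() : m_connected(false) {}

    // Mirrors the quoted copy constructor: `source` is ignored entirely.
    FreshCopyDevice(const FreshCopyDevice& /*source*/) : m_connected(false) {}

    void connect() { assert(!m_connected); m_connected = true; }
    bool connected() const { return m_connected; }

private:
    bool m_connected;
};
```

After `original.connect()`, a `FreshCopyDevice copy(original)` still reports `connected() == false`, so the "copy" is really a new, unconnected object.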

Quote:
Actually, I have found a way of making the asio stream socket non-blocking so that when the thread is interrupted it doesn't hang:

You are already using asynchronous calls; the blocking happens because you block in your own code. Look at the tutorials to see how to multiplex calls using io_service.run().

Quote:
I couldn't see a way to do this using the asio library

The documentation explains that closing the socket does that.

[Edited by - Antheus on June 2, 2009 6:32:47 AM]

I derive all my classes from boost::noncopyable and only remove the restriction if there's a very good reason why an instance should be copyable. IMO this helps to avoid a whole bunch of subtle errors like the one Antheus has pointed out.
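
For readers without Boost, roughly the same effect can be had in C++11 by deleting the copy operations. This is a sketch of that alternative, not of `boost::noncopyable` itself, and `Connection` is a hypothetical class name.

```cpp
#include <cassert>
#include <type_traits>

// A class that cannot be copied by accident.
class Connection
{
public:
    explicit Connection(int id) : m_id(id) { assert(id >= 0); }

    // Deleting both operations turns any accidental copy into a compile error.
    Connection(const Connection&) = delete;
    Connection& operator=(const Connection&) = delete;

    int id() const { return m_id; }

private:
    int m_id;
};

// Compile-time confirmation that the restriction is in place.
static_assert(!std::is_copy_constructible<Connection>::value,
              "Connection must not be copy constructible");
static_assert(!std::is_copy_assignable<Connection>::value,
              "Connection must not be copy assignable");
```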

Quote:
Original post by Red Ant
I derive all my classes from boost::noncopyable and only remove the restriction if there's a very good reason why an instance should be copyable. IMO this helps to avoid a whole bunch of subtle errors like the one Antheus has pointed out.


Me too, but in this case the SocketDevice class must be copy constructible in order to be used with boost.iostreams. I'm no longer quite sure why I implemented the assignment operator (it was late on Friday when I first tried this), but it's not necessary.

Quote:
Original post by Antheus
These are not implemented correctly. Since you are using only managed pointers, there is no need to implement either a copy constructor or an assignment operator. If you do, they need to be implemented to behave as expected.

A copy operation like this doesn't make any sense either


I had kind of realised that these were flawed implementations and, like you say, it doesn't really make sense to copy these objects; I just needed to provide a copy constructor so that the class could be used with the iostreams library. I was getting round this by only opening the socket once the device had been copied into the stream object. That is not a particularly nice caveat to using this class, so I was hoping to find a solution that meant I could avoid having to write this class altogether.
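
One common way to satisfy a copy-constructible requirement without recreating the underlying resource is to let every copy share a single state object. The sketch below is only an assumption about how that could look here, not something confirmed in this thread. `SharedStateDevice` and its nested `State` are hypothetical, with a bool standing in for the io_service, socket and acceptor.

```cpp
#include <cassert>
#include <memory>

// Hypothetical device whose copies all refer to one shared state object.
class SharedStateDevice
{
private:
    struct State
    {
        bool connected = false;  // stand-in for the real socket state
    };

public:
    SharedStateDevice() : m_state(std::make_shared<State>()) {}

    // The compiler-generated copy operations copy the shared_ptr, so a copy
    // made by a stream wrapper still talks to the same State.
    void connect() { assert(m_state); m_state->connected = true; }
    bool connected() const { return m_state->connected; }
    long owners() const { return m_state.use_count(); }

private:
    std::shared_ptr<State> m_state;
};
```

With this layout a device could be connected before being handed to the stream, because the stream's internal copy would see the same state. The trade-off is that the shared state lives until the last copy is destroyed.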

Quote:


You are already using asynchronous calls; the blocking happens because you block in your own code. Look at the tutorials to see how to multiplex calls using io_service.run().

The documentation explains that closing the socket does that.



I'd rather not do this, because close would have to be called on a separate thread, which means adding synchronisation everywhere. I'm not multiplexing through the service; that is undesirable in my situation. I want the call to block, just not hang when the exception is thrown.

I made a typo in my original post too: it is the basic_socket_iostream class that I am using, and <<, >>, read and write all use the synchronous functions. The thread doesn't hang once the non_blocking_io command is set, though, so my short-term problem is solved (although I'm not sure how or why).

I am using non-member read and write functions to read and write objects to and from a stream. This stream could be a file, a socket, an area of memory, basically any iostream-compatible class, which means that the same read and write functions will work with a variety of different data sources. This is why I don't want to use async_read and async_write with multiplexing through the service.
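
As a sketch of what such stream-agnostic functions might look like, the pair below serialises a made-up `Message` type through plain `std::ostream` and `std::istream` references. `Message`, `writeMessage`, `readMessage` and the wire format are all assumptions for illustration. The same calls would work on any stream type derived from the standard stream classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical object to serialise.
struct Message
{
    std::uint32_t id;
    std::string text;
};

// Writes: id, space, text length, space, then the raw text bytes.
inline void writeMessage(std::ostream& out, const Message& message)
{
    out << message.id << ' ' << message.text.size() << ' ';
    out.write(message.text.data(),
              static_cast<std::streamsize>(message.text.size()));
}

// Reads the format produced by writeMessage. Returns false on a short or
// malformed stream. The length is trusted as-is, which real code should not do.
inline bool readMessage(std::istream& in, Message& message)
{
    std::size_t length = 0;
    if (!(in >> message.id >> length))
        return false;
    in.get();  // consume the single separator after the length
    message.text.resize(length);
    assert(message.text.size() == length);
    if (length > 0)
        in.read(&message.text[0], static_cast<std::streamsize>(length));
    return static_cast<bool>(in);
}
```

Because both functions only see the base stream interfaces, a `std::stringstream` can stand in for the socket stream in tests.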
