Receive issues with recv()

Started by
5 comments, last by hplus0603 13 years ago
G'day to you!

I'll get straight to the point: I'm having some trouble with sending data from my server to my client and vice versa. Basically, data is continuously sent between the two, and this works fine for the most part. Problem is that at some point data will not be received by one or the other (possibly because of packet loss? I dunno). It will always stop receiving at the same point. The point at which it stops seems to be directly affected by the number of instructions between the calls to send() and recv(). i.e. if I had a piece of code that said:


m_iResult = ReceiveData(m_clientSocket, m_temp);

if (m_temp != m_controlArr.at(i).y)
{
printf("Send mismatch %d\n");
}



The recv() would not receive after, say the 20th time?

Now, if I removed the validation test to just:



m_iResult = ReceiveData(m_clientSocket, m_temp);


The recv() might fail after only the 8th piece of data had been sent.

Here's the important parts of the code for your reviewing pleasure:
SERVER


// Receive until the peer shuts down the connection
do {

m_iResult = ReceiveData(m_clientSocket, m_radius);
m_iSendResult = WriteData(m_radius, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_mousePos.x);
m_iSendResult = WriteData(m_mousePos.x, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_mousePos.y);
m_iSendResult = WriteData(m_mousePos.y, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_mousePos.z);
m_iSendResult = WriteData(m_mousePos.z, m_clientSocket);

vector<Vertex> m_controlArr;
m_ownedCount = 0;
for (int i = 0; i < m_grid; i++)
{
for (int j = 0; j < m_grid; j++)
{
m_pos = g_sphereArr[j].GetPosition();
if ((m_pos.x < m_radius) && (m_pos.y < m_radius))
{
if (g_sphereArr[j].GetOwnerShip() != 4)
{
g_sphereArr[j].SetOwnership(0);
}
m_player[0].SetOwnership(i, j);
m_controlArr.push_back(m_pos);
m_ownedCount++;
}
}
}

m_iSendResult = WriteData(m_ownedCount, m_clientSocket);
vector<float> m_sendBuff;
float m_temp;
for (int i = 0; i < m_ownedCount; i++)
{
printf("%d\n", i);
m_iSendResult = WriteData(m_controlArr.at(i).x, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_temp);
if (m_temp != m_controlArr.at(i).x)
{
printf("Send mismatch %d\n");
}
m_iSendResult = WriteData(m_controlArr.at(i).y, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_temp);
if (m_temp != m_controlArr.at(i).y)
{
printf("Send mismatch %d\n");
}
m_iSendResult = WriteData(m_controlArr.at(i).z, m_clientSocket);
m_iResult = ReceiveData(m_clientSocket, m_temp);
if (m_temp != m_controlArr.at(i).z)
{
printf("Send mismatch %d\n");
}
}
printf("Bytes sent: %d\n");

} while (m_iSendResult > 0);


CLIENT


int m_result = 512;
g_socket.WriteData(m_radius); //Send radius
m_result = g_socket.ReadData(m_radius);
g_socket.WriteData(g_mousePos.x); //Send position data
m_result = g_socket.ReadData(g_mousePos.x);
g_socket.WriteData(g_mousePos.y);
m_result = g_socket.ReadData(g_mousePos.y);
g_socket.WriteData(g_mousePos.z);
m_result = g_socket.ReadData(g_mousePos.z);

GLfloat m_sphereSize = 3.0f;
Vertex m_spherePos;
int m_receiving = 0;

g_socket.ReadData(m_controlCount);
Sphere* m_sphereArr = new Sphere[m_controlCount];
vector<Vertex> m_sphere;
for (int i = 0; i < m_controlCount; i++)
{
m_sphereArr.Create(m_sphereSize, 32, 32);
printf("%d\n", i);
m_result = g_socket.ReadData(m_spherePos.x);
g_socket.WriteData(m_spherePos.x);
m_result = g_socket.ReadData(m_spherePos.y);
g_socket.WriteData(m_spherePos.y);
m_result = g_socket.ReadData(m_spherePos.z);
g_socket.WriteData(m_spherePos.z);
m_sphere.push_back(m_spherePos);
}


...and here's the send function I'm using in both...


int Socket::WriteData(float p_data)
{
std::string m_tmp = FloatToString(p_data).c_str();
char* m_strToChr = (char*)m_tmp.c_str();
g_sendBuf = m_strToChr;
int m_sndResult = send(g_connectSocket, g_sendBuf, (int)strlen(g_sendBuf), 0);
std::cout << "Sent: " << g_sendBuf << std::endl;
if (m_sndResult == SOCKET_ERROR)
{
printf("Send failed: %d\n", WSAGetLastError());
closesocket(g_connectSocket);
WSACleanup();
return 1;
}
return m_sndResult;
}


...and the receive function:


int Socket::ReadData(float &p_data)
{
char m_recvbuf[DEFAULT_BUFLEN];
int m_recvBufLen = DEFAULT_BUFLEN;
std::string m_buf;
int m_iResult = -1;
m_iResult = recv(g_connectSocket, m_recvbuf, m_recvBufLen, 0);

if (m_iResult == INVALID_SOCKET)
{
printf("Receive failed: %d\n", WSAGetLastError());
closesocket(g_connectSocket);
WSACleanup();
return 1;
}
for (int i = 0; i < m_iResult; i++)
{
m_buf.push_back(m_recvbuf);
}
std::istringstream m_strToFloat(m_buf);
m_strToFloat >> p_data;
std::cout << "Recieved: " << p_data << std::endl;
return m_iResult;
}



Any help would be greatly appreciated, I've been stuck on this for a while now.

Note that this is using blocking as the data must be sent in sync. Later I need to add another three clients, and yes, I know this code ain't pretty, but at this point as long as it works I'll be happy.
Advertisement

I'll get straight to the point: I'm having some trouble with sending data from my server to my client and vice versa. Basically, data is continuously sent between the two, and this works fine for the most part. Problem is that at some point data will not be received by one or the other (possibly because of packet loss? I dunno). It will always stop receiving at the same point. The point at which it stops seems to be directly affected by the number of instructions between the calls to send() and recv(). i.e. if I had a piece of code that said:



send() *may* block if the output buffer is full.
recv() *will* block until it can receive at least one byte.
If you get to the point where both programs are waiting for data at the same time, you will deadlock.
Also, if one end is falling behind, and send() blocks on both ends, then you will also deadlock.

Finally, you seem to assume that send() will always send all the data, and that recv() will always receive one "packet's" worth of data. This is not actually the case for TCP -- it's a stream protocol, and the only guarantee for send() and recv() is that they will transfer AT LEAST one byte, and AT MOST the number of bytes requested in the buffer, before they return.
enum Bool { True, False, FileNotFound };
Another problem with trying to sync a simulation by binding it to the underlying network system is that you put your whole program at the mercy of the connection between the server and client. Right now as you have it, you can't actually use any of the data you are updating until all the data has arrived. You are beginning a data update transaction but allowing it to take as long as needed to complete. What happens if a client drops in the middle of it? The state would be corrupted. Likewise if you are using any of the data outside of this loop in another thread perhaps, the program state would never be in a complete coherent state; it'd always be partial.

As you have it, your whole design is fatally flawed and will not work as you expect it to when you go to add more players. As soon as you introduce threads to handle each connection, your simulation will further get out of sync and have all sorts of race conditions because of how you are not locking access to global data. I'm going to strongly suggest you archive your current version and start a new version taking the following advice into consideration.

First, you will need to redo your entire networking system according to the guidelines hplus mentioned (also read the forum FAQ as it's really helpful!). You don't want to have send/recv functions for specific types of data; just generic buffers of data. The concept of keeping data sends/recvs in order has nothing to do with when you send or receive as much as when you process the data. TCP is a stream protocol so you cannot actually achieve a packet based protocol like UDP is without implementing your own layer on top of it. To accomplish this, you just make use of a state variable to know what to process next. That way, you can do other things while you are waiting for the network data to come and you do not have to do your partial state update transactions as you are now.

Next, you will need to implement a message system to know what data is arriving so you can properly process it. You should never just send strings across TCP without length unless you are actually using a stream protocol (such as HTTP) or you are using fixed sized strings (some games do, still...). The protocol you need is not a stream protocol, but a packet protocol so you must do some extra work. When you do get data across the network, you should also verify the integrity of it. The last thing you want is to code something that is easily exploitable all because you did not implement simple checks in the first place.In the computing world, "this should never happen" should be handled with an exception rather than the silent treatment. If it were to really never happen, you'd not have to think about it first, so if you have to think about it, it could happen (don't ask me how, I've seen it though.)

Finally, consider using an existing network library such as boost::asio that allows you to focus on your program logic more so then the underlying winsock implementation of a decent networking system. boost::asio takes care of a lot of the things you would otherwise have to in regards to the partial send sand receives if you use the API correctly, so that can help you quite a bit if you are not yet familiar with the TCP protocol (once again check out the forum FAQ as it has some really helpful links).

I've written a complete example of what was just mentioned for you to get an idea of one direction you can go. There are many different ways to approach the problem and come up with a solution, but this way is how I'd do it based on what I know so far. It's a lot of code, so take some time to look through the concepts shown rather than the specific implementation as the implementation itself might not be suitable for your task. The concepts though, should be. For reference, take a look at this thread for the visitor related logic and this post for the networking stuff. As a standard disclaimer, code may contain bugs!

main.cpp
[spoiler]
#include <string>
#include <vector>
#include <map>
#include <algorithm>
#include "network.h"
#include "visitor.h"
#include <conio.h>

//-----------------------------------------------------------------------------

void HexDumpToConsole( const std::vector< unsigned char > & data );

//-----------------------------------------------------------------------------

// Generic methods, use template specialization for specific types as needed.

template < typename type >
std::string ToString( const type & value )
{
std::stringstream ss;
ss << value;
return ss.str();
}

template < typename type >
type FromString( const std::string & str )
{
type value;
std::stringstream ss;
ss << str;
ss >> value;
return value;
}

//-----------------------------------------------------------------------------

// The packet protocol format we will be using

struct Packet
{
unsigned short size;
unsigned short opcode;
unsigned int hash;
std::vector< unsigned char > payload;

Packet()
{
size = 0;
opcode = 0;
hash = 0;
}
};

//-----------------------------------------------------------------------------

// We write a protocol class to simplfy interactions between the stream and
// packets. Overall, life is just easier when we take this approach.

class Protocol
{
private:
std::vector< uint8_t > m_recv_buffer;
std::list< Packet > m_incoming_packets;
Packet m_incoming_packet;
std::list< Packet > m_outgoing_packets;

private:
// Converts an object to a packet. For this protocol we implement a simple hash
// to verify data.
template < typename type >
Packet ToPacket( type & object )
{
SeralizeStream ss;
visit( object, ss );

Packet packet;
packet.opcode = object.opcode;
packet.payload = ss.Buffer();

if( packet.payload.size() > 0x7FFF )
{
throw( std::runtime_error( "The packet payload is too large." ) );
}

packet.size = packet.payload.size();

packet.hash = 0xFEEF1F0E;
for( size_t x = 0; x < packet.payload.size(); ++x )
{
packet.hash = packet.hash * 101 + packet.payload[x];
}

return packet;
}

public:
Protocol()
{
}

~Protocol()
{
}

// Buffers objects to be sent out once TransferOutgoing is called.
template < typename type >
void Send( type & object )
{
m_outgoing_packets.push_back( ToPacket( object ) );
}

// Buffers the network data stream and extracts packets when they are avaliable.
void Recv( const std::vector< uint8_t > & buffer )
{
m_recv_buffer.insert( m_recv_buffer.end(), buffer.begin(), buffer.end() );

while( true ) // Extract as many packets from the stream as possible
{
if( m_incoming_packet.size == 0 ) // Waiting for a new packet
{
if( m_recv_buffer.size() >= 8 ) // If we at least have a packet header
{
DeseralizeStream ds( m_recv_buffer ); // Read the entire header
ds.visit("size", m_incoming_packet.size );
ds.visit("opcode", m_incoming_packet.opcode );
ds.visit("hash", m_incoming_packet.hash );
m_recv_buffer.erase( m_recv_buffer.begin(), m_recv_buffer.begin() + 8 );
}
else
{
return; // Wait for a complete header to be buffered
}
}

if( m_recv_buffer.size() >= m_incoming_packet.size ) // We have the entire payload
{
m_incoming_packet.payload.insert( m_incoming_packet.payload.begin(), m_recv_buffer.begin(), m_recv_buffer.begin() + m_incoming_packet.size ); // Copy payload
m_recv_buffer.erase( m_recv_buffer.begin(), m_recv_buffer.begin() + m_incoming_packet.size ); // Remove the payload

unsigned int hash = 0xFEEF1F0E;
for( size_t x = 0; x < m_incoming_packet.payload.size(); ++x )
{
hash = hash * 101 + m_incoming_packet.payload[x];
}

if( m_incoming_packet.hash != hash )
{
throw( std::runtime_error( "The packet hash is incorrect." ) );
}

m_incoming_packet.hash = 0;

m_incoming_packets.push_back( m_incoming_packet ); // Save a copy of the packet to the list

m_incoming_packet = Packet(); // clear the packet
}
else
{
return; // Wait for the rest of the payload
}
}
}

// Returns a list of incoming packets. The design is to allow easy locking for MT applications.
std::list< Packet > TransferIncoming()
{
std::list< Packet > incoming = m_incoming_packets;
m_incoming_packets.clear();
return incoming;
}

// Returns a list of outgoing buffers. The design is to allow easy locking for MT applications.
std::list< std::vector< uint8_t > > TransferOutgoing()
{
std::list< std::vector< uint8_t > > outgoing;
std::list< Packet >::iterator itr = m_outgoing_packets.begin();
while( itr != m_outgoing_packets.end() )
{
Packet & packet = *itr;
SeralizeStream ss;
ss.visit("size", packet.size );
ss.visit("opcode", packet.opcode );
ss.visit("hash", packet.hash );
outgoing.push_back( ss.Buffer() );
outgoing.push_back( packet.payload );
++itr;
}
m_outgoing_packets.clear();
return outgoing;
}

// Creates an object from a packet. We use this style in case there is no default ctor.
template < typename type >
void FromPacket( Packet & packet, type & object )
{
if( packet.opcode != object.opcode )
{
throw( std::runtime_error( "The packet cannot be parsed into this object." ) );
}

DeseralizeStream ds( packet.payload );
visit( object, ds );
}
};

//-----------------------------------------------------------------------------

// Your message data types go here

struct Message_Object_Location
{
std::string x;
std::string y;
std::string z;

Message_Object_Location()
{
}
};

struct Message_UserInput
{
enum id{ opcode = 1 };
std::string radius;
Message_Object_Location location;

Message_UserInput()
{
}
};

struct Message_UserOwnedActors
{
enum id{ opcode = 2 };
unsigned char count;
std::vector< Message_Object_Location > locations;

Message_UserOwnedActors()
{
count = 0;
}
};

//-----------------------------------------------------------------------------

// Your custom visit functions for each type go here

template< typename Stream >
Stream & visit( Message_Object_Location & value, Stream & stream )
{
stream.visit( "x", value.x );
stream.visit( "y", value.y );
stream.visit( "z", value.z );
return stream;
}

template< typename Stream >
Stream & visit( Message_UserInput & value, Stream & stream )
{
stream.visit( "radius", value.radius );
visit( value.location, stream );
return stream;
}

template< typename Stream >
Stream & visit( Message_UserOwnedActors & value, Stream & stream )
{
if( value.locations.size() > 255 )
{
throw( std::runtime_error( "No more than 255 locations can be sent at a time." ) );
}
stream.visit( "count", value.count );
value.locations.resize( value.count );
for( size_t x = 0; x < value.locations.size(); ++x )
{
visit( value.locations[x], stream );
}
return stream;
}

//-----------------------------------------------------------------------------

// We create a simulation class that represents the entire application state.
// We feed the simulation user events as needed.

class MyConnection;
class Simulation
{
private:
std::map< unsigned int, boost::weak_ptr< MyConnection > > connections;

public:
Simulation();
void AddUser( unsigned int id, boost::shared_ptr< MyConnection > connection );
void RemoveUser( unsigned int id );
void OnEvent( unsigned int id, Message_UserInput object );
void OnEvent( unsigned int id, Message_UserOwnedActors object );
void Tick();
};

Simulation global_simulation; // Boo! Just a simple example...

//-----------------------------------------------------------------------------

// Represents a connection. You can reuse the class for both server/client connections,
// but if you do you will have to adjust the code accordingly for each as needed. #define
// macros can work for that.
class MyConnection : public Connection
{
private:
unsigned int m_id;
Protocol m_protocol;

private:
void OnAccept( const std::string & host, uint16_t port ) // Only called for server connections
{
SetTimerInterval( 100 );

std::cout << "[" << __FUNCTION__ "][" << m_id << "] " << host << ":" << port << std::endl;

global_simulation.AddUser( m_id, boost::dynamic_pointer_cast< MyConnection >( shared_from_this() ) );

Recv();
}

void OnConnect( const std::string & host, uint16_t port ) // Only called for client connections
{
SetTimerInterval( 100 );

std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << host << ":" << port << std::endl;

Message_UserInput input;
input.radius = ToString( 3.14159f );
input.location.x = ToString( 100 );
input.location.y = ToString( 25 );
input.location.z = ToString( -10 );
m_protocol.Send( input );

Recv(); // Begin receiving
}

void OnSend( const std::vector< uint8_t > & buffer )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << buffer.size() << " bytes" << std::endl;
HexDumpToConsole( buffer );
}

void OnRecv( std::vector< uint8_t > & buffer )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << buffer.size() << " bytes" << std::endl;
HexDumpToConsole( buffer );

m_protocol.Recv( buffer ); // Parse the stream into packets
Recv(); // Start the next receive
}

void OnTimer( const boost::posix_time::time_duration & delta )
{
try
{
std::list< Packet > incoming = m_protocol.TransferIncoming();
if( incoming.size() > 0 )
{
std::list< Packet >::iterator itr = incoming.begin();
while( itr != incoming.end() )
{
Packet & packet = *itr;
if( packet.opcode == Message_UserInput::opcode )
{
try
{
Message_UserInput object;
m_protocol.FromPacket( packet, object );

global_simulation.OnEvent( m_id, object );
}
catch ( std::exception ex )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << ex.what() << std::endl;
Disconnect();
return;
}
}
else if( packet.opcode == Message_UserOwnedActors::opcode )
{
try
{
Message_UserOwnedActors object;
m_protocol.FromPacket( packet, object );

global_simulation.OnEvent( m_id, object );
}
catch ( std::exception ex )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << ex.what() << std::endl;
Disconnect();
return;
}
}
else
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] Error: Unknown packet with opcode (" << packet.opcode << ")" << std::endl;
}
++itr;
}
}
}
catch ( std::exception ex )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << ex.what() << std::endl;
Disconnect();
}

try
{
std::list< std::vector< uint8_t > > outgoing = m_protocol.TransferOutgoing();
if( outgoing.size() > 0 )
{
std::list< std::vector< uint8_t > >::iterator itr = outgoing.begin();
while( itr != outgoing.end() )
{
Send( *itr );
++itr;
}
}
}
catch ( std::exception ex )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << ex.what() << std::endl;
Disconnect();
}
}

void OnError( const boost::system::error_code & error )
{
std::cout << "[" << __FUNCTION__ << "][" << m_id << "] " << error << std::endl;

global_simulation.RemoveUser( m_id );
}

public:
MyConnection( boost::shared_ptr< Hive > hive )
: Connection( hive ), m_id( -1 )
{
}

~MyConnection()
{
}

void SetId( unsigned int id )
{
m_id = id;
}

unsigned int GetId() const
{
return m_id;
}

template < typename type >
void Transmit( type & object )
{
m_protocol.Send( object );
}
};

//-----------------------------------------------------------------------------

// Represents a "server".
class MyAcceptor : public Acceptor
{
private:
unsigned int m_id;

private:
bool OnAccept( boost::shared_ptr< Connection > connection, const std::string & host, uint16_t port )
{
std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;

boost::shared_ptr< MyConnection > cur_connection = boost::dynamic_pointer_cast< MyConnection >( connection );
cur_connection->SetId( ++m_id );

return true;
}

void OnTimer( const boost::posix_time::time_duration & delta )
{
}

void OnError( const boost::system::error_code & error )
{
std::cout << "[" << __FUNCTION__ << "] " << error << std::endl;
}

public:
MyAcceptor( boost::shared_ptr< Hive > hive )
: Acceptor( hive ), m_id( 0 )
{
}

~MyAcceptor()
{
}
};

//-----------------------------------------------------------------------------

int main( int argc, char * argv[] )
{
boost::shared_ptr< Hive > hive( new Hive() );

boost::shared_ptr< MyAcceptor > acceptor( new MyAcceptor( hive ) );
acceptor->Listen( "127.0.0.1", 7777 );

boost::shared_ptr< MyConnection > connection( new MyConnection( hive ) );
acceptor->Accept( connection );

while( true )
{
if( _kbhit() )
{
char ch = _getch();
if( ch == 'q' || ch == 'Q' )
{
std::cout << "Now exiting..." << std::endl;

break;
}
else if( ch == ' ' )
{
std::cout << "Creating a new client..." << std::endl;

boost::shared_ptr< MyConnection > connection( new MyConnection( hive ) );
connection->Connect( "127.0.0.1", 7777 );
}
}
hive->Poll();
global_simulation.Tick();
Sleep( 1 );
}

hive->Stop();

return 0;
}

//-----------------------------------------------------------------------------

void HexDumpToConsole( const std::vector< uint8_t > & data )
{
size_t size = data.size();
if( size % 16 )
{
size = size + 16 - size % 16;
}
for( size_t x = 0; x <= size; ++x )
{
if( x < data.size() )
{
printf( "%.2X ", data[x] );
}
else
{
printf( " " );
}
if( ( x + 1 ) % 16 == 0 )
{
printf(" ");
for( size_t y = 0; y < 16; ++y )
{
int ch = 0;
if( x - 16 - y < data.size() )
{
ch = data[ x - 16 - y ];
if( isprint( ch ) && !isspace( ch ) )
{
printf( "%c", ch );
}
else
{
printf( "." );
}
}
else
{
printf( "." );
}

}
printf( "\n" );
}
}
printf( "\n" );
}

//-----------------------------------------------------------------------------

Simulation::Simulation()
{
}

void Simulation::AddUser( unsigned int id, boost::shared_ptr< MyConnection > connection )
{
std::cout << "[" << __FUNCTION__ << "][" << id << "] " << "User added." << std::endl;

connections.insert( std::make_pair( id, connection ) );
}

void Simulation::RemoveUser( unsigned int id )
{
std::cout << "[" << __FUNCTION__ << "][" << id << "] " << "User removed." << std::endl;
}

void Simulation::OnEvent( unsigned int id, Message_UserInput object )
{
float radius = FromString< float >( object.radius );
float x = FromString< float >( object.location.x );
float y = FromString< float >( object.location.y );
float z = FromString< float >( object.location.z );

std::cout << "[" << __FUNCTION__ << "][" << id << "] " << "User event." << std::endl;
std::cout << "\t" << "Radius: " << radius << std::endl;
std::cout << "\t" << "X: " << x << std::endl;
std::cout << "\t" << "Y: " << y << std::endl;
std::cout << "\t" << "Z: " << z << std::endl;
std::cout << std::endl;

// TODO: your logic for processing the event

Message_Object_Location location1;
location1.x = ToString( 1.01f );
location1.y = ToString( 7.77f );
location1.z = ToString( 1.23f );

Message_Object_Location location2;
location2.x = ToString( 2.02f );
location2.y = ToString( 8.88f );
location2.z = ToString( 4.56f );

Message_Object_Location location3;
location3.x = ToString( 3.03f );
location3.y = ToString( 9.99f );
location3.z = ToString( 7.89f );

Message_UserOwnedActors response;
response.count = 3;
response.locations.push_back( location1 );
response.locations.push_back( location2 );
response.locations.push_back( location3 );

std::map< unsigned int, boost::weak_ptr< MyConnection > >::iterator itr = connections.find( id );
if( itr == connections.end() )
{
return; // TODO: Handle how you need to
}

boost::shared_ptr< MyConnection > connection = itr->second.lock();
if( connection )
{
connection->Transmit( response );
}
}

void Simulation::OnEvent( unsigned int id, Message_UserOwnedActors object )
{
std::cout << "[" << __FUNCTION__ << "][" << id << "] " << "User event." << std::endl;
std::cout << "\t" << "Count: " << (int)object.count << std::endl;
for( int x = 0; x < object.count; ++x )
{
float x_ = FromString< float >( object.locations[x].x );
float y_ = FromString< float >( object.locations[x].y );
float z_ = FromString< float >( object.locations[x].z );

std::cout << "\t" << "X: " << x_ << std::endl;
std::cout << "\t" << "Y: " << y_ << std::endl;
std::cout << "\t" << "Z: " << z_ << std::endl;
std::cout << std::endl;
}
std::cout << std::endl;
}

void Simulation::Tick()
{
}

//-----------------------------------------------------------------------------
[/spoiler]

visitor.h
[spoiler]
#pragma once

#ifndef VISITOR_H_
#define VISITOR_H_

//-----------------------------------------------------------------------------

#include <string>
#include <vector>

//-----------------------------------------------------------------------------

#define ENDIAN_BIG 0
#define ENDIAN_LITTLE 1

#if ENDIAN_BIG == 1 && ENDIAN_LITTLE == 1
#error "You may not use both big and little endian!"
#endif

#if ENDIAN_BIG == 0 && ENDIAN_LITTLE == 0
#error "Please choose an endianness!"
#endif

//-----------------------------------------------------------------------------

class SeralizeStream
{
private:
std::vector< unsigned char > m_buffer;

public:
SeralizeStream();
virtual ~SeralizeStream();
SeralizeStream & visit( const std::string & name, const std::string & value );
SeralizeStream & visit( const std::string & name, const unsigned char & value );
SeralizeStream & visit( const std::string & name, const signed char & value );
SeralizeStream & visit( const std::string & name, const unsigned short & value );
SeralizeStream & visit( const std::string & name, const signed short & value );
SeralizeStream & visit( const std::string & name, const unsigned int & value );
SeralizeStream & visit( const std::string & name, const signed int & value );;
SeralizeStream & visit( const std::string & name, const unsigned __int64 & value );
SeralizeStream & visit( const std::string & name, const signed __int64 & value );
SeralizeStream & visit( const std::string & name, const float & value );
SeralizeStream & visit( const std::string & name, const double & value );
std::vector< unsigned char > & Buffer();
void Clear();
};

//-----------------------------------------------------------------------------

class DeseralizeStream
{
private:
std::vector<unsigned char> & m_buffer;

protected:
size_t m_index;

public:
DeseralizeStream(std::vector<unsigned char> & buffer);
virtual ~DeseralizeStream();
DeseralizeStream & visit( const std::string & name, std::string & value );
DeseralizeStream & visit( const std::string & name, unsigned char & value );
DeseralizeStream & visit( const std::string & name, signed char & value );
DeseralizeStream & visit( const std::string & name, unsigned short & value );
DeseralizeStream & visit( const std::string & name, signed short & value );
DeseralizeStream & visit( const std::string & name, unsigned int & value );
DeseralizeStream & visit( const std::string & name, signed int & value );
DeseralizeStream & visit( const std::string & name, unsigned __int64 & value );
DeseralizeStream & visit( const std::string & name, signed __int64 & value );
DeseralizeStream & visit( const std::string & name, float & value );
DeseralizeStream & visit( const std::string & name, double & value );
};

//-----------------------------------------------------------------------------

#endif
[/spoiler]

visitor.cpp
[spoiler]
#include "visitor.h"
#include <iostream>
#include <iomanip>
#include <cstdio>
#include <cstdlib>

//-----------------------------------------------------------------------------

SeralizeStream::SeralizeStream()
{
}

//-----------------------------------------------------------------------------

SeralizeStream::~SeralizeStream()
{
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const std::string & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
size_t size = value.size();
do
{
if( size >= 255 )
{
m_buffer.push_back( 255 );
size -= 254;
}
else
{
m_buffer.push_back( static_cast< unsigned char >( size ) );
size -= size;
}
} while( size != 0 );
m_buffer.insert( m_buffer.end(), value.begin(), value.end() );
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const unsigned char & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << (int)value << " (0x" << std::hex << (int)value << std::dec << ")" << std::endl;
#endif
m_buffer.push_back( value );
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const signed char & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << (int)value << " (0x" << std::hex << (int)value << std::dec << ")" << std::endl;
#endif
m_buffer.push_back( value );
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const unsigned short & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const signed short & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const unsigned int & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const signed int & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const unsigned __int64 & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 56 ) & 0xFF );
m_buffer.push_back( ( value >> 48 ) & 0xFF );
m_buffer.push_back( ( value >> 40 ) & 0xFF );
m_buffer.push_back( ( value >> 32 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 32 ) & 0xFF );
m_buffer.push_back( ( value >> 40 ) & 0xFF );
m_buffer.push_back( ( value >> 48 ) & 0xFF );
m_buffer.push_back( ( value >> 56 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const signed __int64 & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
#if ENDIAN_BIG == 1
m_buffer.push_back( ( value >> 56 ) & 0xFF );
m_buffer.push_back( ( value >> 48 ) & 0xFF );
m_buffer.push_back( ( value >> 40 ) & 0xFF );
m_buffer.push_back( ( value >> 32 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 0 ) & 0xFF );
#elif ENDIAN_LITTLE == 1
m_buffer.push_back( ( value >> 0 ) & 0xFF );
m_buffer.push_back( ( value >> 8 ) & 0xFF );
m_buffer.push_back( ( value >> 16 ) & 0xFF );
m_buffer.push_back( ( value >> 24 ) & 0xFF );
m_buffer.push_back( ( value >> 32 ) & 0xFF );
m_buffer.push_back( ( value >> 40 ) & 0xFF );
m_buffer.push_back( ( value >> 48 ) & 0xFF );
m_buffer.push_back( ( value >> 56 ) & 0xFF );
#endif
return *this;
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const float & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
return visit( name, *( unsigned int * )( &value ) );
}

//-----------------------------------------------------------------------------

SeralizeStream & SeralizeStream::visit( const std::string & name, const double & value )
{
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
return visit( name, *( unsigned __int64 * )( &value ) );
}

//-----------------------------------------------------------------------------

std::vector< unsigned char > & SeralizeStream::Buffer()
{
return m_buffer;
}

//-----------------------------------------------------------------------------

void SeralizeStream::Clear()
{
m_buffer.clear();
}

//-----------------------------------------------------------------------------

DeseralizeStream::DeseralizeStream( std::vector<unsigned char> & buffer )
: m_buffer( buffer ), m_index( 0 )
{
}

//-----------------------------------------------------------------------------

DeseralizeStream::~DeseralizeStream()
{
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, std::string & value )
{
size_t size = 0;
while( m_buffer[ m_index ] == 255 )
{
size += 254;
++m_index;
}
size += m_buffer[ m_index++ ];
value.resize( size );
std::copy( m_buffer.begin() + m_index, m_buffer.begin() + m_index + size, value.begin() );
m_index += size;
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, unsigned char & value )
{
value = (unsigned char)m_buffer[ m_index++ ];
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << (int)value << " (0x" << std::hex << (int)value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, signed char & value )
{
value = (signed char)m_buffer[ m_index++ ];
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << (int)value << " (0x" << std::hex << (int)value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, unsigned short & value )
{
#if ENDIAN_BIG == 1
value = (unsigned short)m_buffer[ m_index++ ] << 8 | (unsigned short)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (unsigned short)m_buffer[ m_index++ ] << 0 | (unsigned short)m_buffer[ m_index++ ] << 8;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, signed short & value )
{
#if ENDIAN_BIG == 1
value = (signed short)m_buffer[ m_index++ ] << 8 | (signed short)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (signed short)m_buffer[ m_index++ ] << 0 | (signed short)m_buffer[ m_index++ ] << 8;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, unsigned int & value )
{
#if ENDIAN_BIG == 1
value = (unsigned int)m_buffer[ m_index++ ] << 24 | (unsigned int)m_buffer[ m_index++ ] << 16 | (unsigned int)m_buffer[ m_index++ ] << 8 | (unsigned int)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (unsigned int)m_buffer[ m_index++ ] << 0 | (unsigned int)m_buffer[ m_index++ ] << 8 | (unsigned int)m_buffer[ m_index++ ] << 16 | (unsigned int)m_buffer[ m_index++ ] << 24;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, signed int & value )
{
#if ENDIAN_BIG == 1
value = (signed int)m_buffer[ m_index++ ] << 24 | (signed int)m_buffer[ m_index++ ] << 16 | (signed int)m_buffer[ m_index++ ] << 8 | (signed int)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (signed int)m_buffer[ m_index++ ] << 0 | (signed int)m_buffer[ m_index++ ] << 8 | (signed int)m_buffer[ m_index++ ] << 16 | (signed int)m_buffer[ m_index++ ] << 24;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, unsigned __int64 & value )
{
#if ENDIAN_BIG == 1
value = (unsigned __int64)m_buffer[ m_index++ ] << 56 | (unsigned __int64)m_buffer[ m_index++ ] << 48 | (unsigned __int64)m_buffer[ m_index++ ] << 40 | (unsigned __int64)m_buffer[ m_index++ ] << 32 | (unsigned __int64)m_buffer[ m_index++ ] << 24 | (unsigned __int64)m_buffer[ m_index++ ] << 16 | (unsigned __int64)m_buffer[ m_index++ ] << 8 | (unsigned __int64)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (unsigned __int64)m_buffer[ m_index++ ] << 0 | (unsigned __int64)m_buffer[ m_index++ ] << 8 | (unsigned __int64)m_buffer[ m_index++ ] << 16 | (unsigned __int64)m_buffer[ m_index++ ] << 24 | (unsigned __int64)m_buffer[ m_index++ ] << 32 | (unsigned __int64)m_buffer[ m_index++ ] << 40 | (unsigned __int64)m_buffer[ m_index++ ] << 48 | (unsigned __int64)m_buffer[ m_index++ ] << 56;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, signed __int64 & value )
{
#if ENDIAN_BIG == 1
value = (signed __int64)m_buffer[ m_index++ ] << 56 | (signed __int64)m_buffer[ m_index++ ] << 48 | (signed __int64)m_buffer[ m_index++ ] << 40 | (signed __int64)m_buffer[ m_index++ ] << 32 | (signed __int64)m_buffer[ m_index++ ] << 24 | (signed __int64)m_buffer[ m_index++ ] << 16 | (signed __int64)m_buffer[ m_index++ ] << 8 | (signed __int64)m_buffer[ m_index++ ] << 0;
#elif ENDIAN_LITTLE == 1
value = (signed __int64)m_buffer[ m_index++ ] << 0 | (signed __int64)m_buffer[ m_index++ ] << 8 | (signed __int64)m_buffer[ m_index++ ] << 16 | (signed __int64)m_buffer[ m_index++ ] << 24 | (signed __int64)m_buffer[ m_index++ ] << 32 | (signed __int64)m_buffer[ m_index++ ] << 40 | (signed __int64)m_buffer[ m_index++ ] << 48 | (signed __int64)m_buffer[ m_index++ ] << 56;
#endif
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << " (0x" << std::hex << value << std::dec << ")" << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, float & value )
{
unsigned int v;
visit( name, v );
memcpy( &value, &v, 4 );
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

DeseralizeStream & DeseralizeStream::visit( const std::string & name, double & value )
{
unsigned __int64 v;
visit( name, v );
memcpy( &value, &v, 8 );
#ifdef _DEBUG
std::cout << "[" << __FUNCTION__<< "] " << name << " => " << value << std::endl;
#endif
return *this;
}

//-----------------------------------------------------------------------------

[/spoiler]

network.h
[spoiler]
#pragma once

#ifndef NETWORK_H_
#define NETWORK_H_

//-----------------------------------------------------------------------------

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
#include <vector>
#include <list>
#include <boost/cstdint.hpp>

//-----------------------------------------------------------------------------

using boost::uint64_t;
using boost::uint32_t;
using boost::uint16_t;
using boost::uint8_t;

using boost::int64_t;
using boost::int32_t;
using boost::int16_t;
using boost::int8_t;

//-----------------------------------------------------------------------------

class Hive;
class Acceptor;
class Connection;

//-----------------------------------------------------------------------------

class Connection : public boost::enable_shared_from_this< Connection >
{
friend class Acceptor;
friend class Hive;

private:
boost::shared_ptr< Hive > m_hive;
boost::asio::ip::tcp::socket m_socket;
boost::asio::strand m_io_strand;
boost::asio::deadline_timer m_timer;
boost::posix_time::ptime m_last_time;
std::vector< uint8_t > m_recv_buffer;
std::list< int32_t > m_pending_recvs;
std::list< std::vector< uint8_t > > m_pending_sends;
int32_t m_receive_buffer_size;
int32_t m_timer_interval;
volatile uint32_t m_error_state;

protected:
Connection( boost::shared_ptr< Hive > hive );
virtual ~Connection();

private:
Connection( const Connection & rhs );
Connection & operator =( const Connection & rhs );
void StartSend();
void StartRecv( int32_t total_bytes );
void StartTimer();
void StartError( const boost::system::error_code & error );
void DispatchSend( std::vector< uint8_t > buffer );
void DispatchRecv( int32_t total_bytes );
void DispatchTimer( const boost::system::error_code & error );
void HandleConnect( const boost::system::error_code & error );
void HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr );
void HandleRecv( const boost::system::error_code & error, int32_t actual_bytes );
void HandleTimer( const boost::system::error_code & error );

private:
// Called when the connection has successfully connected to the local
// host.
virtual void OnAccept( const std::string & host, uint16_t port ) = 0;

// Called when the connection has successfully connected to the remote
// host.
virtual void OnConnect( const std::string & host, uint16_t port ) = 0;

// Called when data has been sent by the connection.
virtual void OnSend( const std::vector< uint8_t > & buffer ) = 0;

// Called when data has been received by the connection.
virtual void OnRecv( std::vector< uint8_t > & buffer ) = 0;

// Called on each timer event.
virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;

// Called when an error is encountered.
virtual void OnError( const boost::system::error_code & error ) = 0;

public:
// Returns the Hive object.
boost::shared_ptr< Hive > GetHive();

// Returns the socket object.
boost::asio::ip::tcp::socket & GetSocket();

// Returns the strand object.
boost::asio::strand & GetStrand();

// Sets the application specific receive buffer size used. For stream
// based protocols such as HTTP, you want this to be pretty large, like
// 64kb. For packet based protocols, then it will be much smaller,
// usually 512b - 8kb depending on the protocol. The default value is
// 4kb.
void SetReceiveBufferSize( int32_t size );

// Returns the size of the receive buffer size of the current object.
int32_t GetReceiveBufferSize() const;

// Sets the timer interval of the object. The interval is changed after
// the next update is called.
void SetTimerInterval( int32_t timer_interval_ms );

// Returns the timer interval of the object.
int32_t GetTimerInterval() const;

// Returns true if this object has an error associated with it.
bool HasError();

// Binds the socket to the specified interface.
void Bind( const std::string & ip, uint16_t port );

// Starts an a/synchronous connect.
void Connect( const std::string & host, uint16_t port );

// Posts data to be sent to the connection.
void Send( const std::vector< uint8_t > & buffer );

// Posts a recv for the connection to process. If total_bytes is 0, then
// as many bytes as possible up to GetReceiveBufferSize() will be
// waited for. If Recv is not 0, then the connection will wait for exactly
// total_bytes before invoking OnRecv.
void Recv( int32_t total_bytes = 0 );

// Posts an asynchronous disconnect event for the object to process.
void Disconnect();
};

//-----------------------------------------------------------------------------

class Acceptor : public boost::enable_shared_from_this< Acceptor >
{
friend class Hive;

private:
boost::shared_ptr< Hive > m_hive;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::strand m_io_strand;
boost::asio::deadline_timer m_timer;
boost::posix_time::ptime m_last_time;
int32_t m_timer_interval;
volatile uint32_t m_error_state;

private:
Acceptor( const Acceptor & rhs );
Acceptor & operator =( const Acceptor & rhs );
void StartTimer();
void StartError( const boost::system::error_code & error );
void DispatchAccept( boost::shared_ptr< Connection > connection );
void HandleTimer( const boost::system::error_code & error );
void HandleAccept( const boost::system::error_code & error, boost::shared_ptr< Connection > connection );

protected:
Acceptor( boost::shared_ptr< Hive > hive );
virtual ~Acceptor();

private:
// Called when a connection has connected to the server. This function
// should return true to invoke the connection's OnAccept function if the
// connection will be kept. If the connection will not be kept, the
// connection's Disconnect function should be called and the function
// should return false.
virtual bool OnAccept( boost::shared_ptr< Connection > connection, const std::string & host, uint16_t port ) = 0;

// Called on each timer event.
virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;

// Called when an error is encountered. Most typically, this is when the
// acceptor is being closed via the Stop function or if the Listen is
// called on an address that is not available.
virtual void OnError( const boost::system::error_code & error ) = 0;

public:
// Returns the Hive object.
boost::shared_ptr< Hive > GetHive();

// Returns the acceptor object.
boost::asio::ip::tcp::acceptor & GetAcceptor();

// Returns the strand object.
boost::asio::strand & GetStrand();

// Sets the timer interval of the object. The interval is changed after
// the next update is called. The default value is 1000 ms.
void SetTimerInterval( int32_t timer_interval_ms );

// Returns the timer interval of the object.
int32_t GetTimerInterval() const;

// Returns true if this object has an error associated with it.
bool HasError();

public:
// Begin listening on the specific network interface.
void Listen( const std::string & host, const uint16_t & port );

// Posts the connection to the listening interface. The next client that
// connections will be given this connection. If multiple calls to Accept
// are called at a time, then they are accepted in a FIFO order.
void Accept( boost::shared_ptr< Connection > connection );

// Stop the Acceptor from listening.
void Stop();
};

//-----------------------------------------------------------------------------

class Hive : public boost::enable_shared_from_this< Hive >
{
private:
boost::asio::io_service m_io_service;
boost::shared_ptr< boost::asio::io_service::work > m_work_ptr;
volatile uint32_t m_shutdown;

private:
Hive( const Hive & rhs );
Hive & operator =( const Hive & rhs );

public:
Hive();
virtual ~Hive();

// Returns the io_service of this object.
boost::asio::io_service & GetService();

// Returns true if the Stop function has been called.
bool HasStopped();

// Polls the networking subsystem once from the current thread and
// returns.
void Poll();

// Runs the networking system on the current thread. This function blocks
// until the networking system is stopped, so do not call on a single
// threaded application with no other means of being able to call Stop
// unless you code in such logic.
void Run();

// Stops the networking system. All work is finished and no more
// networking interactions will be possible afterwards until Reset is called.
void Stop();

// Restarts the networking system after Stop as been called. A new work
// object is created ad the shutdown flag is cleared.
void Reset();
};

//-----------------------------------------------------------------------------

#endif

[/spoiler]

network.cpp
[spoiler]
#include "network.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/interprocess/detail/atomic.hpp>

//-----------------------------------------------------------------------------

Hive::Hive()
: m_work_ptr( new boost::asio::io_service::work( m_io_service ) ), m_shutdown( 0 )
{
}

Hive::~Hive()
{
}

boost::asio::io_service & Hive::GetService()
{
return m_io_service;
}

bool Hive::HasStopped()
{
return ( boost::interprocess::detail::atomic_cas32( &m_shutdown, 1, 1 ) == 1 );
}

void Hive::Poll()
{
m_io_service.poll();
}

void Hive::Run()
{
m_io_service.run();
}

void Hive::Stop()
{
if( boost::interprocess::detail::atomic_cas32( &m_shutdown, 1, 0 ) == 0 )
{
m_work_ptr.reset();
m_io_service.run();
m_io_service.stop();
}
}

void Hive::Reset()
{
if( boost::interprocess::detail::atomic_cas32( &m_shutdown, 0, 1 ) == 1 )
{
m_io_service.reset();
m_work_ptr.reset( new boost::asio::io_service::work( m_io_service ) );
}
}

//-----------------------------------------------------------------------------

Acceptor::Acceptor( boost::shared_ptr< Hive > hive )
: m_hive( hive ), m_acceptor( hive->GetService() ), m_io_strand( hive->GetService() ), m_timer( hive->GetService() ), m_timer_interval( 1000 ), m_error_state( 0 )
{
}

Acceptor::~Acceptor()
{
}

void Acceptor::StartTimer()
{
m_last_time = boost::posix_time::microsec_clock::local_time();
m_timer.expires_from_now( boost::posix_time::milliseconds( m_timer_interval ) );
m_timer.async_wait( m_io_strand.wrap( boost::bind( &Acceptor::HandleTimer, shared_from_this(), _1 ) ) );
}

void Acceptor::StartError( const boost::system::error_code & error )
{
if( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 0 ) == 0 )
{
boost::system::error_code ec;
m_acceptor.cancel( ec );
m_acceptor.close( ec );
m_timer.cancel( ec );
OnError( error );
}
}

void Acceptor::DispatchAccept( boost::shared_ptr< Connection > connection )
{
m_acceptor.async_accept( connection->GetSocket(), connection->GetStrand().wrap( boost::bind( &Acceptor::HandleAccept, shared_from_this(), _1, connection ) ) );
}

void Acceptor::HandleTimer( const boost::system::error_code & error )
{
if( error || HasError() || m_hive->HasStopped() )
{
StartError( error );
}
else
{
OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
StartTimer();
}
}

void Acceptor::HandleAccept( const boost::system::error_code & error, boost::shared_ptr< Connection > connection )
{
if( error || HasError() || m_hive->HasStopped() )
{
connection->StartError( error );
}
else
{
if( connection->GetSocket().is_open() )
{
connection->StartTimer();
if( OnAccept( connection, connection->GetSocket().remote_endpoint().address().to_string(), connection->GetSocket().remote_endpoint().port() ) )
{
connection->OnAccept( m_acceptor.local_endpoint().address().to_string(), m_acceptor.local_endpoint().port() );
}
}
else
{
connection->StartError( error );
}
}
}

void Acceptor::Stop()
{
m_io_strand.post( boost::bind( &Acceptor::HandleTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}

void Acceptor::Accept( boost::shared_ptr< Connection > connection )
{
m_io_strand.post( boost::bind( &Acceptor::DispatchAccept, shared_from_this(), connection ) );
}

void Acceptor::Listen( const std::string & host, const uint16_t & port )
{
boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
boost::asio::ip::tcp::resolver::query query( host, boost::lexical_cast< std::string >( port ) );
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve( query );
m_acceptor.open( endpoint.protocol() );
m_acceptor.set_option( boost::asio::ip::tcp::acceptor::reuse_address( false ) );
m_acceptor.bind( endpoint );
m_acceptor.listen( boost::asio::socket_base::max_connections );
StartTimer();
}

boost::shared_ptr< Hive > Acceptor::GetHive()
{
return m_hive;
}

boost::asio::ip::tcp::acceptor & Acceptor::GetAcceptor()
{
return m_acceptor;
}

int32_t Acceptor::GetTimerInterval() const
{
return m_timer_interval;
}

void Acceptor::SetTimerInterval( int32_t timer_interval )
{
m_timer_interval = timer_interval;
}

bool Acceptor::HasError()
{
return ( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 1 ) == 1 );
}

//-----------------------------------------------------------------------------

Connection::Connection( boost::shared_ptr< Hive > hive )
: m_hive( hive ), m_socket( hive->GetService() ), m_io_strand( hive->GetService() ), m_timer( hive->GetService() ), m_receive_buffer_size( 4096 ), m_timer_interval( 1000 ), m_error_state( 0 )
{
}

Connection::~Connection()
{
}

void Connection::Bind( const std::string & ip, uint16_t port )
{
boost::asio::ip::tcp::endpoint endpoint( boost::asio::ip::address::from_string( ip ), port );
m_socket.open( endpoint.protocol() );
m_socket.set_option( boost::asio::ip::tcp::acceptor::reuse_address( false ) );
m_socket.bind( endpoint );
}

void Connection::StartSend()
{
if( !m_pending_sends.empty() )
{
boost::asio::async_write( m_socket, boost::asio::buffer( m_pending_sends.front() ), m_io_strand.wrap( boost::bind( &Connection::HandleSend, shared_from_this(), boost::asio::placeholders::error, m_pending_sends.begin() ) ) );
}
}

void Connection::StartRecv( int32_t total_bytes )
{
if( total_bytes > 0 )
{
m_recv_buffer.resize( total_bytes );
boost::asio::async_read( m_socket, boost::asio::buffer( m_recv_buffer ), m_io_strand.wrap( boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
}
else
{
m_recv_buffer.resize( m_receive_buffer_size );
m_socket.async_read_some( boost::asio::buffer( m_recv_buffer ), m_io_strand.wrap( boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
}
}

void Connection::StartTimer()
{
m_last_time = boost::posix_time::microsec_clock::local_time();
m_timer.expires_from_now( boost::posix_time::milliseconds( m_timer_interval ) );
m_timer.async_wait( m_io_strand.wrap( boost::bind( &Connection::DispatchTimer, shared_from_this(), _1 ) ) );
}

void Connection::StartError( const boost::system::error_code & error )
{
if( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 0 ) == 0 )
{
boost::system::error_code ec;
m_socket.shutdown( boost::asio::ip::tcp::socket::shutdown_both, ec );
m_socket.close( ec );
m_timer.cancel( ec );
OnError( error );
}
}

void Connection::HandleConnect( const boost::system::error_code & error )
{
if( error || HasError() || m_hive->HasStopped() )
{
StartError( error );
}
else
{
if( m_socket.is_open() )
{
OnConnect( m_socket.remote_endpoint().address().to_string(), m_socket.remote_endpoint().port() );
}
else
{
StartError( error );
}
}
}

void Connection::HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr )
{
if( error || HasError() || m_hive->HasStopped() )
{
StartError( error );
}
else
{
OnSend( *itr );
m_pending_sends.erase( itr );
StartSend();
}
}

void Connection::HandleRecv( const boost::system::error_code & error, int32_t actual_bytes )
{
if( error || HasError() || m_hive->HasStopped() )
{
StartError( error );
}
else
{
m_recv_buffer.resize( actual_bytes );
OnRecv( m_recv_buffer );
m_pending_recvs.pop_front();
if( !m_pending_recvs.empty() )
{
StartRecv( m_pending_recvs.front() );
}
}
}

void Connection::HandleTimer( const boost::system::error_code & error )
{
if( error || HasError() || m_hive->HasStopped() )
{
StartError( error );
}
else
{
OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
StartTimer();
}
}

void Connection::DispatchSend( std::vector< uint8_t > buffer )
{
bool should_start_send = m_pending_sends.empty();
m_pending_sends.push_back( buffer );
if( should_start_send )
{
StartSend();
}
}

void Connection::DispatchRecv( int32_t total_bytes )
{
bool should_start_receive = m_pending_recvs.empty();
m_pending_recvs.push_back( total_bytes );
if( should_start_receive )
{
StartRecv( total_bytes );
}
}

void Connection::DispatchTimer( const boost::system::error_code & error )
{
m_io_strand.post( boost::bind( &Connection::HandleTimer, shared_from_this(), error ) );
}

void Connection::Connect( const std::string & host, uint16_t port)
{
boost::system::error_code ec;
boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
boost::asio::ip::tcp::resolver::query query( host, boost::lexical_cast< std::string >( port ) );
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
m_socket.async_connect( *iterator, m_io_strand.wrap( boost::bind( &Connection::HandleConnect, shared_from_this(), _1 ) ) );
StartTimer();
}

void Connection::Disconnect()
{
m_io_strand.post( boost::bind( &Connection::HandleTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}

void Connection::Recv( int32_t total_bytes )
{
m_io_strand.post( boost::bind( &Connection::DispatchRecv, shared_from_this(), total_bytes ) );
}

void Connection::Send( const std::vector< uint8_t > & buffer )
{
m_io_strand.post( boost::bind( &Connection::DispatchSend, shared_from_this(), buffer ) );
}

boost::asio::ip::tcp::socket & Connection::GetSocket()
{
return m_socket;
}

boost::asio::strand & Connection::GetStrand()
{
return m_io_strand;
}

boost::shared_ptr< Hive > Connection::GetHive()
{
return m_hive;
}

void Connection::SetReceiveBufferSize( int32_t size )
{
m_receive_buffer_size = size;
}

int32_t Connection::GetReceiveBufferSize() const
{
return m_receive_buffer_size;
}

int32_t Connection::GetTimerInterval() const
{
return m_timer_interval;
}

void Connection::SetTimerInterval( int32_t timer_interval )
{
m_timer_interval = timer_interval;
}

bool Connection::HasError()
{
return ( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 1 ) == 1 );
}

//-----------------------------------------------------------------------------
[/spoiler]

And the output would look like this after hitting spacebar then q to exit:
[spoiler]
Creating a new client...
[MyAcceptor::OnAccept] 127.0.0.1:2038
[MyConnection::OnAccept][1] 127.0.0.1:7777
[Simulation::AddUser][1] User added.
[MyConnection::OnConnect][4294967295] 127.0.0.1:7777
[SeralizeStream::visit] radius => 3.14159
[SeralizeStream::visit] x => 100
[SeralizeStream::visit] y => 25
[SeralizeStream::visit] z => -10
[SeralizeStream::visit] size => 19 (0x13)
[SeralizeStream::visit] opcode => 1 (0x1)
[SeralizeStream::visit] hash => 17412564 (0x109b1d4)
[MyConnection::OnRecv][1] 8 bytes
13 00 01 00 D4 B1 09 01 ................

[DeseralizeStream::visit] size => 19 (0x13)
[DeseralizeStream::visit] opcode => 1 (0x1)
[DeseralizeStream::visit] hash => 17412564 (0x109b1d4)
[MyConnection::OnSend][4294967295] 8 bytes
13 00 01 00 D4 B1 09 01 ................

[MyConnection::OnRecv][1] 19 bytes
07 33 2E 31 34 31 35 39 03 31 30 30 02 32 35 03 ................
2D 31 30 .52.001.95141.3.

[MyConnection::OnSend][4294967295] 19 bytes
07 33 2E 31 34 31 35 39 03 31 30 30 02 32 35 03 ................
2D 31 30 .52.001.95141.3.

[DeseralizeStream::visit] radius => 3.14159
[DeseralizeStream::visit] x => 100
[DeseralizeStream::visit] y => 25
[DeseralizeStream::visit] z => -10
[Simulation::OnEvent][1] User event.
Radius: 3.14159
X: 100
Y: 25
Z: -10

[SeralizeStream::visit] count => 3 (0x3)
[SeralizeStream::visit] x => 1.01
[SeralizeStream::visit] y => 7.77
[SeralizeStream::visit] z => 1.23
[SeralizeStream::visit] x => 2.02
[SeralizeStream::visit] y => 8.88
[SeralizeStream::visit] z => 4.56
[SeralizeStream::visit] x => 3.03
[SeralizeStream::visit] y => 9.99
[SeralizeStream::visit] z => 7.89
[SeralizeStream::visit] size => 46 (0x2e)
[SeralizeStream::visit] opcode => 2 (0x2)
[SeralizeStream::visit] hash => 2131048720 (0x7f053910)
[MyConnection::OnRecv][4294967295] 8 bytes
2E 00 02 00 10 39 05 7F ................

[DeseralizeStream::visit] size => 46 (0x2e)
[DeseralizeStream::visit] opcode => 2 (0x2)
[DeseralizeStream::visit] hash => 2131048720 (0x7f053910)
[MyConnection::OnSend][1] 8 bytes
2E 00 02 00 10 39 05 7F ................

[MyConnection::OnRecv][4294967295] 46 bytes
03 04 31 2E 30 31 04 37 2E 37 37 04 31 2E 32 33 ................
04 32 2E 30 32 04 38 2E 38 38 04 34 2E 35 36 04 32.1.77.7.10.1..
33 2E 30 33 04 39 2E 39 39 04 37 2E 38 39 .65.4.88.8.20.2.

[MyConnection::OnSend][1] 46 bytes
03 04 31 2E 30 31 04 37 2E 37 37 04 31 2E 32 33 ................
04 32 2E 30 32 04 38 2E 38 38 04 34 2E 35 36 04 32.1.77.7.10.1..
33 2E 30 33 04 39 2E 39 39 04 37 2E 38 39 .65.4.88.8.20.2.

[DeseralizeStream::visit] count => 3 (0x3)
[DeseralizeStream::visit] x => 1.01
[DeseralizeStream::visit] y => 7.77
[DeseralizeStream::visit] z => 1.23
[DeseralizeStream::visit] x => 2.02
[DeseralizeStream::visit] y => 8.88
[DeseralizeStream::visit] z => 4.56
[DeseralizeStream::visit] x => 3.03
[DeseralizeStream::visit] y => 9.99
[DeseralizeStream::visit] z => 7.89
[Simulation::OnEvent][4294967295] User event.
Count: 3
X: 1.01
Y: 7.77
Z: 1.23

X: 2.02
Y: 8.88
Z: 4.56

X: 3.03
Y: 9.99
Z: 7.89


Now exiting...
[MyConnection::OnError][4294967295] system:0
[Simulation::RemoveUser][4294967295] User removed.
[MyConnection::OnError][1] asio.misc:2
[Simulation::RemoveUser][1] User removed.
[MyAcceptor::OnError] system:0
Press any key to continue . . .
[/spoiler]

Once you understand the basics of what is going on with the network and visitor logic, the main part you will focus on is the Simulation logic. That is where your applications logic takes place. You feed it inputs and based on state, it reacts accordingly. I tried to keep it simple, so all it uses is some hard coded values, but you should be able to get an idea of how to work with the model.

That is just to give you an idea of another method. It might not be the most ideal model for your simulation. In any case though, I cannot recommend you stick with your current model because it simply won't work (under realistic conditions). You really need to separate the network layer from the business logic layer. Good luck!
Hmm, thanks for the code, I'm just trying to get my head around it and figure out if it's appropriate for this task. Fingers crossed!
Ok, I've started again from scratch using UDP (which it wurns out I should have been using anyway).
I'm kind of getting a similar problem where my recvfrom() is only receiving the first couple of characters from the stream, yet that is only when the server attempts to communicate with the client and not vice versa. Curiously enough, the server-side sendto() which corresponds with that recvfrom() returns WSA error 0, which suggests that the client is disconnected.... which it isn't.

Here's my server's send function....



int SendData(SOCKET sd, char* p_sendBuf, sockaddr_in &client, int client_length)
{
if(sendto(sd, p_sendBuf, (int)strlen(p_sendBuf) + 1, 0, (struct sockaddr *)&client, client_length) != (int)sizeof(p_sendBuf))
{
printf("Error sending data: %d\n", WSAGetLastError());
return 0;
}
printf("Sent Data: %d\n", p_sendBuf);
return 1;
}


...and the client's receive code...


int Socket::ReceiveData(char* p_recvBuf)
{
if(recvfrom(sd, p_recvBuf, sizeof(p_recvBuf), 0, (SOCKADDR *) &server, (int *) &server_length) < 0)
{
printf("Error receiving data: %d\n", WSAGetLastError());
return 0;
}
printf("Received Data: %d\n", p_recvBuf);
return 1;
}



One thing I did notice, though is that changing 'sizeof(p_recvBuf)' to 4096 increased the amount of data received. Once again, any help would be greatly appreciated.

*Edit* I re-jigged the recvfrom() and it turns out the message I was sending was too large. I've eventually got it working. I think. I'm not sure the correct data is being sent, but that may just be because of the algorithm that creates the data being a bit off.
UDP is not a connected, reliable stream; it is an unconnected, unreliable datagram protocol. Your code must be able to handle packet loss, packet re-ordering and packet duplication.

int SendData(SOCKET sd, char* p_sendBuf, sockaddr_in &client, int client_length)
{
if(sendto(sd, p_sendBuf, (int)strlen(p_sendBuf) + 1, 0, (struct sockaddr *)&client, client_length) != (int)sizeof(p_sendBuf))



So, the first problem is that you're asking it to send strlen(p_sendBuf)+1 bytes, but then you're comparing against sizeof(p_sendBuf) bytes.

Given that p_sendBuf is of pointer type, that value will be 4 or 8, depending on your architecture. Meanwhile, strlen(p_sendBuf)+1 will be between 1 and undefined, depending on whether your argument string is properly zero terminated or not.

Second, because you are sending a terminating zero, you will be assuming that that zero is there on the receiving side. This means that someone who wants to crash your receiving end (or, worse, zero-day exploit it) will only have to craft a careful packet that does not including the terminating zero, and whatever you're doing on the receiving end will overflow the buffer. It's better to send the length of the string, and verify that the length of the string is not greater than the amount of data in the packet on the receiving end. Also, if the entire packet is a single string, you could just receive into a buffer that is one bigger than what you tell the system, and then put the terminating 0 at the end of the received data:


char buf[4096+1];
int r = recvfrom(sd, buf, sizeof(buf)-1, 0, &addr, &addrSize);
if (r < 0) {
// bad socket
}
else {
buf[r] = 0; // zero terminate
puts(buf); // yay, no buffer overflow!
}
enum Bool { True, False, FileNotFound };

This topic is closed to new replies.

Advertisement