Get data from web browser using posix threads C++

23 comments, last by hplus0603 13 years, 1 month ago
I am trying to implement this drawing using posix threads and sockets(TCP).

webserver <--- proxy <--- client
webserver ---> proxy ---> client




I tried using 4 threads, one for every arrow. I want to ask whether I implemented the code correctly, because it doesn't send data as it is supposed to. Here is my code:

void* client_proxy(void* arg)
{
TCPSocket* sock = (TCPSocket*)arg;
pthread_mutex_lock(&mutex);
numbytes = sock->Recv(data, sizeof(data));
pthread_mutex_unlock(&mutex);
cout <<"Received from client: " << numbytes << endl;
}

void* proxy_server(void* arg)
{
ClientSocket* client = (ClientSocket*)arg;
pthread_mutex_lock(&mutex);
numbytes2 = client->Send(data, numbytes);
pthread_mutex_unlock(&mutex);
cout << "Sent to web server: " << numbytes << endl;
}

void* server_proxy(void* arg)
{
ClientSocket* client = (ClientSocket*)arg;
pthread_mutex_lock(&mutex2);
numbytes2 = client->Recv(data2, sizeof(data2));
cout <<"Received from web server: " << numbytes2 << endl;
pthread_mutex_unlock(&mutex2);
}

void* proxy_client(void* arg)
{
TCPSocket* sock = (TCPSocket*)arg;
pthread_mutex_lock(&mutex2);
numbytes = sock->Send(data2, numbytes2);
pthread_mutex_unlock(&mutex2);
cout << "Sent to client: " << numbytes << endl;
}




PS: I am testing with a web browser, and it always loads only 40-50% of the page. Then it remains stuck. I just can't get the design right to make this work :(
Using a thread per socket (arrow), as you described it, is really inefficient and totally overcomplicates what you are trying to do. All you really need is one thread and the right use of socket functions.

Before we get into that though, you will first want to read this forum's FAQ. It is very helpful and will address quite a few things you are doing wrong in that snippet of code. Check out Beej's Guide to Network Programming as well.

The problem you are having right now is that TCP is a stream protocol. For any operation you perform on N bytes, anywhere from 0 to N bytes might actually be processed. As a result, you have to keep track of how many bytes were actually processed and handle that scenario accordingly. For example, when you call send, you need to make sure the number of bytes returned matches how many you asked to send. If it does not, a partial send has taken place, so you must wait until you can send again and then send the rest of the data. If you do not implement that correctly, you will get stream corruption, which to the end user looks like data loss.

There's a lot of wrapper code you are using that isn't shown, so you will need to make sure you understand the concepts of TCP first with the help of the FAQ and the guide, and then double check all your logic to make sure it will work as expected. In addition, to fix the thread issue you have currently, take a look at the select function. It will basically allow you to handle all your sockets in one thread through the function. Threading is not simple and adds quite a lot of complications when you have shared data, so it would help to use only one thread to start off with along with select.

One last note, judging by the code posted, it looks like you are also sharing the same buffer for recv and send operations. That is a big no-no so you will need to research the different proper ways to handle those data buffers with TCP. In short, that's simply making sure each socket has its own unique receive buffer and send buffers are all unique or you just merge all data into one buffer to send and slide data over to reuse it, implementing a pseudo-circular buffer.
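
As a rough illustration of the "merge into one buffer and slide data over" idea, the sketch below keeps a per-socket outgoing buffer. `SendBuffer` and its methods are hypothetical names, not part of any code in this thread, and a real circular buffer would avoid the copy that the slide does here.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical per-socket outgoing queue: bytes are appended at the back and
// removed from the front once send() reports them as written.
class SendBuffer {
public:
    explicit SendBuffer(std::size_t capacity) : data_(capacity), used_(0) {}

    // Appends up to 'len' bytes; returns how many actually fit.
    std::size_t append(const char* src, std::size_t len)
    {
        std::size_t room = data_.size() - used_;
        std::size_t n = (len < room ? len : room);
        if (n > 0) std::memcpy(data_.data() + used_, src, n);
        used_ += n;
        return n;
    }

    // Drops the first 'len' bytes (those already sent) and slides the rest
    // down to the start of the buffer so the space can be reused.
    void consume(std::size_t len)
    {
        if (len > used_) len = used_;
        std::size_t rest = used_ - len;
        if (len > 0 && rest > 0)
            std::memmove(data_.data(), data_.data() + len, rest);
        used_ = rest;
    }

    const char* data() const { return data_.data(); }
    std::size_t size() const { return used_; }

private:
    std::vector<char> data_;
    std::size_t used_;
};
```

The intended use is to pass `data()` and `size()` to send, then call `consume` with whatever send returned, so a partial send simply leaves the unsent tail at the front for the next attempt.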

Spend some time checking out the FAQ and the guide and then try to update your code accordingly to solve your problems. Good luck!

I am trying to implement this drawing using posix threads and sockets(TCP).

webserver <--- proxy <--- client
webserver ---> proxy ---> client




I think you'll need to give us a little more information here.

If this is all in-process, why wouldn't you just pass data between the subsystems as a simple object?

If this is between processes, then you just want one thread per process.
enum Bool { True, False, FileNotFound };
@hplus0603: I am using a client, Proxifier, that intercepts all programs connecting to the internet. For example, if I open a web page, it intercepts that request and sends the destination IP to the proxy. The proxy then connects to that IP, so a connection between client and server is established through the proxy: the client sends data to the proxy, which forwards it to the server, and vice versa.

I am confused because everyone tells me a different approach: use blocking sockets or non-blocking sockets, use select, use threads. I would like to implement it using only threads, without select (even if that is inefficient), but if there is no other way I will use select too. So should I use blocking or non-blocking sockets?

@Drew_Benton: I would be grateful if you told me the right way to use sockets in my implementation :)


PS: By the way, I am making a proxy server.


Thanks in advance for the help.
Why not just use two threads. One from server-to-client, the other from client-to-server. Streams are then independent.

Also, if this is all there is to the code, then it's missing a loop that keeps reading while there is data left.

struct SocketPair {
    TCPSocket * sockFrom;
    TCPSocket * sockTo;
};

void* client_proxy(void* arg)
{
    SocketPair * p = (SocketPair*)arg;
    char data[4096]; // local to this thread, so the two directions never share a buffer

    while (p->sockFrom->open()) { // whatever you use to determine if there is more data to read
        int nRead = p->sockFrom->Recv(data, sizeof(data));

        int sent = 0;
        int remaining = nRead;

        // keep sending until the whole chunk has gone out (return codes are not checked here)
        do {
            int nSent = p->sockTo->Send(data + sent, remaining);
            sent += nSent;
            remaining -= nSent;
        } while (remaining > 0);
    }
    return NULL;
}


Now just start two threads, one with (server, client) and the other with (client, server) as the SocketPair parameter.

Obviously the above is lacking error handling so make sure to properly handle the return codes or exceptions or similar.

PS: By the way, I am making a proxy server.


You want to use select(). Or, if you're going to have more than 50 sockets open, you want to use GetQueuedCompletionStatus() with I/O completion ports.


enum Bool { True, False, FileNotFound };
I am still having problems. I changed my send() and recv() code, and I made it work with two threads. Here is my complete code; surely I have a mistake somewhere:



// forward declaration
class TCPSocket;
class ClientSocket;

// type used to encapsulate a TCPSOcket and a ClientSocket
struct SocketPair {
TCPSocket *clientProxySocket;
ClientSocket *proxyServerSocket;
};

// creates two threads for client and proxy
pthread_t pid1, pid2;

// declare a global SocketPair struct
SocketPair socketPair;

static void fillAddr(const string &address, unsigned short port, sockaddr_in &addr) {
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;

hostent *host;
if ((host = gethostbyname(address.c_str())) == NULL)
{
cout << "Error" << endl;
}
addr.sin_addr.s_addr = *((unsigned long *) host->h_addr_list[0]);
addr.sin_port = htons(port);
}

class Socket {
public:
Socket()
{
descriptor_m = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(descriptor_m == SOCKET_ERROR)
{
cout << "Error in Socket()" << strerror(errno) << endl;
}
}
Socket(unsigned int descriptor)
{
this->descriptor_m = descriptor;
}

// return the socket descriptor
unsigned int getSocket()
{
return descriptor_m;
}

void Close()
{
#ifdef WIN32
::closesocket(descriptor_m);
#else
::close(descriptor_m);
#endif
}

~Socket()
{
#ifdef WIN32
::closesocket(descriptor_m);
#else
::close(descriptor_m);
#endif
}
private:
unsigned int descriptor_m;

};

class TCPSocket: public Socket {
public:
TCPSocket(): Socket() {}

TCPSocket(int sockDesc): Socket(sockDesc) {}

int Send(const char* buffer, int size)
{
int total = 0;
int bytesleft = size;
int n;
while(total < n)
{
n = ::send(getSocket(), buffer + total, bytesleft, 0);
if(n == -1) break;
total += n;
bytesleft =- n;
}
size = total;
return size;
}
int Recv(char* buffer, int size)
{
int total = 0;
int n;
do {
n = ::recv(getSocket(), buffer, size, 0);
total += n;
}while(n == 0);
return total;

}
};

class ServerSocket: public Socket {
private:
sockaddr_in server;
public:
ServerSocket(): Socket()
{
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(LISTEN_PORT);
}
void Bind()
{
if((::bind(getSocket(), (sockaddr*)&server, sizeof(server))) == SOCKET_ERROR)
{
cout << "Error in Bind() -->";
cout << strerror(errno) << endl;
}
}
void Listen()
{
if((::listen(getSocket(), MAX_CONNECTIONS)) == SOCKET_ERROR)
{
cout << "Error in Listen() -->";
cout << strerror(errno) << endl;
}
}
TCPSocket* Accept()
{
int remoteSocket = ::accept(getSocket(), 0, 0);
if(remoteSocket == SOCKET_ERROR)
{
cout << "Error in Accept() -->";
cout << strerror(errno) << endl;
exit(1);
}
return new TCPSocket(remoteSocket);
}
};

class ClientSocket: public TCPSocket {
public:
void Connect(string address, unsigned int port)
{
sockaddr_in sin;

fillAddr(address, // in
port, // in
sin); // out
cout << "CONNETING TO :" << address << " " << port << endl;
if((::connect(getSocket(), (sockaddr*)&sin, sizeof(sin))) == SOCKET_ERROR)
{
cout << "Error in Connect()" << strerror(errno) << endl;
}
}
};

void Initialize()
{
WSAData wsadata;
if(WSAStartup( MAKEWORD( 1, 1 ), &wsadata ) != 0 )
{
cout << "Error creating socket" << endl;
exit(1);
}
wsadata.wVersion = 5;
}



void* client_server(void* arg)
{
SocketPair* socketPair = (SocketPair*)arg;

pthread_mutex_lock(&mutex);
numbytes = socketPair->clientProxySocket->Recv(data, sizeof(data));
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
numbytes2 = socketPair->proxyServerSocket->Send(data, numbytes);
pthread_mutex_unlock(&mutex);
cout <<"Received from client: " << numbytes << endl;
cout << "Sent to web server: " << numbytes2 << endl;
return arg;

return NULL;
}

void* server_client(void* arg)
{
SocketPair* socketPair = (SocketPair*)arg;
numbytes = socketPair->proxyServerSocket->Recv(data, sizeof(data));
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
numbytes2 = socketPair->clientProxySocket->Send(data, numbytes);
pthread_mutex_unlock(&mutex);
cout <<"Received from web server: " << numbytes << endl;
cout << "Sent to client: " << numbytes2 << endl;
return arg;
}

void dispatchConnection(TCPSocket *clientProxySocket, ClientSocket *proxyServerSocket)
{

// class for authentication protocl SOCKS 5 (source not incldued)
Handle handle(clientProxySocket);
handle.HandleAuthentication();


// get address and port from packet send by client
string addr = handle.getAddress();
short prt = handle.getPort();

proxyServerSocket->Connect(addr, prt);

// populate the socketPair
socketPair.clientProxySocket = clientProxySocket;
socketPair.proxyServerSocket = proxyServerSocket;

if(pthread_create(&pid1, NULL, client_server, &socketPair) != 0)
{
cout << "Unable to create pipe client-server thread (pthread_create()" << endl;
}
if(pthread_create(&pid2, NULL, server_client, &socketPair) != 0)
{
cout << "Unable to create pipe server-client thread (pthread_create()" << endl;
}
//delete clientProxySocket;
}


int main()
{
pthread_mutex_init(&mutex, 0);
pthread_mutex_init(&mutex2, 0);
#ifdef WIN32
Initialize();
#endif
ServerSocket s;
s.Bind();
s.Listen();

// memory
while(true)
{
// loop forever, to accept any clients
TCPSocket *clientProxySocket = s.Accept();
ClientSocket *proxyServerSocket = new ClientSocket();

dispatchConnection(clientProxySocket, proxyServerSocket); // function Accept returns dynamic allocated memory
// the client is responsible in release it
}

return 0;
}




Both your TCPSocket::Send and TCPSocket::Recv functions are incorrect and contain bugs.

In Send, you are using uninitialized variables, not returning the actual number of bytes sent, and have implemented subtraction wrong (?!).

In Recv, your loop condition is incorrect, you need to check for a SOCKET_ERROR return value like in send (-1), and you don't really need a loop of any sort.

Please, take the time to review those functions and make a good effort to fix the issues on your own. If you simply post code and ask people to help you fix it, you won't learn anything and will continue to make the same mistakes over and over.

After you fixed your bugs (or at least really tried hard to), this is how simple those functions need to be:
[spoiler]

// Attempts to send 'size' bytes from 'buffer'. The 'buffer' should be at least
// 'size' bytes upon entry and readable. The function returns how many bytes
// were sent. There is no error reporting mechanism built in, so if the number of
// bytes returned does not match the number of bytes requested to be sent, assume
// an error has occurred and obtain the last socket error to know what happened.

// NOTE: This logic is still susceptible to malicious clients that stop receiving so
// larger sends might end up not going through. You can either not try to send the
// entire buffer or add timing code to timeout the send if it is not completed.

int Send( const char * buffer, int size )
{
// TODO: Implement sanity checks for parameters to make sure they are valid

int offset = 0;

// Loop while we have not sent the entire buffer
while( offset < size )
{
int sent = ::send( getSocket(), buffer + offset, size - offset, 0 );
if( sent == SOCKET_ERROR )
{
break;
}

offset += sent;

/*
// TODO: Consider using a platform sleep of the lowest value
// to give the system a chance to catch up since this was a
// partial send. If you do not and are trying to send too much
// to the client, you will loop many, many times and waste
// a lot of useful CPU in the process.
if( offset != size )
{
Sleep(1);
}
*/
}

// Return how many bytes we actually sent
return offset;
}

// Will attempt to receive up to 'size' bytes into 'buffer'. The 'buffer' should be at least
// 'size' bytes upon entry and writable. The function returns the result of recv, so
// SOCKET_ERROR is returned upon an error, 0 is returned upon a disconnect,
// and all other values returned > 0 indicate how many bytes were actually received.
int Recv( char * buffer, int size )
{
// TODO: Implement sanity checks for parameters to make sure they are valid

return ::recv( getSocket(), buffer, size, 0 );
}


Since you are not using select, you should not try to receive fixed sizes, since that opens your program up to denial of service attacks where you are expecting N bytes but only 0..N - 1 are ever sent. That is why the Recv function returns whatever recv delivers (anywhere up to the N bytes requested) rather than waiting for exactly N, and no looping is required. As a result, you have to properly handle the return value in the code that calls the function. The same is true of the Send function: you must properly handle the return value and check any error codes where necessary.[/spoiler]

Fixing those two functions still won't fix all your problems though. It looks like you are still using one shared buffer for all communication, so your locks are wrong. Specifically, you should be doing: "Lock -> Operation 1 -> Operation 2 -> Unlock" rather than "Lock -> Operation 1 -> Unlock -> Lock -> Operation 2 -> Unlock". In your current code, you now have a data corruption possibility from the second thread using the buffer and overwriting data from the first or vice versa. Alternatively, you can just set up a local receive buffer in each thread, and then there is no need to lock.

You should also keep track of the pointers you allocate via new so you can delete them at the program end or when both connections are done. Right now, you will simply leak resources until the program crashes from an out of memory exception. Also, you are not handling the error codes for ::accept properly either. You need to make sure the error is not on the remote side. Right now, someone could make your program exit simply by starting an accept and then purposefully trigger a WSAECONNRESET and all of your connections would go poof. Not good! Is there any particular reason you request winsock 1.1 then set the version to 5 in the Initialize function? It's pretty unnecessary.
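
One way to separate "this one connection failed" from "the listening socket is broken" is to classify the error code after accept fails. The sketch below does that for POSIX errno values; `isTransientAcceptError` is a hypothetical helper, and the list of codes is an assumption based on the errors the Linux accept documentation says should be retried. On Winsock the equivalent check would use WSAGetLastError() and codes such as the WSAECONNRESET mentioned above.

```cpp
#include <cerrno>

// Returns true when an accept() failure only concerns the one connection
// that was being accepted (or is a harmless interruption), so the listening
// loop should try again instead of exiting the whole program.
bool isTransientAcceptError(int err)
{
    return err == ECONNABORTED   // peer gave up on the connection before accept()
        || err == EINTR          // interrupted by a signal
        || err == EAGAIN         // non-blocking socket, nothing pending
        || err == EWOULDBLOCK
        || err == EPROTO         // protocol error on the pending connection
        || err == ENETDOWN
        || err == ENETUNREACH
        || err == EHOSTUNREACH;
}
```

In the Accept wrapper this would replace the unconditional exit(1): log the error, and only give up when the helper returns false.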

Finally, and this is the biggest issue of it all, you start threads to handle your connections, but you exit them after the first send/recv that takes place. This means you are only able to support the first send/recv pair between the program and the remote host, which is pretty useless for anything but simple sites that do not support keep-alive. Reread Antheus's post that shows the logic behind this. You loop while the socket is open and then read some data and immediately send it out to the pair. Since you have a unique buffer for each thread in that case, there is nothing for you to lock/synchronize with the mutexes. Once you do that, you will be able to support traffic past the initial send/recv on the connections. Using a lock and a counter inside the SocketPair object, you can also implement a simple reference counter to know when you can delete the object, so you do not leak memory.
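
A minimal sketch of the lock-plus-counter idea follows. `SharedPair`, `createPair` and `releasePair` are hypothetical names, and raw descriptors stand in for the socket objects; in the real code the final release is where the TCPSocket and ClientSocket would be deleted.

```cpp
#include <cstddef>
#include <pthread.h>

// Hypothetical shared state for one proxied connection. Both relay threads
// hold a reference; whichever thread releases the last one cleans up.
struct SharedPair {
    int clientFd;
    int serverFd;
    int refs;
    pthread_mutex_t lock;
};

// Allocates a pair that starts with 'initialRefs' owners (2 for two threads).
SharedPair* createPair(int clientFd, int serverFd, int initialRefs)
{
    SharedPair* p = new SharedPair;
    p->clientFd = clientFd;
    p->serverFd = serverFd;
    p->refs = initialRefs;
    pthread_mutex_init(&p->lock, NULL);
    return p;
}

// Drops one reference. Returns true if this call destroyed the object, in
// which case the pointer must not be used again.
bool releasePair(SharedPair* p)
{
    pthread_mutex_lock(&p->lock);
    int remaining = --p->refs;
    pthread_mutex_unlock(&p->lock);

    if (remaining > 0) return false;

    // Last owner: nobody else can touch 'p' now, so it is safe to free.
    pthread_mutex_destroy(&p->lock);
    delete p;
    return true;
}
```

Each thread would call `releasePair` just before returning, so the object lives exactly as long as the slower of the two directions.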
I followed your advice and made changes to the implementation, but it still doesn't work well. Sometimes recv() returns -1 and 0, but then it continues to work and receives data from the web server, yet it doesn't send it back to the client, or vice versa:



Received from client: 865
Sent to web server: 0
Received from client: 865
Sent to web server: 0
Received from client: 865
Sent to web server: 0
Received from client: 865
Sent to web server: 865
You are connected !!!

Received from client: 3
Sent to web server: 3
Error receiving data
No error
Error receiving data
No error
Error receiving data







This is how I implemented them:



void* client_server(void* arg)
{
pthread_detach(pthread_self());

char data[1500];
SocketPair* socketPair = (SocketPair*)arg;

int bytes_recv = 0;
int bytes_sent = 0;
while((bytes_recv = socketPair->clientProxySocket->Recv(data, sizeof(data))) > 0)
{
int sent = 0;
int remaining = bytes_recv;
do {
bytes_sent = socketPair->proxyServerSocket->Send(data, bytes_recv);
sent += bytes_sent;
remaining -= bytes_sent;
cout <<"Received from client: " << bytes_recv << endl;
cout << "Sent to web server: " << bytes_sent << endl;
}while(remaining > 0);
}
return NULL;
}

void* server_client(void* arg)
{
pthread_detach(pthread_self());

char data2[1500];
SocketPair* socketPair = (SocketPair*)arg;

int bytes_recv = 0;
int bytes_sent = 0;
while((bytes_recv = socketPair->proxyServerSocket->Recv(data2, sizeof(data2))) > 0)
{
int sent = 0;
int remaining = bytes_recv;
do {
bytes_sent = socketPair->clientProxySocket->Send(data2, bytes_recv);
sent += bytes_sent;
remaining -= bytes_sent;
cout <<"Received from web server: " << bytes_recv << endl;
cout << "Sent to client: " << bytes_sent << endl;
}while(remaining > 0);
}
return NULL;
}

void dispatchConnection(TCPSocket *clientProxySocket, ClientSocket *proxyServerSocket)
{
Handle handle(clientProxySocket);
handle.HandleAuthentication();

string addr = handle.getAddress();
short prt = handle.getPort();

proxyServerSocket->Connect(addr, prt);

// populate the socketPair
socketPair.clientProxySocket = clientProxySocket;
socketPair.proxyServerSocket = proxyServerSocket;

if(pthread_create(&pid1, NULL, client_server, &socketPair) != 0)
{
cout << "Unable to create pipe client-server thread (pthread_create()" << endl;
}
if(pthread_create(&pid2, NULL, server_client, &socketPair) != 0)
{
cout << "Unable to create pipe server-client thread (pthread_create()" << endl;
}
}


int main()
{
Initialize();
ServerSocket s;
s.Bind();
s.Listen();

// memory
while(true)
{
// loop forever, to accept any clients
TCPSocket *clientProxySocket = s.Accept();
ClientSocket *proxyServerSocket = new ClientSocket();

dispatchConnection(clientProxySocket, proxyServerSocket); // function Accept returns dynamic allocated memory
// the client is responsible in release it

delete clientProxySocket;
delete proxyServerSocket;

}

return 0;
}







And I implemented the send() and recv() functions as suggested (I hope):



int Send(const char* buffer, int size)
{
int offset = 0;
while(offset < size)
{
int n = ::send(getSocket(), buffer + offset, size - offset, 0);
if(n == SOCKET_ERROR)
{
break;
}
offset += n;
if(offset != size)
{
Sleep(1);
}
}
return offset;
}

int Recv(char* buffer, int size)
{
int n = ::recv(getSocket(), buffer, size, 0);
if(n == SOCKET_ERROR)
{
cout << "Error receiving data" << endl;
cout << strerror(errno) << endl;
}
if(n == 0)
{
cout << "Remote host closed connection" << endl;
}
return n;
}







PS: I'm sorry for just pasting code, but I really have studied this and I can't find the reason why it doesn't work properly. I don't expect you to fix my code, but I welcome suggestions about what I am doing wrong, like the ones you have given so far. And trust me, I learn better from mistakes.

Btw, the only thing I didn't understand was the accept() part: how can you figure out if the error is on the remote side?

Thanks again.

You should not be calling delete on clientProxySocket and proxyServerSocket. You will corrupt the memory used by the threads. Instead, you have to work out a different system to free them after they are no longer needed. For now, don't delete them but keep in mind the memory leak.

Now that you have your core recv/send functions fixed up, you need to simplify your application's send/recv logic. It's way too complex and uses a "too clever" coding style that gets you into trouble. In addition, you will want way larger buffers for a web proxy. You can also increase the internal socket send/recv size to the max, but you can read up on how to do that later.
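
For when you do get to the internal socket buffer sizes, the usual mechanism is setsockopt with SO_RCVBUF and SO_SNDBUF. The sketch below assumes POSIX headers and uses hypothetical helper names; the operating system is free to clamp or round the requested value, so reading it back is the only way to know what you got.

```cpp
#include <sys/socket.h>

// Asks the OS for larger kernel-side buffers on 'fd'. The OS may clamp or
// round the value, so the request is a hint rather than a guarantee.
// Returns true if both options were accepted.
bool requestSocketBuffers(int fd, int bytes)
{
    if (::setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &bytes, sizeof(bytes)) != 0)
        return false;
    if (::setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bytes, sizeof(bytes)) != 0)
        return false;
    return true;
}

// Reads back the receive buffer size actually in effect, or -1 on error.
int currentReceiveBuffer(int fd)
{
    int value = 0;
    socklen_t len = sizeof(value);
    if (::getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &value, &len) != 0)
        return -1;
    return value;
}
```

This is separate from the size of the `data` array in your thread function: the array controls how much one Recv call can return, while these options control how much the kernel will queue between calls.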

Your client_server function should look like this:

void * client_server( void * arg )
{
pthread_detach( pthread_self() );

SocketPair * socketPair = (SocketPair*)arg;
char data[65536];

while( true )
{
// First, receive as many bytes as possible
int bytes_recv = socketPair->clientProxySocket->Recv( data, sizeof( data ) );

// 0 for disconnect, -1 for error
if( bytes_recv <= 0)
{
// TODO: Log this event
break;
}

// Now try to send all the bytes
int bytes_sent = socketPair->proxyServerSocket->Send( data, bytes_recv );

// Debugging info
cout <<"Received from client: " << bytes_recv << endl;
cout << "Sent to web server: " << bytes_sent << endl;

// On success, bytes_sent should equal bytes_recv
if( bytes_sent != bytes_recv )
{
// TODO: Log this error since Send should send all the data by design
break;
}
}

return NULL;
}


As you can see, since your core Send function attempts to send all the data, you do not need to do it again yourself. Any errors in the core send function are unrecoverable so you cannot fix that in your application logic. Instead, you just have to compare the final byte count that was sent and proceed from there.

Fix the delete issue first, since that is going to be a huge problem in messing up the execution of your program. Once that is done, fix up your client_server as well as the server_client function (it too should follow a similar format as client_server) and you should be good to go barring other issues in different parts of the code.

This topic is closed to new replies.
