Switching from CreateThread() to Thread

Started by
15 comments, last by Aardvajk 9 years, 4 months ago
I made a simple chat program using a simple console window, but have one problem. Originally both my client and server were using CreateThread(), but saw some more benefits of using the thread header so I began switching it. I did the client first and so far everything works with no problems. But then I tried the server end, and the program went haywire in the loop. With no client open at all, simply running the server kept accepting clients and disconnecting clients over and over rapidly. Its suppose to pause at accept and bare in mind, no client was open at all!

After careful debugging and putting those couple of lines of code back the way it was back to CreateThread(), I found simply having #include <thread> uncommented was making my loop go haywire. I wasnt even doing anything with the header. Whats making my program act that way from that thread header? Thanks in advance
Advertisement

Anyways sorry that I didnt post any code to show you what I mean. I posted that off my phone. Now that I'm home, here is what I'm talking about.

This server code goes haywire if you were to uncomment #include <thread>. Otherwise it works great.


#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <string>

//NOTE: Uncommenting this makes the program
//              go haywire
//#include <thread>

using namespace std;

#pragma comment (lib, "Ws2_32.lib")

#define IP_ADDRESS "192.168.56.1"
#define DEFAULT_PORT "3504"
#define DEFAULT_BUFLEN 512

struct client_type
{
    int id;
    SOCKET socket;
};

const char option_value = 1;
const int maxClients = 5;

//Function Prototypes
DWORD WINAPI process_client(LPVOID Parameter);
int main();

WSADATA wsaData;

struct addrinfo hints;
struct addrinfo *server = NULL;

SOCKET server_socket = INVALID_SOCKET;
client_type client[maxClients];

int num_clients = 0;

string msg = "";
char tempmsg[DEFAULT_BUFLEN] = "";

int temp_id = -1;

DWORD WINAPI process_client(LPVOID Parameter)
{
    client_type local_socket;
    local_socket = *(static_cast<client_type *>(Parameter));

    //Session
    if (num_clients < maxClients)
    {
        cout << "Client #" << local_socket.id << " Accepted" << endl;
        num_clients++;
    
        while (1)
        {
            memset(tempmsg, 0, DEFAULT_BUFLEN);

            if (local_socket.socket != 0)
            {
                int iResult = recv(local_socket.socket, tempmsg, DEFAULT_BUFLEN, 0);

                if (iResult > 0)
                {
                    if (strcmp("", tempmsg))
                        msg = "Client #" + to_string(local_socket.id) + ": " + tempmsg;

                    cout << msg.c_str() << endl;

                    //Broadcast that message to the other clients
                    for (int i = 0; i < maxClients; i++)
                    {
                        if (client[i].socket != INVALID_SOCKET)
                            iResult = send(client[i].socket, msg.c_str(), strlen(msg.c_str()), 0);
                    }
                }
                else
                {
                    msg = "Client #" + to_string(local_socket.id) + " Disconnected";

                    cout << msg << endl;

                    closesocket(local_socket.socket);
                    closesocket(client[local_socket.id].socket);
                    client[local_socket.id].socket = INVALID_SOCKET;
                    num_clients--;
                    if (num_clients <= 0)
                        num_clients = 0;

                    //Broadcast that message to the other clients
                    for (int i = 0; i < maxClients; i++)
                    {
                        if (client[i].socket != INVALID_SOCKET)
                            iResult = send(client[i].socket, msg.c_str(), strlen(msg.c_str()), 0);
                    }


                    //Create a temporary id for the next client
                    temp_id = -1;
                    for (int i = 0; i < maxClients; i++)
                    {
                        if (client[i].socket == INVALID_SOCKET)
                        {
                            client[i].id = i;
                            temp_id = i;
                            break;
                        }
                    }
                    break;
                }
            }
        }
    }
    else
    {
        cout << "Server is full." << endl;
    }

    return 0;
}

int main()
{
    //Initialize Winsock
    cout << "Intializing Winsock..." << endl;
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    //Setup hints
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    //Setup Server
    cout << "Setting up server..." << endl;
    getaddrinfo(static_cast<LPCTSTR>(IP_ADDRESS), DEFAULT_PORT, &hints, &server);

    //Create a listening socket for connecting to server
    cout << "Creating server socket..." << endl;
    server_socket = socket(server->ai_family, server->ai_socktype, server->ai_protocol);

    //Setup socket options
    setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); //Make it possible to re-bind to a port that was used within the last 2 minutes
    setsockopt(server_socket, IPPROTO_TCP, TCP_NODELAY, &option_value, sizeof(int)); //Used for interactive programs

    //Assign an address to the server socket.
    cout << "Binding socket..." << endl;
    bind(server_socket, server->ai_addr, (int)server->ai_addrlen);

    //Listen for incoming connections.
    cout << "Listening..." << endl;
    listen(server_socket, SOMAXCONN);

    for (int i = 0; i < maxClients; i++)
    {
        client[i] = {-1, INVALID_SOCKET };
    }

    while (1)
    {
        //Create a temporary id for the next client
        temp_id = -1;
        for (int i = 0; i < maxClients; i++)
        {
            if (client[i].socket == INVALID_SOCKET)
            {
                client[i].id = i;
                temp_id = i;
                break;
            }
        }

        client[temp_id].socket = accept(server_socket, NULL, NULL);

        //Broadcast the id to that client
        if (client[temp_id].socket != INVALID_SOCKET)
        {
            msg = to_string(client[temp_id].id);
            send(client[temp_id].socket, msg.c_str(), strlen(msg.c_str()), 0);
        }

        DWORD threadID;
        CreateThread(NULL, 0, process_client, (LPVOID)&client[temp_id], 0, &threadID);
    }

    //Close listening socket
    closesocket(server_socket);

    //Close client socket
    for (int i = 0; i < maxClients; i++)
    {
        closesocket(client[i].socket);
    }

    //Clean up Winsock
    WSACleanup();
    cout << "Program has ended successfully" << endl;

    system("pause");
    return 0;
}

I want to be able to switch from CreateThread to using Thread though since my client side uses Thread and maintain consistency. Any bright ideas? Thanks:


#include <winsock2.h>
#include <ws2tcpip.h>
#include <iostream>
#include <string>
#include <thread>

using namespace std;

#pragma comment (lib, "Ws2_32.lib")

#define DEFAULT_BUFLEN 512            
#define IP_ADDRESS "192.168.56.1"
#define DEFAULT_PORT "3504"

struct client_type
{
    SOCKET socket;
    int id;
    char received_message[DEFAULT_BUFLEN];
};

int process_client(client_type &new_client);
int main();

WSAData wsa_data;
struct addrinfo *result = NULL, *ptr = NULL, hints;
string sent_message = "";
client_type client = { INVALID_SOCKET, -1, ""};
int iResult = 0;

int process_client(client_type &new_client)
{
    while (1)
    {
        memset(new_client.received_message, 0, DEFAULT_BUFLEN);

        if (new_client.socket != 0)
        {
            int iResult = recv(new_client.socket, new_client.received_message, DEFAULT_BUFLEN, 0);

            if (iResult > 0)
                cout << new_client.received_message << endl;
            else if (iResult <= 0)
            {
                cout << "recv() failed: " << WSAGetLastError() << endl;
                break;
            }
        }

    }

    return 0;
}

int main()
{
    cout << "Starting Client...\n";

    // Initialize Winsock
    iResult = WSAStartup(MAKEWORD(2, 2), &wsa_data);
    if (iResult != 0) {
        cout << "WSAStartup() failed with error: " << iResult << endl;
        return 1;
    }

    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    cout << "Connecting...\n";

    // Resolve the server address and port
    iResult = getaddrinfo(static_cast<LPCTSTR>(IP_ADDRESS), DEFAULT_PORT, &hints, &result);
    if (iResult != 0) {
        cout << "getaddrinfo() failed with error: " << iResult << endl;
        WSACleanup();
        system("pause");
        return 1;
    }

    // Attempt to connect to an address until one succeeds
    for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {

        // Create a SOCKET for connecting to server
        client.socket = socket(ptr->ai_family, ptr->ai_socktype,
            ptr->ai_protocol);
        if (client.socket == INVALID_SOCKET) {
            cout << "socket() failed with error: " << WSAGetLastError() << endl;
            WSACleanup();
            system("pause");
            return 1;
        }

        // Connect to server.
        iResult = connect(client.socket, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (iResult == SOCKET_ERROR) {
            closesocket(client.socket);
            client.socket = INVALID_SOCKET;
            continue;
        }
        break;
    }

    freeaddrinfo(result);

    if (client.socket == INVALID_SOCKET) {
        cout << "Unable to connect to server!" << endl;
        WSACleanup();
        system("pause");
        return 1;
    }

    cout << "Successfully Connected" << endl << endl;

    //Obtain id from server for this client;
    recv(client.socket, client.received_message, DEFAULT_BUFLEN, 0);
    client.id = atoi(client.received_message);

    thread my_thread(process_client, client);

    while (1)
    {
        getline(cin, sent_message);
        iResult = send(client.socket, sent_message.c_str(), strlen(sent_message.c_str()), 0);
        
        if (iResult <= 0)
        {
            cout << "sent() failed: " << WSAGetLastError() << endl;
            break;
        }
    }

    //Shutdown the connection since no more data will be sent
    my_thread.detach();
    cout << "Shutting down socket..." << endl;
    iResult = shutdown(client.socket, SD_SEND);
    if (iResult == SOCKET_ERROR) {
        cout << "shutdown() failed with error: " << WSAGetLastError() << endl;
        closesocket(client.socket);
        WSACleanup();
        system("pause");
        return 1;
    }

    closesocket(client.socket);
    WSACleanup();
    system("pause");
    return 0;
}
Your code is not thread safe and is full of potential race conditions and other bugs. You should address the problems first before adding a complicating factor like switching thread APIs.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

At its current state, how is it not thread safe already if its working ok? Maybe I missed something perhaps?

That is why Im switching to Thread because I want to add more arguments and reduce any globals possible, and have control over it. But that #include statement really is messing it up for me and I still cant figure out why.

An extra extra header, even if you don't use it, can change what the optimizer does with your code, so if you have undefined behavior or timing/threading issues in your program this can change everything.
Having multiple threads sending/recving on the same sockets without external synchronization is begging for problems. One thread can easily grab state from another thread's operations, for example.

You have a server bug which will crash your program if you run out of free sockets. Study your use of temp_id carefully.

All uses of your global variables represent thread safety bugs. ALL of them. You need to look into race conditions, reentrancy bugs, and thread interleaved execution to find some of them, but it should be intuitively obvious that you can't just stuff things into a global variable while multiple threads are operating on that data, and expect everything to be fine.



So yes, you're missing something. A lot of somethings. The fact that you think this "works" at all is one of the dangerous pitfalls of threaded programming: it can appear to be fine until a tiny change is introduced, or worse, time goes by; and then all hell breaks loose. You're going to need to develop a mentality of analyzing your programs. Your current mentality is to write code and then poke it until it seems to do what you want; this is no longer a sufficient mode of operation. For better or worse, you're going to have to do a lot more hard thinking if you want to do multithreaded networking.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Just because you application works doesn't means it thread-safe. That is the beauty or curse of the non-deterministic nature of multithreaded application. If the code is not thread-safe, then of course its going to break on simple changes. Switching threading library shouldn't cause a working application to go haywire provided there is one-to-one correspondence in the expression of the threading primitives. Writing threaded code without taking into consideration synchronization issues is tantamount to coding suicide. Like ApochPiQ said, the code is riddle with issues that are not thread-safe. I suggest you take a step back and reason at a higher level how each subsystem could be potentially accessed at the SAME time via multiple threads and what would/should happen in that case.

To be fair, though, switching to C++11's threading primitives in <thread> and then also making use of C++11 <atomic> and <mutex> is probably the easiest way to get up to speed on how to do proper multithreading in C++.

Sean Middleditch – Game Systems Engineer – Join my team!

I agree that using the standard library is a good idea. However, IMO, it's a little bit cart-before-horse to try to learn <thread> and company before learning the fundamentals of how threading works and how to write threaded code. There are theory requirements, and just diving in and hacking around is likely to develop into extremely bad habits. So I advocate for learning the theory first, and then learning application via <thread> et. al.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

Well I just did some more experimenting. I commented anything that had to do with threading at all, including the function (even the client end I removed all threading) and moved all my variables inside main(). Uncommenting #include <thread> makes the server unable to accept clients, even if I put a while loop around the accept() function which you see. But if I comment out the #include statement, the client connects just fine. So the <thread> library is fiddling with the winsock function for some reason.


#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <string>

//NOTE: Uncommenting this makes the program
//              go haywire
//#include <thread>

using namespace std;

#pragma comment (lib, "Ws2_32.lib")

#define IP_ADDRESS "192.168.56.1"
#define DEFAULT_PORT "3504"
#define DEFAULT_BUFLEN 512

struct client_type
{
    int id;
    SOCKET socket;
};

//Function Prototypes
//DWORD WINAPI process_client(LPVOID Parameter);
//int process_client(client_type &new_client, int &num_clients, int &new_id);

int main();

//DWORD WINAPI process_client(LPVOID Parameter)
//int process_client(client_type &new_client, int &num_clients, int &new_id)
//{
    //Session
    //if (num_clients < maxClients)
    //{
        //cout << "Client #" << new_client.id << " Accepted" << endl;
        //num_clients++;

        //while (1)
        //{
        //    memset(tempmsg, 0, DEFAULT_BUFLEN);

        //    if (local_socket.socket != 0)
        //    {
        //        int iResult = recv(local_socket.socket, tempmsg, DEFAULT_BUFLEN, 0);

        //        if (iResult > 0)
        //        {
        //            if (strcmp("", tempmsg))
        //                msg = "Client #" + to_string(local_socket.id) + ": " + tempmsg;

        //            cout << msg.c_str() << endl;

        //            //Broadcast that message to the other clients
        //            for (int i = 0; i < maxClients; i++)
        //            {
        //                if (client[i].socket != INVALID_SOCKET)
        //                    iResult = send(client[i].socket, msg.c_str(), strlen(msg.c_str()), 0);
        //            }
        //        }
        //        else
        //        {
        //            msg = "Client #" + to_string(local_socket.id) + " Disconnected";

        //            cout << msg << endl;

        //            closesocket(local_socket.socket);
        //            closesocket(client[local_socket.id].socket);
        //            client[local_socket.id].socket = INVALID_SOCKET;
        //            num_clients--;
        //            if (num_clients <= 0)
        //                num_clients = 0;

        //            //Broadcast that message to the other clients
        //            for (int i = 0; i < maxClients; i++)
        //            {
        //                if (client[i].socket != INVALID_SOCKET)
        //                    iResult = send(client[i].socket, msg.c_str(), strlen(msg.c_str()), 0);
        //            }


        //            //Create a temporary id for the next client
        //            temp_id = -1;
        //            for (int i = 0; i < maxClients; i++)
        //            {
        //                if (client[i].socket == INVALID_SOCKET)
        //                {
        //                    client[i].id = i;
        //                    temp_id = i;
        //                    break;
        //                }
        //            }
        //            break;
        //        }
        //    }
        //}
    //}
    //else
    //{
    //    cout << "Server is full." << endl;
    //}

    //return 0;
//}

int main()
{
    const char option_value = 1;
    const int maxClients = 5;

    WSADATA wsaData;

    struct addrinfo hints;
    struct addrinfo *server = NULL;

    SOCKET server_socket = INVALID_SOCKET;
    client_type client[maxClients];

    int num_clients = 0;

    string msg = "";
    char tempmsg[DEFAULT_BUFLEN] = "";

    int temp_id = -1;


    //Initialize Winsock
    cout << "Intializing Winsock..." << endl;
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    //Setup hints
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    //Setup Server
    cout << "Setting up server..." << endl;
    getaddrinfo(static_cast<LPCTSTR>(IP_ADDRESS), DEFAULT_PORT, &hints, &server);

    //Create a listening socket for connecting to server
    cout << "Creating server socket..." << endl;
    server_socket = socket(server->ai_family, server->ai_socktype, server->ai_protocol);

    //Setup socket options
    setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int)); //Make it possible to re-bind to a port that was used within the last 2 minutes
    setsockopt(server_socket, IPPROTO_TCP, TCP_NODELAY, &option_value, sizeof(int)); //Used for interactive programs

    //Assign an address to the server socket.
    cout << "Binding socket..." << endl;
    bind(server_socket, server->ai_addr, (int)server->ai_addrlen);

    //Listen for incoming connections.
    cout << "Listening..." << endl;
    listen(server_socket, SOMAXCONN);

    for (int i = 0; i < maxClients; i++)
    {
        client[i] = {-1, INVALID_SOCKET };
    }

    //thread my_thread;

    while (1)
    {
        cout << "MAIN: Begining loop" << endl;

        //Create a temporary id for the next client
        temp_id = -1;
        for (int i = 0; i < maxClients; i++)
        {
            if (client[i].socket == INVALID_SOCKET)
            {
                client[i].id = i;
                temp_id = i;
                break;
            }
        }
        
        cout << client[temp_id].socket << ", " << INVALID_SOCKET << endl;

        while (client[temp_id].socket == INVALID_SOCKET)
        {
            client[temp_id].socket = accept(server_socket, NULL, NULL);
        }

        //Broadcast the id to that client
        if (client[temp_id].socket != INVALID_SOCKET)
        {
            msg = to_string(client[temp_id].id);
            send(client[temp_id].socket, msg.c_str(), strlen(msg.c_str()), 0);
        }

        cout << client[temp_id].socket << ", " << INVALID_SOCKET << endl;

        //my_thread = thread(process_client, client[temp_id], num_clients, temp_id);


        cout << "TEST NUM_CLIENTS: " << num_clients << endl;
        cout << "TEST TEMP_ID: " << temp_id << endl;



    //    DWORD threadID;
    //    CreateThread(NULL, 0, process_client, (LPVOID)&client[temp_id], 0, &threadID);
    }

    //if ((int)(my_thread.native_handle()) != 0xCCCCCCCC)
        //my_thread.detach();

    //Close listening socket
    closesocket(server_socket);

    //Close client socket
    for (int i = 0; i < maxClients; i++)
    {
        closesocket(client[i].socket);
    }

    //Clean up Winsock
    WSACleanup();
    cout << "Program has ended successfully" << endl;

    system("pause");
    return 0;
}

This topic is closed to new replies.

Advertisement