Passing Concurreny::concurrent_queue by reference to thread

Started by
3 comments, last by Tispe 10 years, 1 month ago

Hello

I am testing out Concurreny and std::thread features that I might want to use. But I am having trouble passing a thread-safe queue to a workerThread by reference.

When I push a message in main thread I can't retrieve it in the worker thread. What am I doing wrong?


thread workerThread;
concurrent_queue<string> SendQueue;
concurrent_queue<string> ReceiveQueue;

workerThread = thread(&ThreadRoutine, SendQueue, ReceiveQueue);

ThreadRoutine(concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
{

string SendString;
if(SendQueue.try_pop(SendString))
{
//Never triggers after I push a message from main side.
}

}

If I push a few messages BEFORE I create the thread, then those messages gets carried over. It's like the queues are not passed by reference at all.

Advertisement


But I am having trouble passing a thread-safe queue to a workerThread by reference.

I think that this is not the problem, most likely there's an other reason. Maybe you forgot to start the thread or something like this. More code would be helpful.

This is the demo class I wrote for testing:


class Network
{
public:
	Network(void)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		workerThread = thread(&Network::ThreadRoutine, server, port, SendQueue, ReceiveQueue);
	}
	void Disconnect()
	{
		SendQueue.push(string("..quit"));
	}
	void Send(string message)
	{
		SendQueue.push(message);
	}
	bool GetMessage(string &message)
	{
		return ReceiveQueue.try_pop(message);
	}

private:
	thread workerThread;
	concurrent_queue<string> SendQueue;
	concurrent_queue<string> ReceiveQueue;

	static DWORD ThreadRoutine(string server, short port, concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
	{
		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			cout << "An error occurred while trying to create an ENet client host.\n";
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer;

			enet_address_set_host (& address, server.c_str());
			address.port = port;

			peer = enet_host_connect (client, & address, 2, 0);    
			if (peer == NULL){
				cout << "No available peers for initiating an ENet connection.\n";
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					cout << "Connection to server succeeded.\n";

					bool Quit = false;
					while(!Quit)
					{
						string SendString;
						if(SendQueue.try_pop(SendString))
						{
							if(strcmp(SendString.c_str(), "..quit") == 0){
								cout << "Exiting...\n";
								Quit = true;
								enet_peer_reset(peer);
								continue;
							} else {
								cout << "Sending packet\n";
								ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
								enet_peer_send (peer, 0, packet);
							}
						}

						if(enet_host_service(client, &event, 1000) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
									event.packet -> dataLength,
									event.packet -> data,
									event.peer -> data,
									event.channelID);
								ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								cout << "Disconected.\n";
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset (peer);
					cout << "Connection to server failed.\n";
				}

			}

		}

		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

I think I fixed it.

Apperantly, I need to wrap it in std::ref


void Connect(string server, short port)
	{
		workerThread = thread(&Network::ThreadRoutine, server, port, ref(SendQueue), ref(ReceiveQueue));
	}

I have never heard of std::ref before....

For anyone interested:


class Network
{
public:
	Network(void)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		workerThread = thread(&Network::ThreadRoutine, server, port, ref(SendQueue), ref(ReceiveQueue));
		workerThread.detach();
	}
	void Disconnect()
	{
		SendQueue.push(string("..quit"));
	}
	void Send(string message)
	{
		SendQueue.push(message);
	}
	bool GetMessage(string &message)
	{
		return ReceiveQueue.try_pop(message);
	}

private:
	thread workerThread;
	concurrent_queue<string> SendQueue;
	concurrent_queue<string> ReceiveQueue;

	static DWORD ThreadRoutine(string server, short port, concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
	{
		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			ReceiveQueue.push(string("An error occurred while trying to create an ENet client host."));
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer = NULL;

			enet_address_set_host (& address, server.c_str());
			address.port = port;

			peer = enet_host_connect (client, & address, 2, 0);    
			if (peer == NULL){
				ReceiveQueue.push(string("No available peers for initiating an ENet connection."));
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					ReceiveQueue.push(string("Connection to server succeeded."));
					bool Quit = false;
					while(!Quit)
					{
						string SendString;
						if(SendQueue.try_pop(SendString))
						{
							if(strcmp(SendString.c_str(), "..quit") == 0){
								ReceiveQueue.push(string("Exiting worker thread."));
								Quit = true;
								enet_peer_disconnect(peer, 0);
								if(enet_host_service (client, & event, 3000) > 0)
								{
									switch (event.type)
									{
									case ENET_EVENT_TYPE_RECEIVE:
										enet_packet_destroy (event.packet);
										break;
									case ENET_EVENT_TYPE_DISCONNECT:
										ReceiveQueue.push(string("Disconnection succeeded."));
										continue;
									}
								}

								enet_peer_reset(peer);
								continue;
							} else {
								ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
								enet_peer_send (peer, 0, packet);
							}
						}

						if(enet_host_service(client, &event, 1) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								/*printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
									event.packet -> dataLength,
									event.packet -> data,
									event.peer -> data,
									event.channelID);*/
								ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								ReceiveQueue.push(string("Disconnected."));
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset (peer);
					ReceiveQueue.push(string("Connection to server failed."));
				}

			}

		}

		SendQueue.clear();
		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

If anyone cares I removed a race condition:


class concurrentIO : public std::enable_shared_from_this<concurrentIO>
{
public:
	shared_ptr<concurrentIO> NoRaceCondition;
	concurrent_queue<string> SendQueue;
	concurrent_queue<string> ReceiveQueue;
};

class Network
{
public:
	Network(void) : Connected(false)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		spFIFOs = make_shared<concurrentIO>();
		spFIFOs->NoRaceCondition = spFIFOs;
		thread workerThread = thread(&Network::ThreadRoutine, server, port, spFIFOs.get());
		workerThread.detach();
		Connected = true;
	}
	void Disconnect()
	{
		spFIFOs.reset();
		Connected = false;
	}
	bool Send(string message)
	{
		if(Connected)
		{
			spFIFOs->SendQueue.push(message);
		}

		return Connected;
	}
	bool GetMessage(string &message)
	{
		if(Connected)
		{
			return spFIFOs->ReceiveQueue.try_pop(message);
		}

		return false;
	}

private:
	bool Connected;
	shared_ptr<concurrentIO> spFIFOs;

	static DWORD ThreadRoutine(string server, short port, LPVOID param)
	{
		shared_ptr<concurrentIO> spFIFOs = static_cast<concurrentIO* >(param)->shared_from_this();
		spFIFOs->NoRaceCondition.reset();

		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			spFIFOs->ReceiveQueue.push(string("An error occurred while trying to create an ENet client host."));
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer = NULL;

			enet_address_set_host(& address, server.c_str());
			address.port = port;

			peer = enet_host_connect(client, & address, 2, 0);    
			if (peer == NULL){
				spFIFOs->ReceiveQueue.push(string("No available peers for initiating an ENet connection."));
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					spFIFOs->ReceiveQueue.push(string("Connection to server succeeded."));

					bool Quit = false;
					while(!Quit)		
					{
						if(spFIFOs.unique())			//if main side resets its shared pointer to signal Disconnect
						{
							enet_peer_disconnect(peer, 0);
							if(enet_host_service (client, & event, 3000) > 0)
							{
								switch (event.type)
								{
								case ENET_EVENT_TYPE_RECEIVE:
									enet_packet_destroy (event.packet);
									break;
								case ENET_EVENT_TYPE_DISCONNECT:
									spFIFOs->ReceiveQueue.push(string("Disconnection succeeded."));
									continue;
								}
							}
							enet_peer_reset(peer);
							Quit = true;
							continue;
						}

						string SendString;
						while(spFIFOs->SendQueue.try_pop(SendString))
						{
							ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
							enet_peer_send(peer, 0, packet);
						}

						if(enet_host_service(client, &event, 0) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								/*printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
								event.packet -> dataLength,
								event.packet -> data,
								event.peer -> data,
								event.channelID);*/
								spFIFOs->ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								spFIFOs->ReceiveQueue.push(string("Disconnected."));
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset(peer);
					spFIFOs->ReceiveQueue.push(string("Connection to server failed."));
				}

			}

		}

		spFIFOs->SendQueue.clear();
		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

This topic is closed to new replies.

Advertisement