• Advertisement
Sign in to follow this  

Passing Concurrency::concurrent_queue by reference to thread

This topic is 1407 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

If you intended to correct an error in the post then please contact us.

Recommended Posts

Hello

 

I am testing out Concurrency and std::thread features that I might want to use. But I am having trouble passing a thread-safe queue to a workerThread by reference.

 

When I push a message in main thread I can't retrieve it in the worker thread. What am I doing wrong?

thread workerThread;
concurrent_queue<string> SendQueue;
concurrent_queue<string> ReceiveQueue;
workerThread = thread(&ThreadRoutine, SendQueue, ReceiveQueue);
ThreadRoutine(concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
{

string SendString;
if(SendQueue.try_pop(SendString))
{
//Never triggers after I push a message from main side.
}

}

If I push a few messages BEFORE I create the thread, then those messages get carried over. It's as if the queues are not passed by reference at all.

 

 

Share this post


Link to post
Share on other sites
Advertisement


But I am having trouble passing a thread-safe queue to a workerThread by reference.

I think that this is not the problem; most likely there is another reason. Maybe you forgot to start the thread or something like that. More code would be helpful.

Share this post


Link to post
Share on other sites

This is the demo class I wrote for testing:

class Network
{
public:
	Network(void)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		workerThread = thread(&Network::ThreadRoutine, server, port, SendQueue, ReceiveQueue);
	}
	void Disconnect()
	{
		SendQueue.push(string("..quit"));
	}
	void Send(string message)
	{
		SendQueue.push(message);
	}
	bool GetMessage(string &message)
	{
		return ReceiveQueue.try_pop(message);
	}

private:
	thread workerThread;
	concurrent_queue<string> SendQueue;
	concurrent_queue<string> ReceiveQueue;

	static DWORD ThreadRoutine(string server, short port, concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
	{
		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			cout << "An error occurred while trying to create an ENet client host.\n";
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer;

			enet_address_set_host (& address, server.c_str());
			address.port = port;

			peer = enet_host_connect (client, & address, 2, 0);    
			if (peer == NULL){
				cout << "No available peers for initiating an ENet connection.\n";
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					cout << "Connection to server succeeded.\n";

					bool Quit = false;
					while(!Quit)
					{
						string SendString;
						if(SendQueue.try_pop(SendString))
						{
							if(strcmp(SendString.c_str(), "..quit") == 0){
								cout << "Exiting...\n";
								Quit = true;
								enet_peer_reset(peer);
								continue;
							} else {
								cout << "Sending packet\n";
								ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
								enet_peer_send (peer, 0, packet);
							}
						}

						if(enet_host_service(client, &event, 1000) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
									event.packet -> dataLength,
									event.packet -> data,
									event.peer -> data,
									event.channelID);
								ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								cout << "Disconected.\n";
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset (peer);
					cout << "Connection to server failed.\n";
				}

			}

		}

		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

Share this post


Link to post
Share on other sites

I think I fixed it.

 

Apparently, I need to wrap it in std::ref

// Launches the worker thread for the given server and port.
void Connect(string server, short port)
	{
		// std::thread copies its arguments, so the queues are wrapped in
		// reference wrappers to let the worker share them with this object.
		thread worker(&Network::ThreadRoutine, server, port, ref(SendQueue), ref(ReceiveQueue));
		workerThread = std::move(worker);
	}

I have never heard of std::ref before....

 

 

For anyone interested:

class Network
{
public:
	Network(void)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		workerThread = thread(&Network::ThreadRoutine, server, port, ref(SendQueue), ref(ReceiveQueue));
		workerThread.detach();
	}
	void Disconnect()
	{
		SendQueue.push(string("..quit"));
	}
	void Send(string message)
	{
		SendQueue.push(message);
	}
	bool GetMessage(string &message)
	{
		return ReceiveQueue.try_pop(message);
	}

private:
	thread workerThread;
	concurrent_queue<string> SendQueue;
	concurrent_queue<string> ReceiveQueue;

	static DWORD ThreadRoutine(string server, short port, concurrent_queue<string> &SendQueue, concurrent_queue<string> &ReceiveQueue)
	{
		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			ReceiveQueue.push(string("An error occurred while trying to create an ENet client host."));
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer = NULL;

			enet_address_set_host (& address, server.c_str());
			address.port = port;

			peer = enet_host_connect (client, & address, 2, 0);    
			if (peer == NULL){
				ReceiveQueue.push(string("No available peers for initiating an ENet connection."));
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					ReceiveQueue.push(string("Connection to server succeeded."));
					bool Quit = false;
					while(!Quit)
					{
						string SendString;
						if(SendQueue.try_pop(SendString))
						{
							if(strcmp(SendString.c_str(), "..quit") == 0){
								ReceiveQueue.push(string("Exiting worker thread."));
								Quit = true;
								enet_peer_disconnect(peer, 0);
								if(enet_host_service (client, & event, 3000) > 0)
								{
									switch (event.type)
									{
									case ENET_EVENT_TYPE_RECEIVE:
										enet_packet_destroy (event.packet);
										break;
									case ENET_EVENT_TYPE_DISCONNECT:
										ReceiveQueue.push(string("Disconnection succeeded."));
										continue;
									}
								}

								enet_peer_reset(peer);
								continue;
							} else {
								ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
								enet_peer_send (peer, 0, packet);
							}
						}

						if(enet_host_service(client, &event, 1) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								/*printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
									event.packet -> dataLength,
									event.packet -> data,
									event.peer -> data,
									event.channelID);*/
								ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								ReceiveQueue.push(string("Disconnected."));
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset (peer);
					ReceiveQueue.push(string("Connection to server failed."));
				}

			}

		}

		SendQueue.clear();
		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

Edited by Tispe

Share this post


Link to post
Share on other sites

If anyone cares I removed a race condition:

// Bundle of the two message queues shared between an owner and a worker
// thread. It is meant to be held through std::shared_ptr; deriving from
// enable_shared_from_this lets code that only has a raw pointer to the object
// obtain an owning pointer again.
struct concurrentIO : std::enable_shared_from_this<concurrentIO>
{
	// Self-owning reference: while it is non-null the object keeps itself
	// alive, so whoever sets it must reset it again or the object is never freed.
	std::shared_ptr<concurrentIO> NoRaceCondition;
	// Messages travelling from the owner to the worker.
	concurrent_queue<string> SendQueue;
	// Messages travelling from the worker back to the owner.
	concurrent_queue<string> ReceiveQueue;
};

class Network
{
public:
	Network(void) : Connected(false)
	{
	}
	~Network(void)
	{
		Disconnect();
	}

	void Connect(string server, short port)
	{
		spFIFOs = make_shared<concurrentIO>();
		spFIFOs->NoRaceCondition = spFIFOs;
		thread workerThread = thread(&Network::ThreadRoutine, server, port, spFIFOs.get());
		workerThread.detach();
		Connected = true;
	}
	void Disconnect()
	{
		spFIFOs.reset();
		Connected = false;
	}
	bool Send(string message)
	{
		if(Connected)
		{
			spFIFOs->SendQueue.push(message);
		}

		return Connected;
	}
	bool GetMessage(string &message)
	{
		if(Connected)
		{
			return spFIFOs->ReceiveQueue.try_pop(message);
		}

		return false;
	}

private:
	bool Connected;
	shared_ptr<concurrentIO> spFIFOs;

	static DWORD ThreadRoutine(string server, short port, LPVOID param)
	{
		shared_ptr<concurrentIO> spFIFOs = static_cast<concurrentIO* >(param)->shared_from_this();
		spFIFOs->NoRaceCondition.reset();

		if (enet_initialize () != 0){
			return 1;
		}

		ENetHost* client = NULL;
		client = enet_host_create (NULL, 1, 2, 0, 0);
		if(client == NULL){
			spFIFOs->ReceiveQueue.push(string("An error occurred while trying to create an ENet client host."));
		} else {
			ENetAddress address;
			ENetEvent event;
			ENetPeer *peer = NULL;

			enet_address_set_host(& address, server.c_str());
			address.port = port;

			peer = enet_host_connect(client, & address, 2, 0);    
			if (peer == NULL){
				spFIFOs->ReceiveQueue.push(string("No available peers for initiating an ENet connection."));
			} else {
				if (enet_host_service (client, & event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT){
					spFIFOs->ReceiveQueue.push(string("Connection to server succeeded."));

					bool Quit = false;
					while(!Quit)		
					{
						if(spFIFOs.unique())			//if main side resets its shared pointer to signal Disconnect
						{
							enet_peer_disconnect(peer, 0);
							if(enet_host_service (client, & event, 3000) > 0)
							{
								switch (event.type)
								{
								case ENET_EVENT_TYPE_RECEIVE:
									enet_packet_destroy (event.packet);
									break;
								case ENET_EVENT_TYPE_DISCONNECT:
									spFIFOs->ReceiveQueue.push(string("Disconnection succeeded."));
									continue;
								}
							}
							enet_peer_reset(peer);
							Quit = true;
							continue;
						}

						string SendString;
						while(spFIFOs->SendQueue.try_pop(SendString))
						{
							ENetPacket* packet = enet_packet_create (SendString.c_str(), SendString.length() + 1, ENET_PACKET_FLAG_RELIABLE);
							enet_peer_send(peer, 0, packet);
						}

						if(enet_host_service(client, &event, 0) > 0)
						{
							switch (event.type)
							{
							case ENET_EVENT_TYPE_RECEIVE:
								/*printf ("A packet of length %u containing %s was received from %s on channel %u.\n",
								event.packet -> dataLength,
								event.packet -> data,
								event.peer -> data,
								event.channelID);*/
								spFIFOs->ReceiveQueue.push(string((char*)event.packet->data));
								enet_packet_destroy (event.packet);
								break;
							case ENET_EVENT_TYPE_DISCONNECT:
								spFIFOs->ReceiveQueue.push(string("Disconnected."));
								Quit = true;
							}
						}
					}

				} else {
					enet_peer_reset(peer);
					spFIFOs->ReceiveQueue.push(string("Connection to server failed."));
				}

			}

		}

		spFIFOs->SendQueue.clear();
		if(client) enet_host_destroy(client);
		enet_deinitialize();
		return 0;
	}
};

Share this post


Link to post
Share on other sites
Sign in to follow this  

  • Advertisement