Calling a virtual function, illegal call of non-static member function?

Started by
24 comments, last by rip-off 10 years, 2 months ago

I have not figured out how/if instances of this WorkerThread can deallocate themselves after being cleared from the ThreadPool. Until I resolve this I guess I have to send a message telling the worker function to break its loop and clean up. But I have a KillThread method just in case.

Any comments or suggestions for this code?


#pragma once
class WorkerThread
{
public:
	WorkerThread(DWORD i) : m_Quit(false), m_Shutdown(false)
	{
		InitializeCriticalSection(&Request_Mutex);
		InitializeCriticalSection(&Completed_Mutex);
		hThread = CreateThread(NULL, 0, WorkerThread::ThreadRoutine, this, NULL, NULL);
	}

	~WorkerThread(void)
	{
		if(hThread)
		{
			CloseHandle(hThread);
		}

		DeleteCriticalSection(&Request_Mutex);
		DeleteCriticalSection(&Completed_Mutex);
	}

	void TickMain();	//Only called by Client code
	bool SendRequest(std::shared_ptr<InstructionBase> spNewRequest);
	bool GetCompleted(std::shared_ptr<InstructionBase> &spNewRequest);
	void KillWorkerThread();

protected:
	bool m_Quit, m_Shutdown;
	virtual void ThreadFunction() = 0;

	//Functions called by ThreadFunction()
	void ThreadGet();
	void ThreadSend();

	std::queue<std::shared_ptr<InstructionBase> > Thread_Requests;
	std::vector<std::shared_ptr<InstructionBase> > Thread_Completed;

private:
	//Functions called by TickMain()
	void MainSend();
	void MainGet();

	//Main side containers
	std::vector<std::shared_ptr<InstructionBase> > Main_Requests;
	std::queue<std::shared_ptr<InstructionBase> > Main_Completed;

	//In between containers and mutex
	CRITICAL_SECTION Request_Mutex;
	CRITICAL_SECTION Completed_Mutex;
	std::vector<std::shared_ptr<InstructionBase> > Mutex_Requests;
	std::vector<std::shared_ptr<InstructionBase> > Mutex_Completed;

	//Thread system
	HANDLE hThread;
	static DWORD WINAPI ThreadRoutine(LPVOID param)
	{
		static_cast<WorkerThread*>(param)->ThreadFunction();
		return 0;
	}
};

class DerivedA : public WorkerThread
{
public:
	DerivedA(DWORD i) : WorkerThread(i)
	{
	}

private:
	void ThreadFunction()
	{
		//Thread variables

		while(!m_Quit){
			void ThreadGet();

			if(Thread_Requests.empty()){
				Sleep(1);
			} else 	{
				std::shared_ptr<InstructionBase> spNewRequest = Thread_Requests.front();
				Thread_Requests.pop();

				//Do work on spNewRequest

				Thread_Completed.push_back(spNewRequest);				
			}

			void ThreadSend();
		}
	}
}; 

#include "WorkerThread.h"

void WorkerThread::KillWorkerThread()
{
	m_Quit = true;
	m_Shutdown = true;
	if(hThread != NULL)
	{
		Sleep(50);
		TerminateThread(hThread, 0);
		hThread = NULL;
	}
}

bool WorkerThread::SendRequest(std::shared_ptr<InstructionBase> spNewRequest)
{
	if(m_Quit || m_Shutdown || hThread == NULL){
		return false;
	}

	Main_Requests.push_back(spNewRequest);
	return true;
}

bool WorkerThread::GetCompleted(std::shared_ptr<InstructionBase> &spNewRequest)
{
	if(Main_Completed.empty()){
		return false;
	}

	spNewRequest = Main_Completed.front();
	Main_Completed.pop();

	return true;
}

void WorkerThread::TickMain()
{
	MainSend();
	MainGet();
}

void WorkerThread::MainSend()
{
	if(Main_Requests.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Main_Requests.size();i++){
			Mutex_Requests.push_back(Main_Requests[i]);
		}

		LeaveCriticalSection(&Request_Mutex);
		Main_Requests.clear();								
	}
}

void WorkerThread::MainGet()
{
	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Mutex_Completed.size();i++){
			Main_Completed.push(Mutex_Completed[i]);
		}

		Mutex_Completed.clear();
		LeaveCriticalSection(&Completed_Mutex);
	}
}

void WorkerThread::ThreadSend()
{
	if(Thread_Completed.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Thread_Completed.size();i++){
			Mutex_Completed.push_back(Thread_Completed[i]);
		}

		LeaveCriticalSection(&Completed_Mutex);
		Thread_Completed.clear();
	}
}

void WorkerThread::ThreadGet()
{
	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Mutex_Requests.size();i++){
			Thread_Requests.push(Mutex_Requests[i]);
		}

		Mutex_Requests.clear();
		LeaveCriticalSection(&Request_Mutex);
	}
} 
Advertisement

Just to note, you have a few places in your code where you're declaring functions, rather than calling them (e.g. void ThreadGet(); and void ThreadSend(); in ThreadFunction()). Also, you are reading and writing to the quit variables across multiple threads without synchronisation.

Seeing as you're using std::shared_ptr, have you considered using std::thread, etc? If not, look at that design to see how this issue was resolved.

Moved to General Programming, this is not For Beginners material.

I noticed that the compiler now allows for the virtual function to be private, which I think is odd. But I like it since now it is not exposed to the client code.


It's not "odd" at all (and it's not just "now"; C++ has always allowed that). There's one commonly-held belief that all virtual methods should always be private and that the polymorphic behavior of well-encapsulated objects should be an implementation detail of the public interface (though not for "abstract interface" types, of course).

It's handy, too. Even if you have a particular method you know will always need to be overridden in its entirety, having a small non-virtual public wrapper makes it easier to do logging, set breakpoints that capture all calls to the interface, etc. I don't think it should be done in all cases like some people do, but it's good practice in general.

Can you explain that a bit more? My brain is clearly not working too well this morning and I'm having trouble understanding what you mean.

if you think programming is like sex, you probably haven't done much of either.-------------- - capn_midnight

I noticed that the compiler now allows for the virtual function to be private, which I think is odd. But I like it since now it is not exposed to the client code.


It's not "odd" at all (and it's not just "now"; C++ has always allowed that). There's one commonly-held belief that all virtual methods should always be private and that the polymorphic behavior of well-encapsulated objects should be an implementation detail of the public interface (though not for "abstract interface" types, of course).

It's handy, too. Even if you have a particular method you know will always need to be overridden in its entirety, having a small non-virtual public wrapper makes it easier to do logging, set breakpoints that capture all calls to the interface, etc. I don't think it should be done in all cases like some people do, but it's good practice in general.

Can you explain that a bit more? My brain is clearly not working too well this morning and I'm having trouble understanding what you mean.

Search "non-virtual public interface". You make your public functions non-virtual, but have them call private virtual functions. Those function can then be implemented be child classes, but are only called though the public interface functions in the parent.

Just to note, you have a few places in your code where you're declaring functions, rather than calling them (e.g. void ThreadGet(); and void ThreadSend(); in ThreadFunction()).

Right, the "void" followed with copy/paste smile.png


Also, you are reading and writing to the quit variables across multiple threads without synchronisation.

Would it be better if I swapped out "bool m_Quit" with "std::atomic<bool> m_Quit"?


Seeing as you're using std::shared_ptr, have you considered using std::thread, etc? If not, look at that design to see how this issue was resolved.

I'm not sure how I can encapsulate the thread with its comm-interface such that I can clear it from ThreadPool and have the object clean itself up. I don't want to join the thread. I simply want an object that has a thread inside I can submit and retrieve work to/from. And it should be Polymorphic such that derived classes can implement their own work functions.

Getting run-time error, the std::thread is complaining about a pure virtual function.


class WorkerThread
{
public:
	WorkerThread(DWORD i) : m_Quit(false), m_Shutdown(false)
	{
		InitializeCriticalSection(&Request_Mutex);
		InitializeCriticalSection(&Completed_Mutex);
		Thread = std::thread(ThreadRoutine, this);
	}

	~WorkerThread(void)
	{
		m_Quit = true;
		Thread.detach();
	}

	void TickMain();	//Only called by Client code
	bool SendRequest(std::shared_ptr<InstructionBase> spNewRequest);
	bool GetCompleted(std::shared_ptr<InstructionBase> &spNewRequest);

protected:
	std::atomic<bool> m_Quit, m_Shutdown;
	virtual void ThreadFunction() = 0;

	//Functions called by ThreadFunction()
	void ThreadGet();
	void ThreadSend();
	void CleanUp()
	{
		DeleteCriticalSection(&Request_Mutex);
		DeleteCriticalSection(&Completed_Mutex);
	}

	std::queue<std::shared_ptr<InstructionBase> > Thread_Requests;
	std::vector<std::shared_ptr<InstructionBase> > Thread_Completed;

private:
	//Functions called by TickMain()
	void MainSend();
	void MainGet();

	//Main side containers
	std::vector<std::shared_ptr<InstructionBase> > Main_Requests;
	std::queue<std::shared_ptr<InstructionBase> > Main_Completed;

	//In between containers and mutex
	CRITICAL_SECTION Request_Mutex;
	CRITICAL_SECTION Completed_Mutex;
	std::vector<std::shared_ptr<InstructionBase> > Mutex_Requests;
	std::vector<std::shared_ptr<InstructionBase> > Mutex_Completed;

	//Thread system
	std::thread Thread;
	static DWORD WINAPI ThreadRoutine(LPVOID param)
	{
		static_cast<WorkerThread*>(param)->ThreadFunction();
		static_cast<WorkerThread*>(param)->CleanUp();
		return 0;
	}
};

class DerivedA : public WorkerThread
{
public:
	DerivedA(DWORD i) : WorkerThread(i)
	{
	}

private:
	void ThreadFunction()
	{
		//Thread variables

		while(!m_Quit){
			ThreadGet();

			if(Thread_Requests.empty()){
				Sleep(1);
			} else 	{
				std::shared_ptr<InstructionBase> spNewRequest = Thread_Requests.front();
				Thread_Requests.pop();

				//Do work on spNewRequest

				//If detatched in the middle of this loop
				//Then all these members become invalid

				Thread_Completed.push_back(spNewRequest);				
			}

			ThreadSend();
		}
	}
};

bool WorkerThread::SendRequest(std::shared_ptr<InstructionBase> spNewRequest)
{
	if(m_Quit || m_Shutdown){
		return false;
	}

	Main_Requests.push_back(spNewRequest);
	return true;
}

bool WorkerThread::GetCompleted(std::shared_ptr<InstructionBase> &spNewRequest)
{
	if(Main_Completed.empty()){
		return false;
	}

	spNewRequest = Main_Completed.front();
	Main_Completed.pop();

	return true;
}

void WorkerThread::TickMain()
{
	MainSend();
	MainGet();
}

void WorkerThread::MainSend()
{
	if(Main_Requests.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Main_Requests.size();i++){
			Mutex_Requests.push_back(Main_Requests[i]);
		}

		LeaveCriticalSection(&Request_Mutex);
		Main_Requests.clear();								
	}
}

void WorkerThread::MainGet()
{
	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Mutex_Completed.size();i++){
			Main_Completed.push(Mutex_Completed[i]);
		}

		Mutex_Completed.clear();
		LeaveCriticalSection(&Completed_Mutex);
	}
}

void WorkerThread::ThreadSend()
{
	if(Thread_Completed.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Thread_Completed.size();i++){
			Mutex_Completed.push_back(Thread_Completed[i]);
		}

		LeaveCriticalSection(&Completed_Mutex);
		Thread_Completed.clear();
	}
}

void WorkerThread::ThreadGet()
{
	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Mutex_Requests.size();i++){
			Thread_Requests.push(Mutex_Requests[i]);
		}

		Mutex_Requests.clear();
		LeaveCriticalSection(&Request_Mutex);
	}
}

I am trying a new approach to this whole thing. Instead of having the interface and the thread handling in the same object, I am seperating the interface into its own class.

The WorkerThread class will now only create a new thread and pass the address of a void pointer to the new thread. The new thread will create an interface and attach it to the passed **void.

Unfortunatly I get some errors: error C2664: 'std::_Ptr_base<_Ty>::_Reset0' : cannot convert parameter 1 from 'void *' to 'ThreadSafeCommunication *'


#pragma once
#include <memory>
#include <vector>
#include <queue>
#include <atomic>
#include <thread>
#include <Windows.h>

struct InstructionBase
{
	int test;
};

#define spInstructionBase std::shared_ptr<InstructionBase>
class ThreadSafeCommunication
{
public:
	ThreadSafeCommunication() : m_Quit(false)
	{
		InitializeCriticalSection(&Request_Mutex);
		InitializeCriticalSection(&Completed_Mutex);
	}

	~ThreadSafeCommunication(void)
	{
		DeleteCriticalSection(&Request_Mutex);
		DeleteCriticalSection(&Completed_Mutex);
	}

	//Only called by Client code
	void Shutdown();
	void TickMain();	
	bool MainSendRequest(spInstructionBase NewRequest);
	bool MainGetCompleted(spInstructionBase &NewRequest);

	//Only called by Thread code
	bool Running();
	void TickThread();	
	bool ThreadGetRequest(spInstructionBase &NewRequest);
	bool ThreadSendCompleted(spInstructionBase NewRequest);

private:
	std::atomic<bool> m_Quit;

	//Main side functions and containers
	void MainSend();
	void MainGet();	
	std::vector<spInstructionBase> Main_Requests;
	std::queue<spInstructionBase> Main_Completed;

	//In between containers and mutex
	CRITICAL_SECTION Request_Mutex;
	CRITICAL_SECTION Completed_Mutex;
	std::vector<spInstructionBase> Mutex_Requests;
	std::vector<spInstructionBase> Mutex_Completed;

	//Thread side functions and containers
	void ThreadGet();
	void ThreadSend();
	std::queue<spInstructionBase> Thread_Requests;
	std::vector<spInstructionBase> Thread_Completed;
};


void ThreadFunction(void **pInterface);

class WorkerThread
{
public:
	WorkerThread()
	{
		Thread = std::thread(ThreadFunction, &pInterface);
	}

	~WorkerThread(void)
	{
		static_cast<std::shared_ptr<ThreadSafeCommunication> >(pInterface)->Shutdown();
		Thread.detach();
	}

	void *pInterface;

private:
	std::thread Thread;
};

void ThreadFunction(void **pInterface)
{
	std::shared_ptr<ThreadSafeCommunication> spNewInterface(new ThreadSafeCommunication); 
	*pInterface = (void*)&spNewInterface;

	while(spNewInterface->Running())
	{
		spNewInterface->TickThread();
		//Do stuff
	}
}

#include "Comm.h"
#include <Windows.h>
#include <memory>

void ThreadSafeCommunication::Shutdown()
{
	m_Quit = true;
}

bool ThreadSafeCommunication::Running()
{
	return !m_Quit;
}

bool ThreadSafeCommunication::MainSendRequest(std::shared_ptr<InstructionBase> spNewRequest)
{
	if(m_Quit){
		return false;
	}

	Main_Requests.push_back(spNewRequest);
	return true;
}

bool ThreadSafeCommunication::MainGetCompleted(std::shared_ptr<InstructionBase> &spNewRequest)
{
	if(Main_Completed.empty()){
		return false;
	}

	spNewRequest = Main_Completed.front();
	Main_Completed.pop();

	return true;
}

bool ThreadSafeCommunication::ThreadGetRequest(std::shared_ptr<InstructionBase> &spNewRequest)
{
	if(Thread_Requests.empty()){
		return false;
	}
	spNewRequest = Thread_Requests.front();
	Thread_Requests.pop();

	return true;
}

bool ThreadSafeCommunication::ThreadSendCompleted(std::shared_ptr<InstructionBase> spNewRequest)
{
	if(m_Quit){
		return false;
	}
	
	Thread_Completed.push_back(spNewRequest);
	return true;
}

void ThreadSafeCommunication::TickMain()
{
	MainSend();
	MainGet();
}

void ThreadSafeCommunication::TickThread()
{
	ThreadSend();
	ThreadGet();
}

void ThreadSafeCommunication::MainSend()
{
	if(Main_Requests.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Main_Requests.size();i++){
			Mutex_Requests.push_back(Main_Requests[i]);
		}

		LeaveCriticalSection(&Request_Mutex);
		Main_Requests.clear();								
	}
}

void ThreadSafeCommunication::MainGet()
{
	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Mutex_Completed.size();i++){
			Main_Completed.push(Mutex_Completed[i]);
		}

		Mutex_Completed.clear();
		LeaveCriticalSection(&Completed_Mutex);
	}
}

void ThreadSafeCommunication::ThreadSend()
{
	if(Thread_Completed.empty()){
		return;
	}

	if(TryEnterCriticalSection(&Completed_Mutex)){
		for(size_t i=0;i<Thread_Completed.size();i++){
			Mutex_Completed.push_back(Thread_Completed[i]);
		}

		LeaveCriticalSection(&Completed_Mutex);
		Thread_Completed.clear();
	}
}

void ThreadSafeCommunication::ThreadGet()
{
	if(TryEnterCriticalSection(&Request_Mutex)){
		for(size_t i=0;i<Mutex_Requests.size();i++){
			Thread_Requests.push(Mutex_Requests[i]);
		}
		
		Mutex_Requests.clear();
		LeaveCriticalSection(&Request_Mutex);
	}
} 

You can pass a lambda to std::thread. I've not used C++11 extensively, I hope I'm using it correctly, I believe it should be something like this:

// Quick way to avoid changing lots of code
bool TryEnterCriticalSection(std::mutex *mutex) {
    return mutex->try_lock();
}
 
// Quick way to avoid changing lots of code
void LeaveCriticalSection(std::mutex *mutex) {
    mutex->unlock();
}
 
struct InstructionBase
{
    int test;
};
 
// No need to #define this
typedef std::shared_ptr<InstructionBase> spInstructionBase;
 
class ThreadSafeCommunication
{
public:
    ThreadSafeCommunication() : m_Quit(false)
    {
    }
 
    ~ThreadSafeCommunication()
    {
    }
 
    //Only called by Client code
    void Shutdown();
    void TickMain();    
    bool MainSendRequest(spInstructionBase NewRequest);
    bool MainGetCompleted(spInstructionBase &NewRequest);
 
    //Only called by Thread code
    bool Running();
    void TickThread();    
    bool ThreadGetRequest(spInstructionBase &NewRequest);
    bool ThreadSendCompleted(spInstructionBase NewRequest);
 
private:
    std::atomic<bool> m_Quit;
 
    //Main side functions and containers
    void MainSend();
    void MainGet();    
    std::vector<spInstructionBase> Main_Requests;
    std::queue<spInstructionBase> Main_Completed;
 
    //In between containers and mutex
    std::mutex Request_Mutex;
    std::mutex Completed_Mutex;
    std::vector<spInstructionBase> Mutex_Requests;
    std::vector<spInstructionBase> Mutex_Completed;
 
    //Thread side functions and containers
    void ThreadGet();
    void ThreadSend();
    std::queue<spInstructionBase> Thread_Requests;
    std::vector<spInstructionBase> Thread_Completed;
};
 
 
void ThreadFunction(std::shared_ptr<ThreadSafeCommunication> pInterface);
 
class WorkerThread
{
public:
    WorkerThread() : pInterface(new ThreadSafeCommunication)
    {
        Thread = std::thread([&] {
            ThreadFunction(pInterface);
        });
    }
 
    ~WorkerThread()
    {
        pInterface->Shutdown();
        Thread.detach();
    }
 
    std::shared_ptr<ThreadSafeCommunication> pInterface;
 
private:
    std::thread Thread;
};
 
void ThreadFunction(std::shared_ptr<ThreadSafeCommunication> pInterface)
{
    while(pInterface->Running())
    {
        pInterface->TickThread();
        //Do stuff
    }
}
 
void ThreadSafeCommunication::Shutdown()
{
    m_Quit = true;
}
 
bool ThreadSafeCommunication::Running()
{
    return !m_Quit;
}
 
bool ThreadSafeCommunication::MainSendRequest(std::shared_ptr<InstructionBase> spNewRequest)
{
    if(m_Quit){
        return false;
    }
 
    Main_Requests.push_back(spNewRequest);
    return true;
}
 
bool ThreadSafeCommunication::MainGetCompleted(std::shared_ptr<InstructionBase> &spNewRequest)
{
    if(Main_Completed.empty()){
        return false;
    }
 
    spNewRequest = Main_Completed.front();
    Main_Completed.pop();
 
    return true;
}
 
bool ThreadSafeCommunication::ThreadGetRequest(std::shared_ptr<InstructionBase> &spNewRequest)
{
    if(Thread_Requests.empty()){
        return false;
    }
    spNewRequest = Thread_Requests.front();
    Thread_Requests.pop();
 
    return true;
}
 
bool ThreadSafeCommunication::ThreadSendCompleted(std::shared_ptr<InstructionBase> spNewRequest)
{
    if(m_Quit){
        return false;
    }
    
    Thread_Completed.push_back(spNewRequest);
    return true;
}
 
void ThreadSafeCommunication::TickMain()
{
    MainSend();
    MainGet();
}
 
void ThreadSafeCommunication::TickThread()
{
    ThreadSend();
    ThreadGet();
}
 
void ThreadSafeCommunication::MainSend()
{
    if(Main_Requests.empty()){
        return;
    }
 
    if(TryEnterCriticalSection(&Request_Mutex)){
        for(size_t i=0;i<Main_Requests.size();i++){
            Mutex_Requests.push_back(Main_Requests[i]);
        }
 
        LeaveCriticalSection(&Request_Mutex);
        Main_Requests.clear();                                
    }
}
 
void ThreadSafeCommunication::MainGet()
{
    if(TryEnterCriticalSection(&Completed_Mutex)){
        for(size_t i=0;i<Mutex_Completed.size();i++){
            Main_Completed.push(Mutex_Completed[i]);
        }
 
        Mutex_Completed.clear();
        LeaveCriticalSection(&Completed_Mutex);
    }
}
 
void ThreadSafeCommunication::ThreadSend()
{
    if(Thread_Completed.empty()){
        return;
    }
 
    if(TryEnterCriticalSection(&Completed_Mutex)){
        for(size_t i=0;i<Thread_Completed.size();i++){
            Mutex_Completed.push_back(Thread_Completed[i]);
        }
 
        LeaveCriticalSection(&Completed_Mutex);
        Thread_Completed.clear();
    }
}
 
void ThreadSafeCommunication::ThreadGet()
{
    if(TryEnterCriticalSection(&Request_Mutex)){
        for(size_t i=0;i<Mutex_Requests.size();i++){
            Thread_Requests.push(Mutex_Requests[i]);
        }
        
        Mutex_Requests.clear();
        LeaveCriticalSection(&Request_Mutex);
    }
} 

I've used std::mutex instead of CRITICAL_SECTION. Also note the use of typedef over #define. I haven't examined the code in detail for multi-threaded correctness.

It seems that passing a shared_ptr by reference is not thread safe. But passing by value is.

Yes, passing a reference to anything to another thread is risky. You're racing the work the thread is doing against the lifetime of the object. For example, there is danger with the code I posted - imagine the WorkerThread object gets destroyed before the helper thread gets a chance to start.

The following is probably safer:

std::shared_ptr<ThreadSafeCommunication> copy = pInterface;
Thread = std::thread([copy] {
    ThreadFunction(copy);
});

You could probably drop the thread as a member, just have it as a local in the constructor and detach it immediately.

This topic is closed to new replies.

Advertisement