Threads

Started by
10 comments, last by Kurioes 20 years, 5 months ago
Hi, I need to use threads (to decode an ogg/vorbis file while doing something else)... for which I made a thread class; -- thread.h

#ifndef __THREAD_H_INCLUDED__
#define __THREAD_H_INCLUDED__

#include <windows.h>

class cThread
{
public:
    bool                    Create(bool Suspended, unsigned int StackSize = 0);
    // stop returns after thread proc returns

    void                    Stop(DWORD Time = 0);
    void                    Resume();
    void                    Suspend();

    bool                    IsRunning() const { return pr_bRunning; };

private:
    HANDLE                  pr_hThread;
    bool                    pr_bHalted;
    static DWORD WINAPI     thread_func(void *param);

protected:
    bool                    pr_bRunning;
    bool                    pr_bNeedStop;

    virtual DWORD           ThreadFunc() = 0;

public:
    cThread();
    virtual ~cThread();
};

#endif // __THREAD_H_INCLUDED__

-- thread.cpp

#include <windows.h>
#include "thread.h"

bool cThread::Create(bool Suspended, unsigned int StackSize)
{
    DWORD dwThreadID;
    pr_hThread = (HANDLE) CreateThread(0,                       // security attributes

                                       StackSize,               // stack size (0=same as creating process)

                                       cThread::thread_func,    // thread starting address

                                       (void *)this,            // paramater to pass to thread func

                                       (Suspended) ? CREATE_SUSPENDED : 0,
                                       &dwThreadID);
    if (pr_hThread) {
        pr_bRunning = true;
        pr_bNeedStop = false;
        pr_bHalted = Suspended;
        return true;
    } else {
        pr_bRunning = false;
        pr_bNeedStop = false;
        pr_bHalted = false;
        return false;
    }
}

void cThread::Stop(DWORD Time)
{
    pr_bNeedStop = true;
    if (pr_hThread) {
        if (!pr_bHalted) {
            if (WaitForSingleObject(pr_hThread, (Time) ? Time : INFINITE) == WAIT_TIMEOUT)
            {
                // kill kill kill

                TerminateThread(pr_hThread, 0);
            }
        }
        CloseHandle(pr_hThread);
        pr_hThread = 0;
    }
    pr_bRunning = false;
}

void cThread::Suspend()
{
    if (pr_hThread){
        if (!pr_bHalted) {
            SuspendThread(pr_hThread);
        }
    }
}

void cThread::Resume()
{
    if (pr_hThread) {
        if (pr_bHalted) {
            ResumeThread(pr_hThread);
        }
    }
}

DWORD WINAPI cThread::thread_func(void *param)
{
    return ((cThread *)param)->ThreadFunc();
    CloseHandle(((cThread *)param)->pr_hThread);
    ((cThread *)param)->pr_hThread = 0;
    ((cThread *)param)->pr_bRunning = false;
    ((cThread *)param)->pr_bHalted = false;
    ((cThread *)param)->pr_bNeedStop = false;
    return 0;
}

cThread::cThread()
{
    pr_hThread = 0;
    pr_bRunning = false;
    pr_bNeedStop = false;
    pr_bHalted = false;
}

cThread::~cThread()
{
    Stop();
}
To use this class I "override" (rather implement) the ThreadFunc function, which does the actual work. I tried to make the code fool-proof so you can''t for instance crash anything by stopping a thread that''s not created. My ogg/vorbis player waits for directsound signals to decode an ogg/vorbis stream when needed. My real question is: have I overlooked anything? Does this code work? Can you spot errors? It seems to work but I think someone more experienced should take a look before I assume it works correctly. Thanks in advance. btw Anyone who trusts the code may rip it for his/her own uses.. I don''t care... but I''m not responsible for it.
Advertisement
One thing that concerns me is the lack of locking or some sort of critical section to guarantee atomicity of your flag checking. Maybe you already thought of this and structured them so it won''t happen. I haven''t really looked at it very critically.

Another thing I noticed is your pr_bHalted and pr_bNeedStop seem relatively useless as they do not get reset or checked at appropriate times. You may also want to consider making some flags "volatile" so you don''t get missreported values due to caching or optimizations.
1) You should using _beginthreadex() instead of CreateThread(). You won''t get the C runtime initialization you need otherwise; it works simply because you haven''t used critical runtime functions yet.

2) TerminateThread() is considered rude and can have unforeseen side effects. Use a boolean flag instead that you''d poll at convenient moments to gracefully call _endthread(). Maybe some base class ''bool CanThreadContinue(void)'' member function that the thread code uses to check the flag would do.

3) Suspend() and Resume() have little or no purpose in real life. They''re fine additions to any OS, but don''t bother to provide or debug them. Same thinking goes to creating suspended threads as there are a better tools to control thread execution (semaphores, events, critical sections, ...)

4) You might want to add priority control because the decoder thread might not need the same amount of CPU as the playing thread.
Can you guys point me in the directions you went to learn about threading?

I would like to begin incorporating it into my projects, but I''m unsure how to do so.

George D. Filiotis

I am a signature virus. Please add me to your signature so that I may multiply.
Geordi
George D. Filiotis
Actually, according to MSDN:

quote:
A thread that uses functions from the static C run-time libraries should use the beginthread and endthread C run-time functions for thread management rather than CreateThread and ExitThread. Failure to do so results in small memory leaks when ExitThread is called. Note that this is not a problem with the C run-time in a DLL.


.
I think it''s a little old, but O''Reilly''s "Win32 Multithreaded Programming" is decent, and comes with good example code.
jermz: Good point... I did make a criticalsection class which I didn't include... I guess you're right, but I never intended the thread to be "commanded" by another thread (just the main thread OR by itself). I'll add it anyway... who knows when it'll come around and bite me.

-- criticalsection.h

#ifndef __CRITICALSECTION_H_INCLUDED__
#define __CRITICALSECTION_H_INCLUDED__

#include <windows.h>

/*
CRITICAL SECION CLASS

A critical section is an object meant to prevent two threads
from executing the same piece of code simultaneously. This
may sometimes be undesirable. Imagine the following;
A thread is executionc code that modifies some data. While it
is doing so, another thread tries to access that information.
While the data is being written, it is (partially) corrupted.

In this situation, the code should be "enclosed" in a
critical section.

This class functions as follows:
- Upon creation a CRITICAL_SECTION object is initialized.
- Upon destruction it is released.
- When code needs to be "synchronised" it is enclosed by
cCritialSection::Lock();
<br> cCriticalSection::Unlock()<br><br> Lock will lock the code and block if a lock has already been<br> acquired. If not, the thread will continue. If a thread has<br> been blocked it will continue once the other thread has<br> unlocked.<br><br> TryLock returns false if no lock was acquired and true if<br> a lock was acquired. Make sure not to execute the code when<br> false is returned. Call Unlock once for every acquired lock.<br> TryLock can be useful when other tasks could be executed<br> first instead of waiting.<br><br> NOTE: I have no idea how two waiting threads will be handled.<br> I assume it will work just the same.<br><br> NOTE: TryLock, for some obscure reason beyond my <br> understanding, won't compile as TryEnterCriticalSection<br> is an undeclared identifier.<br>*/</font><br><br><font color=blue>class</font> cCriticalSection<br>{<br><font color=blue>public</font>:<br> <font color=gray>// enter critical section<br></font><br> <font color=blue>void</font> Lock() { EnterCriticalSection(&pr_CriticalSection); };<br> <font color=gray>// try entering critical section<br></font><br> <font color=gray>//bool TryLock() { return TryEnterCriticalSection(&pr_CriticalSection); }; // huh? won't compile?<br></font><br> <font color=gray>// leave critical section<br></font><br> <font color=blue>void</font> Unlock() { LeaveCriticalSection(&pr_CriticalSection); };<br><br><font color=blue>private</font>:<br> CRITICAL_SECTION pr_CriticalSection;<br><br><font color=blue>public</font>:<br> cCriticalSection() {<br> InitializeCriticalSection(&pr_CriticalSection);<br> };<br> ~cCriticalSection() {<br> DeleteCriticalSection(&pr_CriticalSection);<br> }<br>};<br><br><font color=green>#define</font> CRITICAL(x, y) { (x).Lock(); y; (x).Unlock(); }<br><br><font color=green>#endif</font> <font color=gray>// __CRITICALSECTION_H_INCLUDED__<br></font><br></pre><!--ENDSCRIPT--><br><br>AP: 1) I guess I should look into that then <img src="smile.gif" width=15 height=15 align=middle> thanks for pointing that out.<br><br>2) That's why you can pass 0 as a parameter to stop. That causes the Stop() to wait indefinately and not call TerminateThread. The flag pr_bNeedStop tells the thread whether it needt to stop or not. If it doesn't respond, Stop() just enables you to help the thread a little <img src="wink.gif" width=15 height=15 align=middle> not that I'd recommend it though.<br><br>3) For my music playing purpose suspending would be nice... &#111;n second hand... the thread IS waiting (WaitForSingleObject) and as such isn't wasting any cycles... I guess I'll remove it for the sake of simplicity.<br><br>4) For completeness I will add it, however, when DirectSOund tells me to decode some sound data, it means it. The thread is not wasting any cycles as it uses WaitForSIngleObject to start decoding a new block.<br><br><br><br>Thanks for all your replies!<br><br>I have another question: is it absolutely nessesary to call ExitThread()? My Thread class doen't do so right now. It is, however, releasing the thread's handle. <br><br>EDIT: stupid typos removed<br><br><hr><br>[ <a href="http://home.wanadoo.nl/r.hartskeerl/bananas.avi">Bananas</a> | <a href="http://home.wanadoo.nl/r.hartskeerl/">My dead site</a> | <a href="http://www.sgi.com">www.sgi.com</a> | <a href="http://www.goegel.be">Goegel</a> ] <br><br><SPAN CLASS=editedby>[edited by - Kurioes on November 7, 2003 5:44:59 PM]</SPAN>
quote:Original post by jermz
Another thing I noticed is your pr_bHalted and pr_bNeedStop seem relatively useless as they do not get reset or checked at appropriate times. You may also want to consider making some flags "volatile" so you don''t get missreported values due to caching or optimizations.


Hmm... I seem to have missed this part. What does the volatile keyword do? I''ve seen it before.

And I must agree I''m using way too many flags
volatile tells the compiler that this variable can change at any time, and the compiler must take this into account as not to employ optimization that break the way it works.

If you set a flag from a different thread, the flag ought to be volatile so that the compiler does not optimize outt the check (and do it only once, instead of every loop for example). And when it's set, it will hit memory where it is set, it will not stay in a register until the end of the function.

This is my threading code, works on Windows and Linux
#ifdef _MSC_VER#pragma once#endif#ifndef MKH_SYNC_HPP#define MKH_SYNC_HPP//std#include <vector>#include <algorithm>#include <cassert>//Common#include "Integers.hpp"#include "Error.hpp"#include "String.hpp"#if defined(_MSC_VER)	#pragma once	//Win32	#include <Windows.h>	#include <Process.h>		namespace MKH		{		namespace Sync			{			inline void Sleep(int ms)				{				::Sleep(ms);				}			using namespace MKH::System;						//Name changed from CWait to Wait, and method Wait changed to WaitFor			template<bool bAlertable=false>			class Wait				{				public:					Wait() : m_bWaiting(false)						{}										Wait(HANDLE hEvent) : m_bWaiting(false)						{						m_vEvents.insert(m_vEvents.end(), hEvent);						}										Wait(HANDLE hEvent1, HANDLE hEvent2) : m_bWaiting(false)						{						m_vEvents.insert(m_vEvents.end(), hEvent1);						m_vEvents.insert(m_vEvents.end(), hEvent2);						}										Wait(HANDLE* begin, HANDLE* end) : m_bWaiting(false)						{						assert(end>begin);						m_vEvents.resize(end-begin);						m_vEvents.assign(begin, end);						}										~Wait()						{}										void AddEvent(HANDLE hEvent)						{						ASSERTMSG(!m_bWaiting, "Do NOT mess with the event vector while waiting on it!");						m_vEvents.push_back(hEvent);						}										void RemoveEvent(HANDLE hEvent)						{						assert(!m_bWaiting); //Do NOT mess with the event vector while waiting on it!						tyEventVector::iterator it = std::find(m_vEvents.begin(), m_vEvents.end(), hEvent);						if(it!=m_vEvents.end())							m_vEvents.erase(it);						}					void RemoveAll()						{						if(!m_vEvents.empty())							{							m_vEvents.resize(0);							}						}										DWORD WaitFor(const DWORD dwTime_ms, const BOOL bWaitForAll=FALSE)						{						assert(!m_vEvents.empty());						using namespace MKH::System;												HANDLE* pEvent = &m_vEvents[0];						int cEvents = (i32)m_vEvents.size();						assert(cEvents<=MAXIMUM_WAIT_OBJECTS);//"Exceeded OS limitation of simultaneous wait handles");						m_bWaiting=true;						DWORD dwResult = WaitForMultipleObjectsEx(cEvents, pEvent, bWaitForAll, dwTime_ms, bAlertable);						m_bWaiting=false;						return dwResult;						}											typedef std::vector<HANDLE> tyEventVector;					tyEventVector& events()						{						ASSERTMSG(!m_bWaiting, "Do NOT mess with the event vector while waiting on it!");						return m_vEvents;						}				private:					tyEventVector m_vEvents;					volatile BOOL m_bWaiting;				};			template<bool bAutoReset=true, bool bInitialState=false>			struct Event				{				private:					Event(HANDLE hEvent);									public:					Event(LPSECURITY_ATTRIBUTES SecurityAttr=NULL, TCHAR* szName=NULL): m_hEvent(INVALID_HANDLE_VALUE)						{						m_hEvent = CreateEvent(SecurityAttr, !bAutoReset, bInitialState, szName);												assert(m_hEvent);//"Event creation failed!"						assert(m_hEvent!=INVALID_HANDLE_VALUE);//"Event creation failed!"						assert(m_hEvent!=(void*)0x1);						}										Event(const Event& event)						{						m_hEvent = event.m_hEvent;						}										~Event()						{						if(INVALID_HANDLE_VALUE!=m_hEvent)							CloseHandle(m_hEvent);						}										operator HANDLE()						{						assert(m_hEvent);//"Copied a NULL HANDLE - BSA!!!"						assert(m_hEvent!=INVALID_HANDLE_VALUE);//"Copied an INVALID HANDLE - BSA!!!"						return(m_hEvent);						}					operator HANDLE*()						{						assert(m_hEvent);//"This usually indicates a bug"						assert(m_hEvent!=INVALID_HANDLE_VALUE);//"This usually indicates a bug"						return &m_hEvent;						}										bool Signal()						{						assert(m_hEvent);						return !!SetEvent(m_hEvent);						}										bool Reset()						{						assert(m_hEvent);						return !!ResetEvent(m_hEvent);						}										bool Pulse()						{						assert(m_hEvent);						return !!PulseEvent(m_hEvent);						}										bool IsSignaled()						{						return(WAIT_OBJECT_0==WaitForSingleObject(m_hEvent, 0));						}										bool IsReset()						{						return(WAITSCIMEOUT==WaitForSingleObject(m_hEvent, 0));						}										bool WaitFor(DWORD dwTimeout_ms=INFINITE)						{						assert(m_hEvent);						return(WAIT_OBJECT_0==WaitForSingleObject(m_hEvent, dwTimeout_ms));						}				public:					HANDLE m_hEvent;				};						template<typename TSync = CriticalSection>			class Lock				{				public:					typedef typename TSync syncSC;					inline Lock(TSync& Sync_) : m_Sync(Sync_)						{						m_Sync.Lock();						}					inline Lock(TSync* pSync_) : m_Sync(*pSync_)						{						assert(pSync_);						m_Sync.Lock();						}					inline ~Lock()						{						m_Sync.Unlock();						}				protected:					TSync& m_Sync;				};			//#define CLock Lock						/*			template<typename TSync = CriticalSection>			struct auto_sync				{				typedef typename TSync syncSCype;				inline auto_sync(syncSCype& Sync_) : sync(Sync_)					{					assert(&Sync_);					sync.Lock();					}				inline auto_sync(syncSCype* pSync_) : sync(*pSync_)					{					assert(pSync_);					sync.Lock();					}				inline ~auto_sync()					{					sync.Unlock();					}				private:					syncSCype& sync;				};			*/						class FakeCriticalSection				{				public:					inline void Lock(){}					inline void Unlock(){}				};			//typedef FakeCriticalSection CFakeCriticalSection;						class CriticalSection				{				private:					//Critical Sections should not be copied.					CriticalSection(const CriticalSection& refCritSec);					CriticalSection& operator=(const CriticalSection& refCritSec);				public:					inline CriticalSection()   {InitializeCriticalSection(&this->cs);}					inline ~CriticalSection()  {    DeleteCriticalSection(&this->cs);}					inline void Lock()         {     EnterCriticalSection(&this->cs);}					inline void Unlock()       {     LeaveCriticalSection(&this->cs);}					operator CRITICAL_SECTION&() {return(cs);}				protected:					CRITICAL_SECTION cs;				};			//typedef CriticalSection CCriticalSection;			class Mutex				{				public:				HANDLE mutex;				Mutex()					{					this->mutex = CreateMutex(NULL, FALSE, SC(""));					}				~Mutex()					{					CloseHandle(this->mutex);					}				void Signal()					{					SetEvent(this->mutex);					}				bool WaitFor(DWORD timeout = 5000)					{					return(WAIT_OBJECT_0 == WaitForSingleObject(this->mutex, timeout));					}				};			//typedef Mutex CMutex;						class Semaphore				{				public:					Semaphore(long cMax, long cInitial=0, LPCTSTR stzName=NULL)						{						m_hSemaphore = CreateSemaphore(NULL, cInitial, cMax, stzName);						}					~Semaphore()						{						//BOOL bResult =						CloseHandle(m_hSemaphore);						}					operator HANDLE()						{						return(m_hSemaphore);						}										BOOL Release(long cInc=1, long* pcPrev=0)						{						return ReleaseSemaphore(m_hSemaphore, cInc, pcPrev);						}					BOOL Add(long cDec=1, long* pcPrev=0)						{						return ReleaseSemaphore(m_hSemaphore, -cDec, pcPrev);						}				private:					HANDLE m_hSemaphore;				};			//typedef Semaphore CSemaphore;							const ExitEvent     = WAIT_OBJECT_0;			const WaitFailed    = WAIT_FAILED;			const WaitTimeOut   = WAIT_TIMEOUT;			const WaitAbandoned = WAIT_ABANDONED;						inline int CurrentThreadID()				{				return ::GetCurrentThreadId();				}			template <typename ParentClass>			struct Thread				{				public:					typedef void (*fpLogFunc)(const TCHAR*const);					fpLogFunc logfunc;										static void noLog(const TCHAR*const){}					Thread() : logfunc(&Thread::noLog), m_hThread(INVALID_HANDLE_VALUE), m_dwID(0), m_pParentClass(0)						{						}										~Thread()						{						if(IsValid())							{							if(!IsTerminated())								{								//ASSERTMSG(0, "Thread object being destroyed prior to thread termination!\n");								this->logfunc(SC("Thread object being destroyed prior to thread termination!\r\n"));								Exit();								Join();								}							CloseHandle(m_hThread);							m_hThread=INVALID_HANDLE_VALUE;							}						}										HANDLE handle(){return m_hThread;}										typedef unsigned int (ParentClass::*MethodProc)(void);					typedef Event<false, false> tyEvent;					tyEvent m_evExit;										bool Create(ParentClass* pThis, MethodProc pThreadMethod, const char* const name = 0)						{						//ASSERTMSG(INVALID_HANDLE_VALUE==m_hThread, "Don't call Thread<>::Create without calling Close!");						if(INVALID_HANDLE_VALUE!=m_hThread)							{							this->logfunc(SC("Closing previously created thread due to second Thread<>::Create\r\n"));							this->Close();//return false;							}												m_evExit.Reset();						m_pParentClass = pThis;						m_pMethodProc = pThreadMethod;						//m_hThread = (HANDLE)_beginthreadex(NULL, 0, &DefaultProc, this, 0, (UINT*)&m_dwID);						m_hThread = CreateThread(NULL, NULL, &DefaultProc, this, 0, &m_dwID);												if( (0==m_hThread) || (INVALID_HANDLE_VALUE==m_hThread) )							return false;						else							return true;						}					int ThreadID() {return m_dwID;}					bool IsExiting()						{						return this->m_evExit.IsSignaled();						}					bool Exit()						{						return m_evExit.Signal();						}					bool Join(DWORD dwTime_ms=5000)						{						Lock<> AutoLock(m_csLock);												if(this->IsValid())							{							//We may not neccessarily want to always trigger an exit when waiting for it							//m_evExit.Signal();							if(this->WaitForTermination(dwTime_ms))								return true;							else								{								TCHAR szMsg[128];								String::tcscpy(szMsg, SC("* "));								String::tcscpy(&szMsg[String::tcslen(szMsg)], SC("Thread<> "));								String::tcscpy(&szMsg[String::tcslen(szMsg)], SC(" forcefully terminated\n"));								assert(String::tcslen(szMsg)<sizeof(szMsg));								this->logfunc(szMsg);								//Debug::Console.Out(szMsg);								return this->Terminate();								}							}						else							return true;						}										bool IsValid() {return(INVALID_HANDLE_VALUE!=m_hThread);}					bool IsTerminated() {return (WAIT_OBJECT_0==WaitForSingleObject(m_hThread, 0));}					bool IsRunning() {return (WAITSCIMEOUT==WaitForSingleObject(m_hThread, 0));}										BOOL Suspend()						{						assert(INVALID_HANDLE_VALUE!=m_hThread);						return SuspendThread(m_hThread);						}										BOOL Resume()						{						assert(INVALID_HANDLE_VALUE!=m_hThread);						return ResumeThread(m_hThread);						}										/*					//reference, Win32 only					enum ePriority						{						TimeCritical = THREAD_PRIORITY_TIME_CRITICAL,						Highest      = THREAD_PRIORITY_HIGHEST,						AboveNormal  = THREAD_PRIORITY_ABOVE_NORMAL,						Normal       = THREAD_PRIORITY_NORMAL,						BelowNormal  = THREAD_PRIORITY_BELOW_NORMAL,						Lowest       = THREAD_PRIORITY_LOWEST,						Idle         = THREAD_PRIORITY_IDLE						};					*/					int LowestPriority()  {return THREAD_PRIORITY_IDLE;}					int DefaultPriority() {return THREAD_PRIORITY_NORMAL;}					int HighestPriority() {return THREAD_PRIORITY_TIME_CRITICAL;}					int IncPriority(int priority)						{						switch(priority)							{							case THREAD_PRIORITY_TIME_CRITICAL:								return THREAD_PRIORITY_TIME_CRITICAL;														case THREAD_PRIORITY_HIGHEST:								return THREAD_PRIORITY_TIME_CRITICAL;														case THREAD_PRIORITY_ABOVE_NORMAL:								return THREAD_PRIORITY_HIGHEST;														case THREAD_PRIORITY_NORMAL:								return THREAD_PRIORITY_ABOVE_NORMAL;														case THREAD_PRIORITY_BELOW_NORMAL:								return THREAD_PRIORITY_NORMAL;														case THREAD_PRIORITY_LOWEST:								return THREAD_PRIORITY_BELOW_NORMAL;														case THREAD_PRIORITY_IDLE:								return THREAD_PRIORITY_LOWEST;														default:								assert(0);								return THREAD_PRIORITY_NORMAL;							}						}					int DecPriority(int priority)						{						switch(priority)							{							case THREAD_PRIORITY_TIME_CRITICAL:								return THREAD_PRIORITY_HIGHEST;														case THREAD_PRIORITY_HIGHEST:								return THREAD_PRIORITY_ABOVE_NORMAL;														case THREAD_PRIORITY_ABOVE_NORMAL:								return THREAD_PRIORITY_NORMAL;														case THREAD_PRIORITY_NORMAL:								return THREAD_PRIORITY_BELOW_NORMAL;														case THREAD_PRIORITY_BELOW_NORMAL:								return THREAD_PRIORITY_LOWEST;														case THREAD_PRIORITY_LOWEST:								return THREAD_PRIORITY_IDLE;														case THREAD_PRIORITY_IDLE:								return THREAD_PRIORITY_IDLE;														default:								assert(0);								return THREAD_PRIORITY_NORMAL;							}						}					int SetPriority(int priority)						{						if(0==SetThreadPriority(m_hThread, priority))							MKH_THROWONFAILURE(GetLastError());						return priority;						}					int GetPriority()						{						if(0==GetThreadPriority(m_hThread, priority))							MKH_THROWONFAILURE(GetLastError());						return priority;						}										bool WaitForTermination(DWORD dwTime_ms=INFINITE)						{						assert(INVALID_HANDLE_VALUE!=m_hThread);						return(WAIT_OBJECT_0==WaitForSingleObject(m_hThread, dwTime_ms));						}				protected:					static unsigned long __stdcall DefaultProc(void* pv)						{						if(pv==0) _asm{int 3}						assert(pv);						Sleep(10);						return reinterpret_cast<Thread*>(pv)->Run();						}					unsigned int Run()						{						try							{							assert(m_pParentClass);							assert(this);							assert(m_pMethodProc);							if(m_pParentClass && m_pMethodProc)								{								unsigned int x = (m_pParentClass->*m_pMethodProc)();								return x;								}							}						catch(...)							{							//assert(0);//"Thread threw an uncaught exception"							throw;							}						return -3;						}					MethodProc m_pMethodProc;					ParentClass* m_pParentClass;										//It seems only Win32 has this concept of closing thread handles					//It's not clear to me when a thread is utterly destroyed in posix					bool Close()						{						assert(INVALID_HANDLE_VALUE!=m_hThread);						//ASSERTMSG(IsTerminated(), "Terminate the thread prior to closing it's handle!");						if(!IsTerminated())							{							this->logfunc(SC("Terminating thread on Thread<>::Close\r\n"));							Exit();							Join(5000);							}						if(CloseHandle(m_hThread))							{							m_hThread=INVALID_HANDLE_VALUE;							return true;							}						else							return false;						}										bool Terminate()						{						Lock<> AutoLock(m_csLock);												if(INVALID_HANDLE_VALUE!=m_hThread)							if(TerminateThread(m_hThread, -42))								return true;							else								return false;						else							return true;						}									protected:					volatile HANDLE m_hThread;					DWORD m_dwID;					CriticalSection m_csLock;				};							struct OverlappedBase				{				OverlappedBase() : Internal(0), InternalHigh(0), Offset(0), OffsetHigh(0), hEvent(0) {}								ULONG_PTR Internal;				ULONG_PTR InternalHigh;				DWORD     Offset;				DWORD     OffsetHigh;				HANDLE    hEvent;								operator OVERLAPPED&() {return reinterpret_cast<OVERLAPPED&>(*this);}				operator OVERLAPPED*() {return reinterpret_cast<OVERLAPPED*>(this);}				};						struct Overlapped : Event<>, OverlappedBase				{				typedef Event<> tyEvent;				Overlapped()					{					hEvent = m_hEvent;					}				~Overlapped()					{					hEvent=INVALID_HANDLE_VALUE;					}				};			}//ns Sync		}//ns MKH#elif defined(__GNUC__) && (defined(linux) || defined(__linux) || defined(__linux__))	#include <pthread.h>	#include <unistd.h>	#include <sched.h>	namespace MKH		{		namespace Sync			{			inline void Sleep(int ms)				{				if(ms==0)					//pthread_yeild();					sched_yield();				else					{					//struct timespec delay;					//delay.tv_sec  = ms - (ms%1000);					//delay.tv_nsec = (ms%1000) * 1000000;					//pthread_delay_np(&delay);					usleep(ms*1000);					}				}			struct Mutex				{				public:					Mutex()						{						MKH_THROWONFAILURE(pthread_mutexattr_init(&this->attr));						MKH_THROWONFAILURE(pthread_mutexattr_settype(&this->attr, PTHREAD_MUTEX_RECURSIVE));						MKH_THROWONFAILURE(pthread_mutex_init(&this->mutex, &this->attr));						}					~Mutex()						{						MKH_THROWONFAILURE(pthread_mutex_destroy(&this->mutex));						MKH_THROWONFAILURE(pthread_mutexattr_destroy(&this->attr));						}					inline void Lock()						{						MKH_THROWONFAILURE(pthread_mutex_lock(&this->mutex));						}					inline void Unlock()						{						MKH_THROWONFAILURE(pthread_mutex_unlock(&this->mutex));						}				protected:					pthread_mutexattr_t attr;					pthread_mutex_t mutex;				};			struct CriticalSection				{				public:					CriticalSection()						{						MKH_THROWONFAILURE(pthread_mutexattr_init(&this->attr));						MKH_THROWONFAILURE(pthread_mutexattr_settype(&this->attr, PTHREAD_MUTEX_RECURSIVE));						MKH_THROWONFAILURE(pthread_mutexattr_setpshared(&this->attr, PTHREAD_PROCESS_PRIVATE));						MKH_THROWONFAILURE(pthread_mutex_init(&this->mutex, &this->attr));						}					~CriticalSection()						{						MKH_THROWONFAILURE(pthread_mutex_destroy(&this->mutex));						MKH_THROWONFAILURE(pthread_mutexattr_destroy(&this->attr));						}					inline void Lock()						{						MKH_THROWONFAILURE(pthread_mutex_lock(&this->mutex));						}					inline void Unlock()						{						MKH_THROWONFAILURE(pthread_mutex_unlock(&this->mutex));						}				protected:					pthread_mutexattr_t attr;					pthread_mutex_t mutex;				};			struct FakeCriticalSection				{				void Lock(){}				void Unlock(){}				};			//TODO MKH Linux sempaphroe			struct Semaphore				{				//Just haven't needed it yet				};			//TODO MKH Linux event-wait, This is going to be *hard*			struct Wait				{				};			template<typename TSync = CriticalSection>			class Lock				{				public:					typedef TSync sync_t;					inline Lock(TSync& Sync_) : m_Sync(Sync_)						{						m_Sync.Lock();						}					inline Lock(TSync* pSync_) : m_Sync(*pSync_)						{						assert(pSync_);						m_Sync.Lock();						}					inline ~Lock()						{						m_Sync.Unlock();						}				protected:					TSync& m_Sync;				};			template<bool bAutoReset=true, bool bInitialState=false>			struct Event				{				Event() : Signaled(bInitialState), Pulsing(false), Waiting(0)					{					MKH_THROWONFAILURE(pthread_condattr_init(&this->attr));					MKH_THROWONFAILURE(pthread_condattr_setpshared(&this->attr, PTHREAD_PROCESS_PRIVATE));					MKH_THROWONFAILURE(pthread_cond_init(&this->event, &this->attr));					}				~Event()					{					MKH_THROWONFAILURE(pthread_cond_destroy(&this->event));					MKH_THROWONFAILURE(pthread_condattr_destroy(&this->attr));					}				bool Reset()					{					Lock<> lock(cond_mutex);					this->Signaled = false;					return true;					}				bool Signal()					{					Lock<> lock(cond_mutex);					this->Signaled = true;					return true;					}				bool WaitFor(int ms)					{					Lock<> lock(cond_mutex);					timespec waituntil;					MKH_THROWONFAILURE(clock_gettime(CLOCK_REALTIME, &waituntil));					waituntil.tv_sec  += ms/1000;					waituntil.tv_nsec += (ms%1000) * 1000000;										while(this->IsReset())						{						this->Waiting++;						int err = pthread_cond_timedwait(&this->event, &this->cond_mutex, &waitfor);						if(err==0)							{							if(this->Singaled || this->Pulsing)								{								if(bAutoReset && this->Pulsing)									{									this->Singaled = false;									this->Pulsing  = false;									}								this->Waiting--;								return true;								}							else								continue;							}						else if(err==ETIMEDOUT)							{							this->Waiting--;							return false;							}						else if(err==EINVAL)							{							this->Waiting--;							MKH_THROWONFAILURE(err);							}						else if(err==EPERM)							{							this->Waiting--;							MKH_THROWONFAILURE(err);							}						else							{							this->Waiting--;							MKH_THROWONFAILURE(err);							}						}					this->Waiting--;					return true;					}				bool IsReset()					{					Lock<> lock(cond_mutex);					return !this->Signaled;					}				bool IsSignaled()					{					Lock<> lock(cond_mutex);					return this->Signaled;					}				bool Pulse()					{					/*					If the event is manual, all waiting threads are released, the event is set to nonsignaled,						and PulseEvent returns. If the event is automatic, a single thread is released, the event						is set to nonsignaled, and PulseEvent returns. 					If no threads are waiting, or no threads can be released immediately, PulseEvent sets the						state of the event to nonsignaled and returns.*/					Lock<> lock(cond_mutex);					this->Signaled=false;					if(Waiting>0)						{						if(bAutoReset)							{							this->Pulsing=true;							MKH_THROWONFAILURE(pthread_cond_signal(&this->event))							//pthread_cond_signal(&this->event);							}						else							{							this->Pulsing=true;							MKH_THROWONFAILURE(pthread_cond_broadcast(&this->event))							//pthread_cond_broadcast(&this->event);							}						Sleep(0);						}										return true;					}				protected:					CriticalSection cond_mutex;					//CriticalSection sync_mutex;					pthread_cond_t event;					pthread_condattr_t attr;					volatile bool Signaled;					volatile bool Pulsing;					int Waiting;				};						inline int CurrentThreadID()				{				pthread_t thr = ::pthread_self();				return (int)thr;				}			template<class ParentClass>			struct Thread				{				typedef Thread<ParentClass> this_t;				Thread() : pthread(0), parent(0)					{}				~Thread()					{					Exit();					Join();					}				typedef unsigned int (ParentClass::*MethodProc)(void);								bool Create(ParentClass* pThis, MethodProc pThreadMethod, const char* const name = 0)					{					//TODO MKH Linux thread priorities					this->parent = pThis;					this->methodproc = pThreadMethod;					//pthread_attr_t attr;					//pthread_attr_init(&attr);					//pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);					if(0==pthread_create(&this->pthread, NULL, &DefaultProc, this))						{						//pthread_attr_destroy(&attr);						return true;						}					else						{						//pthread_attr_destroy(&attr);						this->pthread=0;						this->parent=0;						this->methodproc=0;						return false;						}					}				bool Close()					{					if(!IsTerminated())						{						this->logfunc(SC("Terminating thread on Thread<>::Close\r\n"));						Exit();						Join(5000);						}					if(close(this->pthread))						{						this->pthread=0;						return true;						}					else						return false;					}				bool IsExiting()					{					return evExit.IsSignaled();					}				bool Exit()					{					return evExit.Signal();					}				bool Join(int wait_ms = 5000)					{					Lock<> lock(join_mutex);					if(this->IsValid())						{						//evExit.Signal();						void* ret=0;						if(pthread_join(this->pthread, &ret))							{//error joining							//this->pthread=0;							return false;							}						else							{							this->pthread=0;							return true;							}						}					else						return true;					}				//bool Suspend(); bool Resume();								//bool IsRuning();				bool IsValid()					{					return !!this->pthread;					}				//bool IsTerminated()				//	{				// //TODO MKH Linux IsTerminated is way ugly...				//	return !this->pthread;  //This is the easy part				//	}								int LowestPriority()					{					//static					 int min_priority = _LowestPriority();					return min_priority;					}				int _LowestPriority()					{					int policy;					sched_param params;					MKH_THROWONFAILURE(pthread_getschedparam(this->pthread, &policy, &params))					return sched_get_priority_min(policy);					}				int DefaultPriority()					{					return 0;					}				int HighestPriority()					{					//static					 int max_priority = _HighestPriority();					return max_priority;					}				int _HighestPriority()					{					int policy;					sched_param params;					MKH_THROWONFAILURE(pthread_getschedparam(this->pthread, &policy, &params))					return sched_get_priority_max(policy);					}				int IncPriority(int priority)					{					return ++priority;					}				int DecPriority(int priority)					{					return --priority;					}				int SetPriority(int priority)					{					if(priority < LowestPriority())						priority = LowestPriority();					if(priority > HighestPriority())						priority = HighestPriority();					int policy;					sched_param params;					MKH_THROWONFAILURE(pthread_getschedparam(this->pthread, &policy, &params));					params.sched_priority = priority;					MKH_THROWONFAILURE(pthread_setschedparam(this->pthread, policy, &params));					return priority;					}				int GetPriority()					{					int policy;					struct sched_param params;					MKH_THROWONFAILURE(pthread_getschedparam(this->pthread, &policy, &params));					return params.sched_priority;					}								int ThreadID() {return this->id;}								protected:					//bool Close();					//bool Terminate();					static void* DefaultProc(void* pv)						{						unsigned int ret=0;						try							{							ret=reinterpret_cast<this_t*>(pv)->Run();							}						catch(std::exception& e)							{							//assert(0);							throw;							}						catch(...)							{							//assert(0);							throw;							}						pthread_exit((void*)ret);						}					unsigned int Run()						{						assert(parent);						assert(this);						assert(methodproc);						if(parent && methodproc)							{							unsigned int x = (parent->*methodproc)();							return x;							}						else							return UINT_MAX;						}				private:					int id;					pthread_t pthread;					MKH::Sync::CriticalSection join_mutex;					ParentClass* parent;					MethodProc methodproc;					MKH::Sync::Event<> evExit;				};			}//ns Sync		}//ns MKHusing MKH::Sync::Sleep;#endif//Portable Sync Codenamespace MKH	{	namespace Sync		{		template<typename T, typename Sync>		struct guard_lock_t			{			typedef Sync sync_t;			sync_t* sync;			T* guarded;			guard_lock_t(T* guard, sync_t* sync) : guarded(guard), sync(sync)				{				assert(this->sync);				this->sync->Lock();				}			guard_lock_t(guard_lock_t& copy) : guarded(copy.guarded), sync(copy.sync)				{				copy.guarded = 0;				copy.sync    = 0;				}			~guard_lock_t()				{				if(this->sync)					this->sync->Unlock();				}			inline T* operator->()				{				assert(guarded);				return guarded;				}			};		/*		//Syncronization Policies		namespace Policy			{			namespace				{				template<typename T, typename SyncObj>				struct _SyncPolicy					{					typedef SyncObj sync_t;					typedef Lock<SyncObj> lock_t;					typedef guard_lock_t<T, sync_t> guard_t;					};				}			template<typename T>			struct None : _SyncPolicy<T, FakeCriticalSection>				{};			//struct Atomic			//	{			//	};			template<typename T>			struct Thread : _SyncPolicy<T, CriticalSection>				{};			template<typename T>			struct Process : _SyncPolicy<T, Mutex>				{};			}//ns Policy			//*/

}//ns Sync

}//ns MKH


#endif //MKH_SYNC_HPP




[edited by - Magmai Kai Holmlor on November 7, 2003 8:42:43 PM]
- The trade-off between price and quality does not exist in Japan. Rather, the idea that high quality brings on cost reduction is widely accepted.-- Tajima & Matsubara
I used to use my own threading classes but have recently begun to use the Boost thread libraries. They are peer reviewed well enough that I can be confident there are no bugs, contain all the locking etc, work on almost anything and are pretty easy to use once you are used to them.

You might want to add some kind of TerminateAndWait function (i.e. one that sets a flag or whatever to tell the thread function to exit and then waits until it has exited) to your thread class so that you can exit cleanly and easily.

This topic is closed to new replies.

Advertisement