Deadlock dilemma with condition variables

Started by
3 comments, last by KaiserJohan 11 years, 7 months ago
Hello,

I have a problem with condition variables; they are causing a deadlock in my code.
I have a bunch of threads running the "Worker" method (using Boost::bind and boost::function), and when I destroy my threadpool, I do TerminateAllWorkers().

About one out of ten times it causes a deadlock, so there must be something fundamentally wrong with the way I use the condition variables.


void ThreadPool::TerminateAllWorkers()
{
mMutex->Lock();
mDesiredNumThreads = 0;
mMutex->Unlock();
mCondVar_NewTaskOrKillWorker->Broadcast();
while (mNumThreads > 0)
{
mLogger.LogInfo() << "TerminateAllWorkers::mCondVar_WorkDoneOrWorkerKilled->Wait()" << std::endl;
mCondVar_WorkDoneOrWorkerKilled->Wait();
}
}


void ThreadPool::Worker(void* arg)
{
bool running = true;
while (running)
{
mMutex->Lock();
Task task = NULL;
// either terminate the worker, or check for tasks
if (mDesiredNumThreads < mNumThreads)
{
running = false;
mNumThreads--;
}
else
{
// if there is a task, pop it
if (!mScheduledTasks.empty())
{
task = mScheduledTasks.front();
mScheduledTasks.erase(mScheduledTasks.begin());
}
}
mMutex->Unlock();



// if task, run and then signal task is complete
if (task)
{
task();
mCondVar_WorkDoneOrWorkerKilled->Signal();
}

// terminate if requested, otherwise wait for new task
if (running)
{
mLogger.LogInfo() << "Worker::mCondVar_NewTaskOrKillWorker->Wait()" << std::endl;
mCondVar_NewTaskOrKillWorker->Wait();
}
else
{
mLogger.LogInfo() << "Worker::mCondVar_WorkDoneOrWorkerKilled->Signal()" << std::endl;
mCondVar_WorkDoneOrWorkerKilled->Signal();
}
}
}




I am testing on Windows platform, by the way. Mutex lock/unlock is wrappers around WaitForSingleObject/ReleaseMutex, and here's simply ConditionVariable wait/broadcast/signal:

void ConditionVariable::Wait()
{
mCondVarState = ConditionVariable::WAITING;
#if defined _WIN32 || _WIN64
WaitForSingleObject(mCondVarHandle, INFINITE);
#else
mMutex.Lock();
pthread_cond_wait(&mCondVarHandle, &mMutex.GetNativeHandle());
mMutex.Unlock();
#endif
mCondVarState = ConditionVariable::READY;
}
void ConditionVariable::Signal()
{
#if defined _WIN32 || _WIN64
SetEvent(mCondVarHandle);
ResetEvent(mCondVarHandle);
#else
mMutex.Lock();
pthread_cond_signal(&mCondVarHandle);
mMutex.Unlock();
#endif
}
void ConditionVariable::Broadcast()
{
#if defined _WIN32 || _WIN64
SetEvent(mCondVarHandle);
ResetEvent(mCondVarHandle);
#else
mMutex.Lock();
pthread_cond_broadcast(&mCondVarHandle);
mMutex.Unlock();
#endif
}
Advertisement


void ThreadPool::TerminateAllWorkers()
{
mMutex->Lock();
mDesiredNumThreads = 0;
mMutex->Unlock();
mCondVar_NewTaskOrKillWorker->Broadcast();
while (mNumThreads > 0)


Here, you're accessing mNumThreads without having locked the associated mutex, which presumably is mMutex.

Also, consider using the scoped-locking pattern. You'll find it in pretty much any C++ threads library (for good reason).



{
mLogger.LogInfo() << "TerminateAllWorkers::mCondVar_WorkDoneOrWorkerKilled->Wait()" << std::endl;
mCondVar_WorkDoneOrWorkerKilled->Wait();

[/quote]
Waiting on a condition variable should internally unlock a mutex and ensure it is locked when the wait call returns. Your implementation doesn't seem to be doing that, here.



...

// terminate if requested, otherwise wait for new task
if (running)
{
mLogger.LogInfo() << "Worker::mCondVar_NewTaskOrKillWorker->Wait()" << std::endl;
mCondVar_NewTaskOrKillWorker->Wait();

[/quote]
And again here.

I would look at the use of condition variables closer, but the Windows implementation below doesn't look at all correct sad.png

void ConditionVariable::Wait()
{
mCondVarState = ConditionVariable::WAITING;
#if defined _WIN32 || _WIN64
WaitForSingleObject(mCondVarHandle, INFINITE);
#else
mMutex.Lock();
pthread_cond_wait(&mCondVarHandle, &mMutex.GetNativeHandle());
mMutex.Unlock();
#endif
mCondVarState = ConditionVariable::READY;
}
void ConditionVariable::Signal()
{
#if defined _WIN32 || _WIN64
SetEvent(mCondVarHandle);
ResetEvent(mCondVarHandle);

[/quote]
For example, here SetEvent could wake a waiting thread, which could then immediately wait on the condition variable again and be instantly let through, before ResetEvent is called. This is not the correct behaviour. Upon waiting again, the thread should block until a subsequent signal or broadcast awakens it.

If you are targeting Windows Vista and later, it's probably worth looking at Windows' CONDITION_VARIABLE structure and the associated functions. If you need to implement a condition variable from scratch, this informal paper (PDF) is pleasantly readable and points out the pitfalls in some other attempts.

Also, for the mutex you'll almost certainly be better served by Windows' CRITICAL_SECTION structure rather than a full mutex HANDLE, as locking and unlocking CRITICAL_SECTIONS require dipping in to the kernel far less often. You could also look at the slim reader/writer locks, but they are again Vista and onwards.

Edited to add:

You might also be well-served by separating-out the part of your Worker() function that needs to do synchronization from the part that's just churning through tasks. I don't know the rest of your code at all, but it seems more complicated than it might otherwise need to be.


// Roughly...

// Returns null if the ThreadPool wants us to stop processing
task *get_next_task()
{
scoped_lock lock(mtx);

while (tasks.empty() && state == running)
condvar.wait(lock);

if (state != running)
return 0;

assert(!tasks.empty());
task *ret = tasks.front();
tasks.pop_front();

return task;
}

void worker()
{
while (true)
{
task *next = get_next_task();
if (!next)
return;

next->run();
}
}

void stop_processing()
{
{
scoped_lock lock(mtx);
state = stopped;
}
condvar.broadcast();

for each thread:
thread.join();
}
Thanks for the reply!
Some subsequent questions:

Waiting on a condition variable should internally unlock a mutex and ensure it is locked when the wait call returns. Your implementation doesn't seem to be doing that, here.[/quote]


Could you elaborate on this one? I'm not sure I follow. I suppose I am missing than a mutex variable for the windows-variant, but why should it be locked when the wait call returns?


For example, here SetEvent could wake a waiting thread, which could then immediately wait on the condition variable again and be instantly let through, before ResetEvent is called. This is not the correct behaviour. Upon waiting again, the thread should block until a subsequent signal or broadcast awakens it.[/quote]


I understand; but, someone has to call ResetEvent() to reset the signal - how would you prevent a thread from slipping inbetween the SetEvent() and ResetEvent()?

Waiting on a condition variable should internally unlock a mutex and ensure it is locked when the wait call returns. Your implementation doesn't seem to be doing that, here.


Could you elaborate on this one? I'm not sure I follow. I suppose I am missing than a mutex variable for the windows-variant, but why should it be locked when the wait call returns?
[/quote]
Well, this is just how a condition variable works; a condition variable is always associated with a mutex. More precisely, multiple condition variables can be used with a single mutex, but the same mutex should always be used with a given condition variable. Quite often, theres a 1:1 association. The Python standard library folds the mutex in to the condition variable, in fact. I don't know if that's ultimately a good idea, but there we are.

It looks to me as if you've put a mutex inside your condition variable class simply because the pthreads API needs one, without understanding what that mutex is really for. It's important to understand their relationship to use condition variables correctly.

The key operations on a condition variable are:


class condvar
{
public:
void wait(scoped_lock &lock);

void notify(); // wake one of the waiters, if any
void broadcast(); // wake all waiters, if any
};


Note how the wait() function takes a locked mutex. In the pthreads API, a pthread_mutex_t is passed, which is assumed to be locked. In C++ we can do a little better by way of the scoped-locking pattern; we can only call wait if we have a scoped_lock, which in turn means that we can only call wait if a mutex is locked. We're still responsible for locking the associated mutex, but passing a scoped_lock ensures we have one less thing to get right.

A mutex is needed to protect shared, mutable state. Sometimes it's nice to be told when that state changes and this is what a condition variable is for. But note that just because we're using a condition variable, doesn't mean there's any less shared mutable state. So we always need to use a mutex in conjunction with a condition variable.

The classic example is a producer-consumer queue, in much the same vein as you're doing inside your thread pool. produce() looks like this:


void produce(int item)
{
{
scoped_lock lock(mtx); // lock mtx for the duration of this inner scope
queue.push_back(item);
}
cv.notify();
}


Simple enough. The mutex 'mtx' is protecting the queue, which is our shared, mutable state. Here's the consumer code:


int consume()
{
scoped_lock lock(mtx); // lock mtx for the duration of the function

while (queue.empty())
cv.wait(lock);

int item = queue.front();
queue.pop_front();

return item;
}


Since the queue is our shared, mutable state, we must lock the mutex protecting it. If we see the queue is empty, we wait to be notified of any changes to the queue.

Note again how the lock or locked mutex (depending on implementation) is passed to the condition variable's wait() method. Inside wait() the condition variable implementation unlocks the mutex. This allows another thread calling produce() to actually make progress; if wait() didn't unlock the mutex internally, produce() could never take the lock needed to modify the queue.

Now, once an item has been put in to the queue in produce(), and notify() has been called, wait() can return in the consumer thread. Upon returning, wait() re-locks the mutex.

So any at point in consume() where we touch our shared, mutable state, we have the lock, as required for correct synchronization.

I hope you see why a condition variable always has an associated mutex.

Some finer points:

The "while (condition not satisified) cv.wait(lock);" pattern is required by pretty much every condition variable implementation I've ever seen. It is necessary because 'spurious wakeups' are often possible i.e. the condition variable is signaled by the kernel for some reason other than a notify/broadcast call. Some condition variables allow you to use a call-back of some kind which is evaluated each time the condition needs to be checked, thereby avoiding the need for the while loop.

In produce(), I called notify() while the mutex was unlocked. For some condition variable implementations this is a requirement, for others it isn't. Even when it's not required, it's usually a good idea to avoid undue context switching; wait() cannot return while the producer thread holds the lock, so if notify() is called while the lock is held, this can cause unnecessary scheduler ping-pong while the situation works itself through.


I understand; but, someone has to call ResetEvent() to reset the signal - how would you prevent a thread from slipping inbetween the SetEvent() and ResetEvent()?
[/quote]
You can't. A condition variable can't be implemented in this manner sad.png

First, make sure you thoroughly understand the simple producer/consumer example above. Then have a look at the PDF I posted. It ends with a very simple (I'm tempted to say 'obvious') condition variable implementation. It uses semaphores, mutexes and thread-local storage. You might be able to use events rather than semaphores in that implementation but I doubt it will make much difference, as each thread has its own private semaphore (in TLS).
Big thanks, I will try it out!

This topic is closed to new replies.

Advertisement