Enrico

Problems with pthreads' condition variables


Hi,

I am trying to implement a small program with pthreads under Linux. The basic program flow should be:

1) Create some threads and put them to sleep.
2) The main thread creates data and wakes up any one of the sleeping threads.
3) The woken thread processes the data and goes back to sleep.
4) Continue at 1).

The problem is that only the last created thread is ever woken up, instead of any of the threads. I know that, due to scheduling, it is not predictable which thread will be woken up. However, I find it very suspicious that only the last thread is doing any work... I have absolutely no idea why this happens :-(

The complete code:
// 
// File:   threaddispatcher.cc
// Author: ezschemi
//
// Created on January 28, 2008, 11:39 AM
//
#define _MULTI_THREADED
#include <iostream>
#include <pthread.h>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unistd.h>

using namespace std;

const size_t NumberOfThreads = 4;

pthread_t threads[NumberOfThreads];
pthread_cond_t dataCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t conditionMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t dataMutex = PTHREAD_MUTEX_INITIALIZER;

int sum = 0, n = 0;
bool dataPresent = false;

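// Evaluate the call exactly once; printing returnValue directly would call
// the pthread function a second time whenever it failed.
#define checkPThreadFunction(returnValue) \
    do { \
        int rc_ = (returnValue); \
        if(0 != rc_) { \
            cout << "Problem at line " << __LINE__ << ": " << rc_ << endl; \
        } \
    } while(0)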


void *workerFunction(void *argument) {

    const char *threadName = (const char*)argument;
    
    // wait for data
    while(true) {
        checkPThreadFunction(pthread_mutex_lock(&conditionMutex));
        while(!dataPresent) {
            cout << "Will wait for data: " << threadName << endl;
            checkPThreadFunction(pthread_cond_wait (&dataCondition, &conditionMutex));
        }
        checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));
        
        
        checkPThreadFunction(pthread_mutex_lock(&dataMutex));
        sum += n;
        cout << threadName << " got data, will process it: " << n  << " ==> sum = " << sum << endl;
        dataPresent = false;
        checkPThreadFunction(pthread_mutex_unlock(&dataMutex));

    }
    
    
    return NULL;
}

int main(int argc, char** argv) {
    
    cout << "Creating " << NumberOfThreads << " worker threads..." << endl;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        ostringstream ost;
        ost << "Thread #" << t;
        
        checkPThreadFunction(pthread_create(&threads[t], NULL, &workerFunction, (void*)ost.str().c_str()));
    }
    
    sleep(2);
    srand(time(NULL));
    
    cout << "Working..." << endl << endl;
   
    int currentRun = 0;
    while(currentRun < 10) {
        
        checkPThreadFunction(pthread_mutex_lock(&dataMutex));
        n = rand() % 10;
        dataPresent = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataMutex));
        
        // tell threads we have data
        checkPThreadFunction(pthread_mutex_lock(&conditionMutex));
        checkPThreadFunction(pthread_cond_signal(&dataCondition));
        checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));

        currentRun++;
        
        // need to put a sleep here or it won't work
        sleep(1);
    }
    
    sleep(3);
    cout << "Result: " << sum << endl;
    
    // Clean up
    cout << "Cleaning up..." << flush;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        checkPThreadFunction(pthread_cancel(threads[t]));
    }
    checkPThreadFunction(pthread_cond_destroy(&dataCondition));
    checkPThreadFunction(pthread_mutex_destroy(&dataMutex));
    checkPThreadFunction(pthread_mutex_destroy(&conditionMutex));
    
    cout << "done." << endl;
    
    return (EXIT_SUCCESS);
}
Any hints are appreciated :-)

Thanks,
Enrico

OK, the first problem was easy: the thread names were not printed correctly, because the name pointer passed to each thread pointed into a temporary string that no longer existed.
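For reference, a minimal sketch of one way to fix the thread names, assuming the names only need to live as long as the threads do. `ost.str()` returns a temporary std::string, so the `c_str()` pointer handed to pthread_create in the first program is left dangling once that statement ends (and `ost` itself is destroyed at the end of each loop iteration). Here the strings are stored in a vector that outlives the threads; `NamedArg`, `nameWorker`, and `startNamedThreads` are hypothetical names, not part of the original code.

```cpp
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical per-thread argument: a pointer to a name that outlives the
// thread, plus a slot where the thread records the name it actually saw.
struct NamedArg {
    const std::string *name;
    std::string seen;
};

void *nameWorker(void *argument) {
    NamedArg *arg = static_cast<NamedArg *>(argument);
    arg->seen = *arg->name;  // safe: the string is still alive
    return nullptr;
}

// Starts `count` threads, each with a stable name, joins them, and returns
// the names the threads observed.
std::vector<std::string> startNamedThreads(std::size_t count) {
    std::vector<std::string> threadNames(count);  // owns the strings
    std::vector<NamedArg> args(count);
    std::vector<pthread_t> threads(count);

    for (std::size_t t = 0; t < count; ++t) {
        std::ostringstream ost;
        ost << "Thread #" << t;
        threadNames[t] = ost.str();  // a copy, not a temporary
        args[t].name = &threadNames[t];
    }
    // The vectors are fully built before any thread starts, so the
    // addresses passed to pthread_create never move.
    for (std::size_t t = 0; t < count; ++t) {
        int rc = pthread_create(&threads[t], nullptr, &nameWorker, &args[t]);
        assert(rc == 0);
        (void)rc;
    }
    for (std::size_t t = 0; t < count; ++t) {
        pthread_join(threads[t], nullptr);
    }

    std::vector<std::string> seen;
    for (std::size_t t = 0; t < count; ++t) seen.push_back(args[t].seen);
    return seen;
}
```

Each thread writes only to its own `NamedArg`, and the results are read after pthread_join, so no extra locking is needed in this sketch.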

So now each of the created threads is used. However, the order is always: 1, 2, 3, 4, 1, 2, 3, 4, ...

And I still need the sleep(1) to get this to work.

Any ideas?

There are at least three problems that I can see.

1. You're using sleep(1) to yield the main thread's timeslice. 1 second is a very long time to the computer, and the worker thread will certainly have finished its job and gone back to sleep again before the main loop gets any more data. Replace it with a yield (sched_yield()), a shorter sleep (usleep()) or a proper blocking mechanism.

2. Having a separate mutex for the condition variable is pointless and exposes you to race conditions and deadlocks. Lose it, and use dataMutex with the condition variable instead.

3. As the code stands, your worker threads lock dataMutex for the entirety of their processing time. This means that none of the other worker threads can do any work until that thread is finished, which makes the whole use of multithreading rather pointless. You'll need those threads to take copies of the data for processing, rather than acting directly upon it, or otherwise have a separate dataMutex for each thread (or use a semaphore).
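A minimal sketch of points 1 to 3 together, assuming a single-slot handoff between the main thread and several workers. One mutex guards both the flag and the data, the main thread blocks on a second condition variable until the previous value has been taken (instead of calling sleep(1)), and each worker copies the value out under the lock and processes it afterwards. `Slot`, `consumer`, and `produceAll` are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

// Hypothetical single-slot mailbox: ONE mutex guards the flag, the data and
// the shutdown request; both condition variables are waited on with it.
struct Slot {
    pthread_mutex_t mutex;
    pthread_cond_t dataReady;  // signalled when the slot becomes full
    pthread_cond_t slotFree;   // signalled when a worker empties the slot
    bool dataPresent = false;
    bool done = false;         // producer has no more data
    int n = 0;
    long sum = 0;              // each worker adds its private total on exit

    Slot() {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&dataReady, nullptr);
        pthread_cond_init(&slotFree, nullptr);
    }
    ~Slot() {
        pthread_cond_destroy(&slotFree);
        pthread_cond_destroy(&dataReady);
        pthread_mutex_destroy(&mutex);
    }
};

void *consumer(void *argument) {
    Slot *s = static_cast<Slot *>(argument);
    long localSum = 0;
    for (;;) {
        pthread_mutex_lock(&s->mutex);
        while (!s->dataPresent && !s->done)
            pthread_cond_wait(&s->dataReady, &s->mutex);
        if (!s->dataPresent) {     // done, and nothing left to take
            s->sum += localSum;
            pthread_mutex_unlock(&s->mutex);
            return nullptr;
        }
        int value = s->n;          // copy the data under the lock...
        s->dataPresent = false;    // ...and claim it in the same critical section
        pthread_cond_signal(&s->slotFree);
        pthread_mutex_unlock(&s->mutex);

        localSum += value;         // "processing" runs without the lock
    }
}

// Hands the values 1..count to `workers` threads; returns what they summed.
long produceAll(int count, int workers) {
    Slot s;
    std::vector<pthread_t> threads(workers);
    for (int t = 0; t < workers; ++t) {
        int rc = pthread_create(&threads[t], nullptr, &consumer, &s);
        assert(rc == 0);
        (void)rc;
    }

    for (int i = 1; i <= count; ++i) {
        pthread_mutex_lock(&s.mutex);
        while (s.dataPresent)      // block until taken, no sleep(1)
            pthread_cond_wait(&s.slotFree, &s.mutex);
        s.n = i;
        s.dataPresent = true;
        pthread_cond_signal(&s.dataReady);  // wake one waiting worker
        pthread_mutex_unlock(&s.mutex);
    }

    pthread_mutex_lock(&s.mutex);
    s.done = true;
    pthread_cond_broadcast(&s.dataReady);   // every worker must see "done"
    pthread_mutex_unlock(&s.mutex);

    for (int t = 0; t < workers; ++t)
        pthread_join(threads[t], nullptr);
    return s.sum;
}
```

With this structure, `produceAll(10, 4)` hands out the values 1 to 10 and returns 55 regardless of scheduling; which worker gets which value is still up to the scheduler.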

Quote:
I have absolutely no idea why this happens :-(


Because of this:
int sum = 0, n = 0;
bool dataPresent = false;
Your data is global. As such, you only provide enough work for a single thread.

Also, your code has a logical error, which means the results of such dispatching will be non-deterministic. The while loop consists of two parts, WAIT and WORK. Now this can happen:


T1                     T2                     Dispatcher
WAIT
                       WAIT
                                              dataPresent = true; wake all
*woken*                *woken*
WORK
dataPresent = false
WAIT
                       WORK   <------- BOOM


Why is T2 doing work when dataPresent has been set to false?


This is possible if a context switch occurs between these two lines:
checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));
<-- here
checkPThreadFunction(pthread_mutex_lock(&dataMutex));
While it may seem unlikely, it shouldn't take more than a few seconds for this to happen on a multi-core CPU.
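The fix for the timeline above is to test and clear dataPresent inside one critical section, under the same mutex the threads waited on. This sketch, with hypothetical names (`Race`, `racer`, `countWorkers`), wakes several threads at once with pthread_cond_broadcast and counts how many of them get past the check; with the check and the clear done under one lock, the answer should always be exactly one.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

// Hypothetical shared state: one piece of data, many threads woken at once.
struct Race {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool dataPresent = false;
    bool released = false;  // set by the "dispatcher" when data is ready
    int workers = 0;        // how many threads reached the WORK step
    Race() {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&cond, nullptr);
    }
    ~Race() {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&mutex);
    }
};

void *racer(void *argument) {
    Race *r = static_cast<Race *>(argument);
    pthread_mutex_lock(&r->mutex);
    while (!r->released)
        pthread_cond_wait(&r->cond, &r->mutex);
    // Check AND clear the flag while still holding the mutex used for the
    // wait, so no second thread can see dataPresent == true in between.
    bool mine = r->dataPresent;
    r->dataPresent = false;
    if (mine) ++r->workers;
    pthread_mutex_unlock(&r->mutex);
    // if (mine) { ...WORK on a private copy, outside the lock... }
    return nullptr;
}

// Wakes `threads` waiting threads at once and reports how many did the work.
int countWorkers(int threads) {
    Race r;
    std::vector<pthread_t> ids(threads);
    for (int t = 0; t < threads; ++t) {
        int rc = pthread_create(&ids[t], nullptr, &racer, &r);
        assert(rc == 0);
        (void)rc;
    }
    pthread_mutex_lock(&r.mutex);
    r.dataPresent = true;
    r.released = true;
    pthread_cond_broadcast(&r.cond);  // "wake all", as in the timeline
    pthread_mutex_unlock(&r.mutex);
    for (int t = 0; t < threads; ++t)
        pthread_join(ids[t], nullptr);
    return r.workers;
}
```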

Quote:
Original post by Sandman
There are at least three problems that I can see.

1. You're using sleep(1) to yield the main thread's timeslice. 1 second is a very long time to the computer, and the worker thread will certainly have finished its job and gone back to sleep again before the main loop gets any more data. Replace it with a yield (sched_yield()), a shorter sleep (usleep()) or a proper blocking mechanism.

2. Having a separate mutex for the condition variable is pointless and exposes you to race conditions and deadlocks. Lose it, and use dataMutex with the condition variable instead.

3. As the code stands, your worker threads lock dataMutex for the entirety of their processing time. This means that none of the other worker threads can do any work until that thread is finished, which makes the whole use of multithreading rather pointless. You'll need those threads to take copies of the data for processing, rather than acting directly upon it, or otherwise have a separate dataMutex for each thread (or use a semaphore).

You are correct (of course ;-) ). The initial purpose of this program was to get used to signals.

I changed the program according to your comments, and now everything seems to be fine. The program now fills a block of memory with some calculated values, which is easy to split across threads without copying data. The performance increase from multiple cores is quite nice =)
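One detail worth checking when splitting the block of memory: `EntriesPerThread = nEntries / NumberOfThreads`, as in the program in this post, silently skips the last `nEntries % NumberOfThreads` entries when the division is not exact (with 3 threads, one of the 10000000 entries would never be written). A sketch of a split that covers every entry, using a hypothetical `Range`/`splitRange` helper:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper type: half-open range [begin, end) of entries.
struct Range {
    std::size_t begin;
    std::size_t end;
};

// Splits nEntries into `threads` contiguous ranges whose sizes differ by at
// most one; the remainder goes to the first few threads instead of being
// dropped.
std::vector<Range> splitRange(std::size_t nEntries, std::size_t threads) {
    assert(threads > 0);
    std::vector<Range> ranges;
    std::size_t base = nEntries / threads;
    std::size_t extra = nEntries % threads;
    std::size_t begin = 0;
    for (std::size_t t = 0; t < threads; ++t) {
        std::size_t size = base + (t < extra ? 1 : 0);
        ranges.push_back(Range{begin, begin + size});
        begin += size;
    }
    return ranges;
}
```

Thread t would then work on `ranges[t].end - ranges[t].begin` entries starting at `memory + ranges[t].begin`.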

However, there is one thing I still need to improve. Right now my main loop starts giving work to the threads; when at least one thread has finished, another job is started. Looking at my code, I feel something is not right. Imagine four worker threads: I want to start four jobs at the beginning, then the program waits until at least one thread has finished and starts a new job. What I assume actually happens is the following: the main loop starts the first job and then waits until this job has finished. Then another job is started and the main loop waits for it to finish. This is my understanding; can somebody please confirm this?
However, when looking at the results and the CPU usage (max 25%), there are several jobs running in parallel, which I do not really understand :-( Maybe you can explain this to me?

Another minor thing to change is how the threads are killed. Currently this is not correct: the locks and condition variables might not be released.


#define _MULTI_THREADED
#include <iostream>
#include <pthread.h>
#include <sstream>
#include <string>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <cmath>

using namespace std;

const size_t NumberOfThreads = 1;

typedef struct sThreadInfo {
    size_t number;
    double *memory;

} ThreadInfo;

pthread_t threads[NumberOfThreads];
ThreadInfo threadInformation[NumberOfThreads];

// used to signal when data is present
pthread_cond_t dataPresentCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t dataPresentConditionMutex = PTHREAD_MUTEX_INITIALIZER;
bool dataPresent = false;

// used to signal when data is ready
pthread_cond_t dataFinishedCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t dataFinishedConditionMutex = PTHREAD_MUTEX_INITIALIZER;
bool dataFinished = false;

bool doProcessing = true;

const size_t nEntries = 10000000; // Number of entries in memory
const size_t EntriesPerThread = nEntries / NumberOfThreads;

// Evaluate the call exactly once; printing returnValue directly would call
// the pthread function a second time whenever it failed.
#define checkPThreadFunction(returnValue) \
    do { \
        int rc_ = (returnValue); \
        if(0 != rc_) { \
            cout << "Problem at line " << __LINE__ << ": " << rc_ << endl; \
        } \
    } while(0)


void *workerFunction(void *argument) {

    const ThreadInfo *threadInfo = (const ThreadInfo*) argument;

    ostringstream ost;
    ost << "Thread #" << threadInfo->number;
    const string ThreadName(ost.str());

    double *memory = threadInfo->memory;

    // wait for data
    while(doProcessing) {
        checkPThreadFunction(pthread_mutex_lock(&dataPresentConditionMutex));
        while(!dataPresent && doProcessing) {
            cout << "Will wait for data: " << ThreadName << endl;
            checkPThreadFunction(pthread_cond_wait (&dataPresentCondition, &dataPresentConditionMutex));
        }
        dataPresent = false;
        cout << ThreadName << " got data. Processing..." << endl;
        checkPThreadFunction(pthread_mutex_unlock(&dataPresentConditionMutex));

        // do some expensive calculation to simulate real work
        const double result = rand();
        for(size_t r=0; r < 1; ++r) {
            for(size_t e=0; e < EntriesPerThread; ++e) {
                memory[e] = sqrt(result) + sin(result) / cos(result) / tan(result);
            }
        }

        checkPThreadFunction(pthread_mutex_lock(&dataFinishedConditionMutex));
        dataFinished = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataFinishedConditionMutex));
        checkPThreadFunction(pthread_cond_signal(&dataFinishedCondition));
    }
    return NULL;
}

int main(int argc, char** argv) {

    double *memory = new double[nEntries];


    cout << "Creating " << NumberOfThreads << " worker threads..." << endl;
    cout << "Each thread will work on " << EntriesPerThread << " entries." << endl;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        threadInformation[t].number = t;
        threadInformation[t].memory = memory + t * EntriesPerThread;
        checkPThreadFunction(pthread_create(&threads[t], NULL, &workerFunction, (void*) (&threadInformation[t]) ) );
    }

    sleep(2);
    srand(time(NULL));

    cout << "Working..." << endl << endl;

    int currentJob = 0;
    const int nOverallJobs = 20;
    // execute jobs
    while(currentJob < nOverallJobs) {

        checkPThreadFunction(pthread_mutex_lock(&dataPresentConditionMutex));
        cout << endl << "Processing job " << currentJob+1 << "/" << nOverallJobs << endl;
        dataPresent = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataPresentConditionMutex));

        // tell threads we have data
        checkPThreadFunction(pthread_cond_broadcast(&dataPresentCondition));


        currentJob++;

        // wait for at least one thread to finish
        checkPThreadFunction(pthread_mutex_lock(&dataFinishedConditionMutex));
        while(!dataFinished) {
            checkPThreadFunction(pthread_cond_wait (&dataFinishedCondition, &dataFinishedConditionMutex));
        }
        dataFinished = false;
        checkPThreadFunction(pthread_mutex_unlock(&dataFinishedConditionMutex));
    }

    // Clean up
    cout << "Waiting for threads to finish..." << flush;

    for(size_t t=0; t < NumberOfThreads; ++t) {
        // ugly, because locks and conditions might not get released properly
        checkPThreadFunction(pthread_cancel(threads[t]));
    }
    cout << "cleaning up..." << flush;
    checkPThreadFunction(pthread_cond_destroy(&dataPresentCondition));
    checkPThreadFunction(pthread_mutex_destroy(&dataPresentConditionMutex));

    delete []memory; memory = NULL;

    cout << "done." << endl;

    return (EXIT_SUCCESS);
}




Thank you,
Enrico

Quote:
Original post by Enrico
However, there is one thing which I need to improve: Right now my main loop starts giving work to the threads. When at least one thread has finished, another job is started. When looking at my code, I feel something is not correct: Imagine four worker threads, so I want to start four jobs at the beginning. Then the program waits until at least one thread has finished and starts a new job. What I assume is the following: The main loop starts the first job and then waits until this job has finished. Then another job is started and the main loop waits for it to finish. This is my understanding, so can somebody please confirm this?


First of all, the new code sets NumberOfThreads = 1, so unless that's a typo, you don't have four threads running [grin].

However, suppose you do have four threads. The pthread_cond_broadcast() wakes up all of them, and they all attempt to re-lock dataPresentConditionMutex (pthread_cond_wait() does this implicitly before it returns).
One of them succeeds, exits its while loop, and immediately sets dataPresent to false before unlocking the mutex. The next threads to acquire the mutex stay in the while loop, because dataPresent is false again, and go back into pthread_cond_wait(). The end result is that you should only see one worker thread actually doing anything at a time.

You should probably have a dataPresent flag for each thread. Also, you may find that semaphores would be better suited to this problem than mutexes.
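A minimal sketch of the semaphore idea, assuming Linux, where unnamed POSIX semaphores (sem_init, sem_wait, sem_post from <semaphore.h>) are available; macOS does not support sem_init. It also follows the behaviour Enrico asked about: start as many jobs as there are workers, then release one more job each time one finishes. Each unit posted to `jobsAvailable` lets exactly one worker proceed, so no per-thread flag is needed. `JobBoard`, `semWorker`, and `runWithSemaphores` are hypothetical names, and `job * job` stands in for real work.

```cpp
#include <cassert>
#include <pthread.h>
#include <semaphore.h>
#include <vector>

struct JobBoard {
    sem_t jobsAvailable;    // one unit per job the main thread has released
    sem_t jobsDone;         // one unit per finished job
    pthread_mutex_t mutex;  // guards nextJob
    int nextJob = 0;
    int totalJobs = 0;
    std::vector<int> results;
};

void *semWorker(void *argument) {
    JobBoard *b = static_cast<JobBoard *>(argument);
    for (;;) {
        sem_wait(&b->jobsAvailable);
        pthread_mutex_lock(&b->mutex);
        if (b->nextJob >= b->totalJobs) {  // an extra unit means "shut down"
            pthread_mutex_unlock(&b->mutex);
            return nullptr;
        }
        int job = b->nextJob++;
        pthread_mutex_unlock(&b->mutex);

        b->results[job] = job * job;  // distinct slot per job, no lock needed
        sem_post(&b->jobsDone);
    }
}

// Runs totalJobs jobs on `workers` threads, never releasing more than
// `workers` jobs ahead of completions; returns results by job number.
std::vector<int> runWithSemaphores(int totalJobs, int workers) {
    JobBoard b;
    b.totalJobs = totalJobs;
    b.results.assign(totalJobs, -1);
    sem_init(&b.jobsAvailable, 0, 0);
    sem_init(&b.jobsDone, 0, 0);
    pthread_mutex_init(&b.mutex, nullptr);

    std::vector<pthread_t> threads(workers);
    for (int t = 0; t < workers; ++t) {
        int rc = pthread_create(&threads[t], nullptr, &semWorker, &b);
        assert(rc == 0);
        (void)rc;
    }

    int released = 0;
    // Start up to `workers` jobs right away...
    for (; released < totalJobs && released < workers; ++released)
        sem_post(&b.jobsAvailable);
    // ...then start one more each time a job finishes.
    for (int finished = 0; finished < totalJobs; ++finished) {
        sem_wait(&b.jobsDone);
        if (released < totalJobs) {
            sem_post(&b.jobsAvailable);
            ++released;
        }
    }
    // All jobs are done; one extra unit per worker tells it to exit.
    for (int t = 0; t < workers; ++t)
        sem_post(&b.jobsAvailable);
    for (int t = 0; t < workers; ++t)
        pthread_join(threads[t], nullptr);

    pthread_mutex_destroy(&b.mutex);
    sem_destroy(&b.jobsDone);
    sem_destroy(&b.jobsAvailable);
    return b.results;
}
```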

Quote:

Another minor thing to change is the killing of the threads. Currently this is not correct, the locks and conditions might not be released.


I'm not sure I completely trust pthread_cancel either, although judging from the man page it should be fairly safe: it's not so much pulling the rug out from under the thread as asking it nicely to stop, which it does when it reaches a cancellation point. The errors you're getting are probably because you're not actually waiting for the threads to terminate before destroying the mutexes; you should always call pthread_join to wait for your threads to terminate properly.

If you want to avoid pthread_cancel and do things explicitly, you can implement your own requests to ask threads to stop; it looks like you've started to do so with your apparently unused doProcessing flag. You'll still need to call pthread_join though.
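A minimal sketch of the explicit approach, assuming the stop flag is only read and written while holding the mutex the workers wait on (in Enrico's code, doProcessing is a plain global that nothing sets to false). This also sidesteps the lock problem Enrico mentioned: a thread cancelled while blocked in pthread_cond_wait() re-acquires the mutex before its cleanup handlers run, so unless a handler registered with pthread_cleanup_push() unlocks it, the mutex stays locked. `Pool`, `poolWorker`, and `startAndStop` are hypothetical names.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

struct Pool {
    pthread_mutex_t mutex;
    pthread_cond_t wakeUp;
    bool doProcessing = true;  // guarded by mutex
    int exited = 0;            // guarded by mutex; counts clean exits
    std::vector<pthread_t> threads;
};

void *poolWorker(void *argument) {
    Pool *p = static_cast<Pool *>(argument);
    pthread_mutex_lock(&p->mutex);
    while (p->doProcessing) {
        // A real worker would also check for work here and process it
        // after unlocking; this sketch only waits for the stop request.
        pthread_cond_wait(&p->wakeUp, &p->mutex);
    }
    ++p->exited;
    pthread_mutex_unlock(&p->mutex);  // the lock is released before returning
    return nullptr;
}

// Starts `count` idle workers, asks them to stop, waits for all of them,
// and only then destroys the synchronization objects.
int startAndStop(int count) {
    Pool p;
    pthread_mutex_init(&p.mutex, nullptr);
    pthread_cond_init(&p.wakeUp, nullptr);
    p.threads.resize(count);
    for (int t = 0; t < count; ++t) {
        int rc = pthread_create(&p.threads[t], nullptr, &poolWorker, &p);
        assert(rc == 0);
        (void)rc;
    }

    pthread_mutex_lock(&p.mutex);
    p.doProcessing = false;
    pthread_cond_broadcast(&p.wakeUp);  // every waiter must see the request
    pthread_mutex_unlock(&p.mutex);

    for (int t = 0; t < count; ++t)
        pthread_join(p.threads[t], nullptr);  // wait before destroying

    pthread_cond_destroy(&p.wakeUp);
    pthread_mutex_destroy(&p.mutex);
    return p.exited;
}
```

Broadcast rather than signal is the deliberate choice here: a stop request has to reach every waiting worker, not just one of them.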

Quote:
However, when looking at the results and CPU usage (max 25%) there are several jobs running parallel which I do not really understand :-( Maybe you can explain this to me?


My guess would be that synchronizing between dispatcher and workers is the bottleneck.

If you want true concurrency, you need to organize data in such a manner that nobody waits for anybody.

A typical worker-thread setup would work something like this:

#include <pthread.h>
#include <queue>
#include <vector>

// checkPThreadFunction is the error-checking macro from the code above.

struct Worker;

struct WorkUnit {
    // worker parameters
};

class Dispatcher {
public:
    friend struct Worker;

    Dispatcher( unsigned int n_workers )
        : shutting_down(false)
    {
        checkPThreadFunction(pthread_mutex_init(&job_mutex, NULL));
        checkPThreadFunction(pthread_cond_init(&job_condition, NULL));
        // create and initialize worker threads
    }

    void add_job( WorkUnit * w ) {
        checkPThreadFunction(pthread_mutex_lock(&job_mutex));
        // add job to the queue
        jobs.push( w );
        // notify in case any threads are waiting
        checkPThreadFunction(pthread_cond_signal(&job_condition));
        checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
    }

    // Ask all workers to stop; they will receive NULL from get_next_job()
    void shutdown() {
        checkPThreadFunction(pthread_mutex_lock(&job_mutex));
        shutting_down = true;
        checkPThreadFunction(pthread_cond_broadcast(&job_condition));
        checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
        // then pthread_join() every worker thread
    }
private:
    // Returns next job to be processed
    // Will block if no jobs are available
    // Returns NULL if worker should shut down
    WorkUnit * get_next_job() {
        // lock available jobs
        checkPThreadFunction(pthread_mutex_lock(&job_mutex));
        // no work yet: wait until we have some (in a loop, because
        // wakeups can be spurious or another worker may take the job first)
        while (jobs.empty() && !shutting_down) {
            checkPThreadFunction(pthread_cond_wait(&job_condition, &job_mutex));
        }
        if (shutting_down) {
            checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
            return NULL;
        }
        // remove one job from the queue, and return it
        WorkUnit * job = jobs.front();
        jobs.pop();
        checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
        return job;
    }

    // job queue
    std::queue< WorkUnit * > jobs;

    // locks for accessing the job queue
    pthread_mutex_t job_mutex;
    pthread_cond_t job_condition;

    bool shutting_down;

    // store all workers somewhere, needed to clean up the threads.
    std::vector< Worker * > workers;
};

struct Worker {

    Worker( Dispatcher * dispatcher )
        : d(dispatcher)
    {}

    void do_work() {
        WorkUnit * job = d->get_next_job();
        while (job != NULL) {

            // do actual work here

            job = d->get_next_job();
        }
    }

    Dispatcher * d;
};

The user sends work to the dispatcher via add_job(). The dispatcher puts it in the queue and notifies any threads that might be waiting.

Workers run in a loop: whenever they are idle, they request a new job from the dispatcher. If the dispatcher is shutting down, the job returned will be NULL.

The obvious bottleneck here will be access to the job queue. However, the design makes it easy to swap the job queue for a lock-free implementation.
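Writing a general lock-free queue is considerably harder than swapping one in. When every job is known before the workers start, as with filling one block of memory, a single atomic counter is a simpler lock-free way to hand out work: each call to fetch_add gives one chunk index to exactly one thread, and nobody waits for anybody except at the final join. A sketch assuming C++11 std::atomic alongside pthreads; `Batch`, `batchWorker`, and `fillInParallel` are hypothetical names, and the `e * 0.5` assignment stands in for real work.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <pthread.h>
#include <vector>

// Hypothetical batch: jobs are fixed-size chunks of one shared array.
struct Batch {
    std::vector<double> memory;
    std::size_t chunkSize = 0;
    std::size_t chunkCount = 0;
    std::atomic<std::size_t> nextChunk{0};  // the only contended state
};

void *batchWorker(void *argument) {
    Batch *b = static_cast<Batch *>(argument);
    for (;;) {
        // fetch_add hands each chunk index to exactly one thread, no mutex.
        std::size_t chunk = b->nextChunk.fetch_add(1);
        if (chunk >= b->chunkCount) return nullptr;
        std::size_t begin = chunk * b->chunkSize;
        std::size_t end = begin + b->chunkSize;
        if (end > b->memory.size()) end = b->memory.size();
        for (std::size_t e = begin; e < end; ++e)
            b->memory[e] = static_cast<double>(e) * 0.5;  // stand-in work
    }
}

std::vector<double> fillInParallel(std::size_t nEntries, std::size_t chunkSize,
                                   int workers) {
    assert(chunkSize > 0);
    Batch b;
    b.memory.assign(nEntries, 0.0);
    b.chunkSize = chunkSize;
    b.chunkCount = (nEntries + chunkSize - 1) / chunkSize;
    std::vector<pthread_t> threads(workers);
    for (int t = 0; t < workers; ++t) {
        int rc = pthread_create(&threads[t], nullptr, &batchWorker, &b);
        assert(rc == 0);
        (void)rc;
    }
    for (int t = 0; t < workers; ++t)
        pthread_join(threads[t], nullptr);
    return b.memory;
}
```

Smaller chunks balance the load better between threads at the cost of more atomic operations; this only works because no new jobs arrive while the workers run.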

There is no global or static data. Everything that a worker needs to perform its work is stored in WorkUnit.

PS: Your code is mostly C-style, and while the example uses C++ classes, they aren't required and should be trivially convertible to functions. The job queue would then be an array of WorkUnits, and you'd need to manage addition and removal yourself.
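For the plain-function version described in the PS, a sketch of the job queue as a fixed-size array used as a ring buffer. It assumes a compile-time capacity and that callers hold the job mutex around every push and pop; `WorkUnit` here has a hypothetical `id` field, and `JobQueue`, `queue_init`, `queue_push`, and `queue_pop` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical work description; a real one would carry the job parameters.
struct WorkUnit {
    int id;
};

const std::size_t QueueCapacity = 64;

// Ring buffer of pending jobs. Not thread-safe by itself: callers are
// expected to hold the job mutex around every push and pop.
struct JobQueue {
    WorkUnit items[QueueCapacity];
    std::size_t head;   // index of the oldest job
    std::size_t count;  // number of jobs currently stored
};

void queue_init(JobQueue *q) {
    q->head = 0;
    q->count = 0;
}

// Returns false (and stores nothing) if the queue is full.
bool queue_push(JobQueue *q, WorkUnit w) {
    assert(q->count <= QueueCapacity);
    if (q->count == QueueCapacity) return false;
    std::size_t tail = (q->head + q->count) % QueueCapacity;
    q->items[tail] = w;
    ++q->count;
    return true;
}

// Returns false if the queue is empty; otherwise removes the oldest job.
bool queue_pop(JobQueue *q, WorkUnit *out) {
    if (q->count == 0) return false;
    *out = q->items[q->head];
    q->head = (q->head + 1) % QueueCapacity;
    --q->count;
    return true;
}
```

A full queue makes `queue_push` return false; in the dispatcher, `add_job` could then either wait on a second condition variable or report the failure to the caller.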
