Problems with pthreads' condition variables

Hi, I am trying to implement a small program with pthreads under Linux. The basic program flow should be:

1) Create some threads and put them to sleep
2) Main thread creates data and wakes up one of the sleeping threads
3) The woken-up thread processes the data and goes back to sleep
4) Continue at 2)

The problem is that only the last created thread is ever woken up, instead of any of the threads. I know that due to scheduling it is not predictable which thread will be woken up, but I find it very suspicious that only the last thread ever does any work... I have absolutely no idea why this happens :-( The complete code:

// 
// File:   threaddispatcher.cc
// Author: ezschemi
//
// Created on January 28, 2008, 11:39 AM
//
#define _MULTI_THREADED
#include <iostream>
#include <pthread.h>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <ctime>   // for time(), used to seed rand()

using namespace std;

const size_t NumberOfThreads = 4;

pthread_t threads[NumberOfThreads];
pthread_cond_t dataCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t conditionMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t dataMutex = PTHREAD_MUTEX_INITIALIZER;

int sum = 0, n = 0;
bool dataPresent = false;

// evaluate the call exactly once and report any non-zero error code
#define checkPThreadFunction(call) \
    do { \
        const int rc = (call); \
        if(0 != rc) { \
            cout << "Problem at line " << __LINE__ << ": " << rc << endl; \
        } \
    } while(0)


void *workerFunction(void *argument) {

    const char *threadName = (const char*)argument;
    
    // wait for data
    while(true) {
        checkPThreadFunction(pthread_mutex_lock(&conditionMutex));
        while(!dataPresent) {
            cout << "Will wait for data: " << threadName << endl;
            checkPThreadFunction(pthread_cond_wait (&dataCondition, &conditionMutex));
        }
        checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));
        
        
        checkPThreadFunction(pthread_mutex_lock(&dataMutex));
        sum += n;
        cout << threadName << " got data, will process it: " << n  << " ==> sum = " << sum << endl;
        dataPresent = false;
        checkPThreadFunction(pthread_mutex_unlock(&dataMutex));

    }
    
    
    return NULL;
}

int main(int argc, char** argv) {
    
    cout << "Creating " << NumberOfThreads << " worker threads..." << endl;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        ostringstream ost;
        ost << "Thread #" << t;
        
        checkPThreadFunction(pthread_create(&threads[t], NULL, &workerFunction, (void*)ost.str().c_str()));
    }
    
    sleep(2);
    srand(time(NULL));
    
    cout << "Working..." << endl << endl;
   
    int currentRun = 0;
    while(currentRun < 10) {
        
        checkPThreadFunction(pthread_mutex_lock(&dataMutex));
        n = rand() % 10;
        dataPresent = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataMutex));
        
        // tell threads we have data
        checkPThreadFunction(pthread_mutex_lock(&conditionMutex));
        checkPThreadFunction(pthread_cond_signal(&dataCondition));
        checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));

        currentRun++;
        
        // need to put a sleep here or it won't work
        sleep(1);
    }
    
    sleep(3);
    cout << "Result: " << sum << endl;
    
    // Clean up
    cout << "Cleaning up..." << flush;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        checkPThreadFunction(pthread_cancel(threads[t]));
    }
    checkPThreadFunction(pthread_cond_destroy(&dataCondition));
    checkPThreadFunction(pthread_mutex_destroy(&dataMutex));
    checkPThreadFunction(pthread_mutex_destroy(&conditionMutex));
    
    cout << "done." << endl;
    
    return (EXIT_SUCCESS);
}
Any hints are appreciated :-) Thanks, Enrico
--
Ok, the first problem was easy: the thread-name output was broken because pthread_create was handed ost.str().c_str(), a pointer into a temporary string that is destroyed at the end of each loop iteration, so the threads printed the wrong names.
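One way to fix it (a sketch; threadNames is a name made up for this example, and it needs <string>) is to keep the name strings alive for the threads' lifetime instead of handing pthread_create a pointer into a temporary:

string threadNames[NumberOfThreads];   // outlives the creation loop

for(size_t t = 0; t < NumberOfThreads; ++t) {
    ostringstream ost;
    ost << "Thread #" << t;
    threadNames[t] = ost.str();   // this copy survives past the loop iteration

    checkPThreadFunction(pthread_create(&threads[t], NULL, &workerFunction,
                                        (void*)threadNames[t].c_str()));
}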

So now each of the created threads is used. However, the order is always: 1, 2, 3, 4, 1, 2, 3, 4, ...

And I still need the sleep(1) to get this to work.

Any ideas?
--
There are at least three problems that I can see.

1. You're using sleep(1) to yield the main thread's timeslice. One second is a very long time to the computer, and the worker thread will certainly have finished its job and gone back to sleep again before the main loop gets any more data. Replace it with a yield (sched_yield()), a shorter sleep (usleep()) or a proper blocking mechanism.

2. Having a separate mutex for the condition variable is pointless and exposes you to race conditions and deadlocks. Lose it, and signal on the dataMutex.

3. As the code stands, your worker threads lock dataMutex for the entirety of their processing time. This means that none of the other worker threads will ever be able to do any work until that thread is finished, making the whole use of multithreading rather pointless. You'll need those threads to take copies of that data for processing, rather than acting directly upon it, or otherwise have a separate dataMutex for each thread (or use a semaphore). (A sketch of points 2 and 3 follows below.)
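To make points 2 and 3 concrete, here is roughly the shape of the fix (a sketch against the globals in the posted program, untested, so treat it as an outline rather than a drop-in patch): the condition variable waits on dataMutex itself, and the worker copies the work item out so the lock is held only briefly.

// worker loop: dataMutex alone now guards dataPresent, n and sum
checkPThreadFunction(pthread_mutex_lock(&dataMutex));
while(!dataPresent) {
    // atomically releases dataMutex while waiting, re-acquires it on wakeup
    checkPThreadFunction(pthread_cond_wait(&dataCondition, &dataMutex));
}
int localN = n;       // copy the work item out so the lock can be dropped
dataPresent = false;
checkPThreadFunction(pthread_mutex_unlock(&dataMutex));

// ...process localN outside the lock...

checkPThreadFunction(pthread_mutex_lock(&dataMutex));
sum += localN;        // the shared result still needs the lock
checkPThreadFunction(pthread_mutex_unlock(&dataMutex));

// main thread: publish the data and signal under the same mutex
checkPThreadFunction(pthread_mutex_lock(&dataMutex));
n = rand() % 10;
dataPresent = true;
checkPThreadFunction(pthread_cond_signal(&dataCondition));
checkPThreadFunction(pthread_mutex_unlock(&dataMutex));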
Quote:I have absolutely no idea why this happens :-(


Because of this:
int sum = 0, n = 0;
bool dataPresent = false;
Your data is global. As such, you only provide enough work for a single thread.

Also - your code has a logical error, which means the results of such dispatching will be non-deterministic. The while loop consists of two parts: WAIT and WORK. Now this can happen:

T1                  T2                  Dispatcher
WAIT                WAIT
                                        datapresent = true; wake all
*woken*             *woken*
WORK
datapresent = false
WAIT                WORK  <------- BOOM


Why is T2 doing work, when datapresent is set to false?


The following is possible if a context switch occurs between these two lines:
checkPThreadFunction(pthread_mutex_unlock(&conditionMutex));
                <-- here
checkPThreadFunction(pthread_mutex_lock(&dataMutex));
While it may seem unlikely, it shouldn't take more than a few seconds for this to happen on a multi-core CPU.
Quote:Original post by Sandman
There are at least three problems that I can see.

1. You're using sleep(1) to yield the main thread's timeslice. One second is a very long time to the computer, and the worker thread will certainly have finished its job and gone back to sleep again before the main loop gets any more data. Replace it with a yield (sched_yield()), a shorter sleep (usleep()) or a proper blocking mechanism.

2. Having a separate mutex for the condition variable is pointless and exposes you to race conditions and deadlocks. Lose it, and signal on the dataMutex.

3. As the code stands, your worker threads lock dataMutex for the entirety of their processing time. This means that none of the other worker threads will ever be able to do any work until that thread is finished, making the whole use of multithreading rather pointless. You'll need those threads to take copies of that data for processing, rather than acting directly upon it, or otherwise have a separate dataMutex for each thread (or use a semaphore).

You are correct (of course ;-) ). The initial purpose of this program was to get used to signalling between threads.

I changed the program according to your comments, and now everything seems to be fine. The program now fills a block of memory with some calculated values, which is easy to split up across threads without copying data. And the performance increase from multiple cores is quite nice =)

However, there is one thing I need to improve. Right now my main loop starts giving work to the threads, and when at least one thread has finished, another job is started. When looking at my code, I feel something is not correct: imagine four worker threads, so I want to start four jobs at the beginning, and then have the program wait until at least one thread has finished before starting a new job. What I assume actually happens is the following: the main loop starts the first job and then waits until this job has finished; then another job is started and the main loop waits for it to finish. This is my understanding, so can somebody please confirm it?
However, when looking at the results and the CPU usage (max 25%), there are several jobs running in parallel, which I do not really understand :-( Maybe you can explain this to me?

Another minor thing to change is the killing of the threads. Currently this is not done correctly; the locks and conditions might not be released.


#define _MULTI_THREADED
#include <iostream>
#include <pthread.h>
#include <sstream>
#include <string>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <cmath>
#include <ctime>   // for time(), used to seed rand()

using namespace std;

const size_t NumberOfThreads = 1;

typedef struct sThreadInfo {
    size_t number;
    double *memory;
} ThreadInfo;

pthread_t threads[NumberOfThreads];
ThreadInfo threadInformation[NumberOfThreads];

// used to signal when data is present
pthread_cond_t dataPresentCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t dataPresentConditionMutex = PTHREAD_MUTEX_INITIALIZER;
bool dataPresent = false;

// used to signal when data is ready
pthread_cond_t dataFinishedCondition = PTHREAD_COND_INITIALIZER;
pthread_mutex_t dataFinishedConditionMutex = PTHREAD_MUTEX_INITIALIZER;
bool dataFinished = false;

bool doProcessing = true;

const size_t nEntries = 10000000; // Number of entries in memory
const size_t EntriesPerThread = nEntries / NumberOfThreads;

// evaluate the call exactly once and report any non-zero error code
#define checkPThreadFunction(call) \
    do { \
        const int rc = (call); \
        if(0 != rc) { \
            cout << "Problem at line " << __LINE__ << ": " << rc << endl; \
        } \
    } while(0)

void *workerFunction(void *argument) {
    const ThreadInfo *threadInfo = (const ThreadInfo*) argument;
    ostringstream ost;
    ost << "Thread #" << threadInfo->number;
    const string ThreadName(ost.str());

    double *memory = threadInfo->memory;

    // wait for data
    while(doProcessing) {
        checkPThreadFunction(pthread_mutex_lock(&dataPresentConditionMutex));
        while(!dataPresent && doProcessing) {
            cout << "Will wait for data: " << ThreadName << endl;
            checkPThreadFunction(pthread_cond_wait(&dataPresentCondition, &dataPresentConditionMutex));
        }
        dataPresent = false;
        cout << ThreadName << " got data. Processing..." << endl;
        checkPThreadFunction(pthread_mutex_unlock(&dataPresentConditionMutex));

        // do some expensive calculation to simulate real work
        const double result = rand();
        for(size_t r=0; r < 1; ++r) {
            for(size_t e=0; e < EntriesPerThread; ++e) {
                memory[e] = sqrt(result) + sin(result) / cos(result) / tan(result);
            }
        }

        checkPThreadFunction(pthread_mutex_lock(&dataFinishedConditionMutex));
        dataFinished = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataFinishedConditionMutex));
        checkPThreadFunction(pthread_cond_signal(&dataFinishedCondition));
    }
    return NULL;
}

int main(int argc, char** argv) {
    double *memory = new double[nEntries];

    cout << "Creating " << NumberOfThreads << " worker threads..." << endl;
    cout << "Each thread will work on " << EntriesPerThread << " entries." << endl;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        threadInformation[t].number = t;
        threadInformation[t].memory = memory + t * EntriesPerThread;
        checkPThreadFunction(pthread_create(&threads[t], NULL, &workerFunction, (void*) (&threadInformation[t])));
    }

    sleep(2);
    srand(time(NULL));

    cout << "Working..." << endl << endl;

    int currentJob = 0;
    const int nOverallJobs = 20;
    // execute jobs
    while(currentJob < nOverallJobs) {
        checkPThreadFunction(pthread_mutex_lock(&dataPresentConditionMutex));
        cout << endl << "Processing job " << currentJob+1 << "/" << nOverallJobs << endl;
        dataPresent = true;
        checkPThreadFunction(pthread_mutex_unlock(&dataPresentConditionMutex));

        // tell threads we have data
        checkPThreadFunction(pthread_cond_broadcast(&dataPresentCondition));

        currentJob++;

        // wait for at least one thread to finish
        checkPThreadFunction(pthread_mutex_lock(&dataFinishedConditionMutex));
        while(!dataFinished) {
            checkPThreadFunction(pthread_cond_wait(&dataFinishedCondition, &dataFinishedConditionMutex));
        }
        dataFinished = false;
        checkPThreadFunction(pthread_mutex_unlock(&dataFinishedConditionMutex));
    }

    // Clean up
    cout << "Waiting for threads to finish..." << flush;
    for(size_t t=0; t < NumberOfThreads; ++t) {
        // ugly, because locks and conditions might not get released properly
        checkPThreadFunction(pthread_cancel(threads[t]));
    }
    cout << "cleaning up..." << flush;
    checkPThreadFunction(pthread_cond_destroy(&dataPresentCondition));
    checkPThreadFunction(pthread_mutex_destroy(&dataPresentConditionMutex));

    delete [] memory;
    memory = NULL;

    cout << "done." << endl;

    return (EXIT_SUCCESS);
}




Thank you,
Enrico
--
Quote:Original post by Enrico
However, there is one thing I need to improve. Right now my main loop starts giving work to the threads, and when at least one thread has finished, another job is started. When looking at my code, I feel something is not correct: imagine four worker threads, so I want to start four jobs at the beginning, and then have the program wait until at least one thread has finished before starting a new job. What I assume actually happens is the following: the main loop starts the first job and then waits until this job has finished; then another job is started and the main loop waits for it to finish. This is my understanding, so can somebody please confirm it?


First of all, the new code has NumberOfThreads = 1, so unless that's a typo, you don't have four threads running [grin].

However, suppose you do have four threads. The pthread_cond_broadcast wakes up all of them, and they all attempt to lock the dataMutex (this is done implicitly on leaving the pthread_cond_wait() function).
One of them will succeed, exit its while loop, and immediately set dataPresent to false before unlocking dataMutex. The next and subsequent threads to lock dataMutex will remain in the while loop, because dataPresent is false again, and will go back into pthread_cond_wait(). The end result is that you should only be seeing one worker thread actually doing anything at a time.

You should probably have a dataPresent flag for each thread. Also, you may find that semaphores would be better suited to this problem than mutexes.
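To sketch the semaphore idea: a counting semaphore does the bookkeeping that the dataPresent flag tries to do by hand, since every post represents one queued job and every sem_wait that returns consumes exactly one. A minimal POSIX sketch (workAvailable is a name made up for this example); note you would still want a mutex around the job data itself, as the semaphore only replaces the wakeup logic:

#include <semaphore.h>

sem_t workAvailable;
sem_init(&workAvailable, 0, 0);   // initial count 0: no work queued yet

// main thread, once per queued job:
sem_post(&workAvailable);         // increment the count, wake one waiter

// worker thread:
sem_wait(&workAvailable);         // block until the count is > 0, then decrement it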

Quote:
Another minor thing to change is the killing of the threads. Currently this is not done correctly; the locks and conditions might not be released.


I'm not sure I completely trust pthread_cancel either, although judging from the man page it should be fairly safe; it's not so much pulling the rug out from under the thread as asking it nicely to stop, which it will do when it reaches a suitable cancellation point. The reason you're getting errors is probably that you're not actually waiting for the threads to terminate before trying to destroy the mutexes; you should always call pthread_join to wait for your threads to terminate properly.

If you want to avoid pthread_cancel and do things explicitly, you can implement your own requests to ask threads to stop; it looks like you've started to do so with your apparently unused doProcessing flag. You'll still need to call pthread_join though.
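Spelled out against the posted code, an explicit shutdown might look like this (a sketch; the workers' inner wait loop already checks doProcessing, though for a fully clean exit they should also re-check it after waking, before processing):

// main thread, after the last job:
checkPThreadFunction(pthread_mutex_lock(&dataPresentConditionMutex));
doProcessing = false;                                                  // ask the workers to stop
checkPThreadFunction(pthread_cond_broadcast(&dataPresentCondition));   // wake them all up
checkPThreadFunction(pthread_mutex_unlock(&dataPresentConditionMutex));

for(size_t t = 0; t < NumberOfThreads; ++t) {
    // blocks until the worker has actually returned from workerFunction
    checkPThreadFunction(pthread_join(threads[t], NULL));
}

// only now is it safe to destroy the synchronization objects
checkPThreadFunction(pthread_cond_destroy(&dataPresentCondition));
checkPThreadFunction(pthread_mutex_destroy(&dataPresentConditionMutex));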
Quote:However, when looking at the results and the CPU usage (max 25%), there are several jobs running in parallel, which I do not really understand :-( Maybe you can explain this to me?


My guess would be that the synchronization between the dispatcher and the workers is the bottleneck.

If you want true concurrency, you need to organize data in such a manner that nobody waits for anybody.

The usual worker-thread pattern works something like this:
#include <pthread.h>
#include <queue>
#include <vector>

// (reuses the checkPThreadFunction macro from above)

struct Worker;

struct WorkUnit {
  // worker parameters
};

class Dispatcher {
public:
  friend struct Worker;

  Dispatcher( unsigned int n_workers )
  {
    // create and initialize worker threads
  }

  void add_job( WorkUnit * w ) {
    checkPThreadFunction(pthread_mutex_lock(&job_mutex));
    // add job to the queue
    jobs.push( w );
    // notify in case any threads are waiting
    checkPThreadFunction(pthread_cond_signal(&job_condition));
    checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
  }

private:
  // Returns next job to be processed.
  // Will block if no jobs are available.
  // Returns NULL if worker should shut down.
  WorkUnit * get_next_job() {
    checkPThreadFunction(pthread_mutex_lock(&job_mutex));
    // wait until there is work to do or we are shutting down
    // (the loop also guards against spurious wakeups)
    while (jobs.empty() && !shutting_down) {
      checkPThreadFunction(pthread_cond_wait(&job_condition, &job_mutex));
    }
    WorkUnit * job = NULL;
    if (!shutting_down) {
      // remove one job from the queue, and return it
      job = jobs.front();
      jobs.pop();
    }
    checkPThreadFunction(pthread_mutex_unlock(&job_mutex));
    return job;
  }

  // job queue
  std::queue< WorkUnit * > jobs;

  // locks for accessing the job queue
  pthread_mutex_t job_mutex;
  pthread_cond_t job_condition;
  bool shutting_down;

  // store all workers somewhere, needed to clean up the threads.
  std::vector< Worker * > workers;
};

struct Worker {
  Worker( Dispatcher * dispatcher )
    : d(dispatcher)
  {}

  void do_work() {
    WorkUnit * job = d->get_next_job();
    while (job != NULL) {
      // do actual work here
      job = d->get_next_job();
    }
  }

  Dispatcher * d;
};



The user sends work to the dispatcher via add_job. The dispatcher puts it on the queue and notifies any threads that might be waiting.

Workers run in a loop; whenever they are idle, they request a new job from the dispatcher. If the dispatcher is shutting down, the job returned will be NULL.

The obvious bottleneck here will be the job queue manipulation. The design, however, lends itself trivially to replacing the job queue with a lock-less implementation.

There is no global or static data. Everything that a worker needs to perform its work is stored in WorkUnit.
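For completeness, usage then reduces to constructing the dispatcher and feeding it jobs (hypothetical, since the worker-thread creation inside the constructor is elided above):

Dispatcher dispatcher(4);          // would spin up four workers

for(int i = 0; i < 20; ++i) {
    WorkUnit * w = new WorkUnit;   // fill in the job's parameters here
    dispatcher.add_job(w);         // a waiting worker picks it up
}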

PS: The example is C++; if you're working in C, classes aren't required and it should be trivially convertible to functions. The job queue would be an array of WorkUnits, and you'd need to manage addition and removal yourself.
