JobManager - Is my design flawed?

Started by
1 comment, last by Hodgman 12 years, 5 months ago
I am new to multi-threaded programming and I am trying to build a system to queue and execute jobs asynchronously.
I am posting the code here and would like to ask you guys if you see anything obviously flawed about it.
The performance I am seeing is not anywhere close to what I expected.


class Job : public Object < Job >
{
public:

    Job() : m_active(false)
    {
    }

    void finish()
    {
        ScopedMutex lock(&m_mutex);
        while(m_active)
            m_inactive.wait(&m_mutex);
    }

private:

    friend class JobManager;

    void activate()
    {
        ScopedMutex lock(&m_mutex);
        while(m_active)
            m_inactive.wait(&m_mutex);
        m_active = true;
    }

    virtual void run() = 0;

    void deactivate()
    {
        ScopedMutex lock(&m_mutex);
        m_active = false;
        m_inactive.signal();
    }

    Mutex m_mutex;
    bool m_active;
    ConditionVariable m_inactive;
};

class JobManager : public Object < JobManager >
{
public:

    JobManager(size_t const p_workers)
    {
        for(size_t i = 0; i < p_workers; ++i)
            m_free.push(new Worker(this));
    }

    void addJob(Job * const p_job)
    {
        p_job->activate();
        getWorker()->assign(p_job);
    }

private:

    class Worker : public Object < Worker, Thread >, public LinkBase < Worker >
    {
    public:

        Worker(JobManager * const p_scheduler) : m_scheduler(p_scheduler), m_active(true), m_Job(nullptr)
        {
            start();
        }

        virtual ~Worker()
        {
            terminate();
            join();
        }

        void assign(Job * const p_Job)
        {
            ScopedMutex lock(&m_mutex);
            m_Job = p_Job;
            m_assigned.signal();
        }

    private:

        void run() override
        {
            ScopedMutex lock(&m_mutex);
            while(m_active)
            {
                while(m_active && !m_Job)
                    m_assigned.wait(&m_mutex);
                if(m_Job)
                {
                    m_Job->run();
                    m_Job->deactivate();
                    m_Job = nullptr;
                    m_scheduler->freeWorker(this);
                }
            }
        }

        void terminate()
        {
            ScopedMutex lock(&m_mutex);
            m_active = false;
            m_assigned.signal();
        }

        JobManager * const m_scheduler;
        Mutex m_mutex;
        bool m_active;
        Job * m_Job;
        ConditionVariable m_assigned;
    };

    Worker * getWorker()
    {
        ScopedMutex lock(&m_mutex);
        while(!m_free.count())
            m_workerFree.wait(&m_mutex);
        return m_free.pop();
    }

    void freeWorker(Worker * const p_worker)
    {
        ScopedMutex lock(&m_mutex);
        m_free.push(p_worker);
        m_workerFree.signal();
    }

    Mutex m_mutex;
    List < Worker > m_free;
    ConditionVariable m_workerFree;
};


I am using pthreads-win32 inside the wrapper classes, and with 8 worker threads I can only queue 2x 16ms jobs in a 33ms frame on an 8-core computer. I was not expecting to see 8x 16ms jobs in 33ms, but I have the feeling something is wrong here.
There's a lot of potential for deadlocks and starvation in this code. You're also holding a mutex in run() for the duration of the run() call, which means that another thread can never call terminate() successfully (it will never acquire the mutex to set m_active to false) nor can you call assign() to put a new task on the thread's queue. Essentially you have a really complicated and fragile serialization system which ensures that you will never run more than one worker thread at a time - which aligns with the number and length of jobs you described.

Unfortunately that is by no means a complete list of possible ways this code could go wrong. Multithreading is extremely tough, and you're generally best served using someone else's proven solution for things like job queues instead of trying to roll your own, especially when just starting out.

Don't take it personally, though! Again this is a hard thing to get right, and most programmers never actually learn how to do it properly. Even those with a deep understanding of concurrency will often make subtle and nasty mistakes.

Wielder of the Sacred Wands
[Work - ArenaNet] [Epoch Language] [Scribblings]

I haven't looked too closely at the code, but the amount of mutex locking in there makes me imagine there's probably a lot of parts being forced into running in serial.

Things like Mutex and ConditionVariable are often implemented in terms of the OS's scheduler, which often runs at a granularity of about 15ms...

I'd say the most straightforward to implement is a central shared queue, and have the workers pick items from the queue:

thread_safe_queue jobs;

void Worker::run( Pool& pool )
{
    while( !pool.quit )
    {
        Job* job;
        if( pool.queue.pop( job ) ) //lock/unlock a mutex (etc) internally
            job->Run();
        else
            Sleep(0);
    }
}
The easiest way to make a "thread_safe_queue" is to wrap an existing queue structure in a mutex, but later on you can replace it with a high-performance lock-free queue if required.
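A minimal sketch of that "wrap an existing queue in a mutex" idea, assuming std::queue and std::mutex; the non-blocking pop that returns false on an empty queue matches the pool.queue.pop( job ) usage in the worker loop above:

```cpp
#include <queue>
#include <mutex>

template <typename T>
class thread_safe_queue {
public:
    void push(T const& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(value);
    }

    // Non-blocking: returns false if the queue is empty,
    // otherwise moves the front element into 'out'.
    bool pop(T& out)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
            return false;
        out = m_queue.front();
        m_queue.pop();
        return true;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
};
```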

Or my personal preference is to make a list of jobs in advance, and then run every job on every thread, meaning you don't need any locks to read from the job queue because the work to be done by each thread is completely predictable:

vector<Job*> jobs;

//in serial on one thread:
Job myJob
atomic( myJob.done = 0 )
jobs.push_back( &myJob )

//then on each worker thread:
for each job in jobs
    job->run( worker.index, pool.numWorkers )
    atomic( job->done++ )

bool Job::Done(pool) { return this.done == pool.numWorkers; }

void exampleJob( threadIndex, numThreads )
    count = numItems / numThreads
    begin = count * threadIndex
    for i = begin; i != begin + count; ++i
        items[i]->update()

void exampleSingleThreadedJob( threadIndex, numThreads )
    if threadIndex != 0
        return
    update()

This topic is closed to new replies.
