How to do Ordered Multithreading: 2 or more threads executing 1 same function.

Started by
24 comments, last by tom_mai78101 11 years, 6 months ago
An interesting thing is to take the program and wrap it in a reasonably long loop (I've removed the sleep and the output to better illustrate the point):

public class Test {
Thread t1;
Thread t2;
Thread t3;
List<Integer> array = new ArrayList<Integer>();

Runnable run = new Runnable() {
@Override
public void run() {
while (array.size() < 50) {
synchronized (array) {
try {
array.wait();
}
catch (InterruptedException e) {
}
array.add(Thread.currentThread().hashCode());
// System.out.println(Thread.currentThread().getName());
}
}
}
};

Runnable wait = new Runnable() {
@Override
public void run() {
// Random r = new Random();
while (array.size() < 50) {
synchronized (array) {
/*
try {
Thread.sleep(r.nextInt(200) + 100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
*/
array.notify();
}
}
}
};

public Test() {
t1 = new Thread(run);
t1.setName("Thread 1 - First Thread");
t2 = new Thread(run);
t2.setName("Thread 2 - Last Thread");
t3 = new Thread(wait);
}

public void start() {
t1.start();
t2.start();
t3.start();
}

public void join() throws InterruptedException {
final long ONE_MINUTE = 1000 * 60;
t1.join(ONE_MINUTE);
t2.join(ONE_MINUTE);
t3.join(ONE_MINUTE);
}

public static void main(String[] ar) throws InterruptedException {
for(int i = 0 ; i < 1000 ; ++i) {
Test t = new Test();
t.start();
t.join();
int count = Thread.activeCount();
System.out.println("Run " + i + ": elements " + t.array.size() + " outstanding threads: " + count);
}
System.out.println("Done");
}
}

On my system, running this program usually results in at least one thread hanging after about 200 - 250 iterations (sometimes more, sometimes less). It is possible that around this point the JVM has decided to do some more advanced optimisations after monitoring the code for some time. Another interesting thing to note is that even when finished running, the list sometimes contains 50 elements, other times 51.

These are the concurrent bugs in your program that I can see:

  • It contains a data race - threads read the size() of the array without holding the lock.

  • Use of wait()/notify() without a logical condition associated with it.

  • You are not calling Object.wait() in a loop. This means that the code is sensitive to spurious or delayed wakeups.

  • You are using Object.notify() rather than Object.notifyAll(). Notifying a single thread is advanced.


Here is one situation where your program could hang. Imagine that array.size() is 49. Both "run" threads are waiting on the condition variable. The "wait" thread is just after acquiring the lock, it then calls notify(), releases the lock and gets context switched out. The thread that was signalled is now runnable and will eventually get context switched in. It re-acquires the lock on exiting wait(), and adds an element to the array and releases its lock. During its next iteration of the loop, the array is size 50 so it ends. The system (eventually) context switches back to the "wait" thread, which also may notice that the size is 50. If it does, it will also end. There still is a thread waiting for a signal that will never arrive. Other times, the "wait" thread will not notice, it will enter the synchronized block (which guarantees that it's view of the events to become up to date), it notify()s again and finishes. In this case the third thread is signalled again and also finishes.

But these bugs aside, let us imagine you fix all that. The code behaves the way you want - in every way except performance. Due to the coarse nature of the locks, the code is actually slower than the equivalent straight line code in a single thread.

Performance aside - how would we fix the correctness of your program? Well, the first thing I want to do is define what I believe the program should do. My decision is that it should populate a list of 50 elements with the identifiers of the threads, such that no two identifiers are adjacent in the final array. Once we understand that, we can then consider what our wait/notify condition should be. As a given thread, we need to wait when last element in the array is our identifier. This implies we don't wait when the array is empty. We also don't wait when the array is "full" - when our limit of 50 has been reached.

Once we have waited for this condition to be met, our thread can act. Because we have two competing conditions we don't know which of them caused us to exit the wait loop. If it was the array becoming full, we must end. Otherwise, we can add our identifier to the array. Finally, we notify the waiters on the condition variable.

This is such a program, which I believe to be correct:

public class Test {
private static final int Max = 50;
private final Thread t1;
private final Thread t2;
private final List<Long> array = new ArrayList<Long>();

public Test() {
Runnable run = new Runnable() {
@Override
public void run() {
final Long me = Long.valueOf(Thread.currentThread().getId());
boolean running = true;
while (running) {
synchronized (array) {
while(array.size() < Max &amp;&amp; !array.isEmpty() &amp;&amp; array.get(array.size() - 1).equals(me)) {
try {
array.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

if(array.size() < Max) {
array.add(me);
} else {
running = false;
}
array.notifyAll();
}
}
}
};

t1 = new Thread(run);
t1.setName("Thread 1 - First Thread");
t2 = new Thread(run);
t2.setName("Thread 2 - Last Thread");
}

public void start() {
t1.start();
t2.start();
}

public void join() throws InterruptedException {
final long ONE_MINUTE = 1000 * 60;
t1.join(ONE_MINUTE);
t2.join(ONE_MINUTE);
}

public boolean check() {
synchronized (array) {
Long previous = null;
for(Long current : array) {
if(previous != null &amp;&amp; previous.equals(current)) {
return false;
}
previous = current;
}
return array.size() == Max;
}
}

public static void main(String[] ar) throws InterruptedException {
for(int i = 0 ; i < 1000 ; ++i) {
Test t = new Test();
t.start();
t.join();
if(!t.check()) {
throw new IllegalStateException("Bad check!");
}
int count = Thread.activeCount();
System.out.println("Outstanding threads: " + count);
}
System.out.println("Done");
}
}


Note that the runnable code supports an arbitrary number of threads - provided there are at least two (it will deadlock if there is a single thread as the wait condition of the last entry not being its own identifier will never change):

public class Test {
private static final int Max = 50;
private final List<Thread> threads = new ArrayList<Thread>();
private final List<Long> array = new ArrayList<Long>();

public Test() {
Runnable run = new Runnable() {
@Override
public void run() {
final Long me = Long.valueOf(Thread.currentThread().getId());
boolean running = true;
while (running) {
synchronized (array) {
while(array.size() < Max &amp;&amp; !array.isEmpty() &amp;&amp; array.get(array.size() - 1).equals(me)) {
try {
array.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

if(array.size() < Max) {
array.add(me);
} else {
running = false;
}
array.notifyAll();
}
}
}
};

int numThreads = new Random().nextInt(42) + 2;
for(int i = 0 ; i < numThreads ; ++i) {
threads.add(new Thread(run));
}
}

public void start() {
for(Thread thread : threads) {
thread.start();
}
}

public void join() throws InterruptedException {
final long ONE_MINUTE = 1000 * 60;
for(Thread thread : threads) {
thread.join(ONE_MINUTE);
}
}

public boolean check() {
synchronized (array) {
Long previous = null;
for(Long current : array) {
if(previous != null &amp;&amp; previous.equals(current)) {
return false;
}
previous = current;
}
return array.size() == Max;
}
}

public static void main(String[] ar) throws InterruptedException {
for(int i = 0 ; i < 1000 ; ++i) {
Test t = new Test();
t.start();
t.join();
if(!t.check()) {
throw new IllegalStateException("Bad check!");
}
int count = Thread.activeCount();
System.out.println("Outstanding threads: " + count);
}
System.out.println("Done");
}
}

Note that this program does not cause each thread to be interleaved in-order. That is, you could get interleavings like:

ABACBACAB
[/quote]
Advertisement
For comparison, the performance difference between a naive serial version of that program and the given parallel one is two orders of magnitude. I only included a "fixed" version for reference purposes, not because I think it is a good solution to your problem.

the performance difference between a naive serial version of that program and the given parallel one is two orders of magnitude

I guess that the above means that the serial version is around a hundred times faster. Am I right?
Actually, I was incorrect, the performance difference is actually just over 50 times faster.

This is a simple program running 100,000 iterations for 2 candidate entries (i.e. 2 threads in the parallel version), filling a 50 element array 3 times each. The serial logic is like this:

public static Long run(Long [] candidates) {
List<Long> array = new ArrayList<Long>();
Random random = new Random();
while(array.size() < Common.Max) {
int index = random.nextInt(candidates.length);
Long candidate = candidates[index];
if(array.isEmpty() || !candidate.equals(array.get(array.size() - 1))) {
array.add(candidate);
}
}
Common.check(array);
return array.get(random.nextInt(array.size()));
}

The return value is used to prevent the compiler from optimising everything away. The parallel implementation was modified to behave similarly.

The driver program looks like this:

public class Profiler
{
private static long profile(String name, Callable<Long> callable) throws Exception {
long accumulator = 0;
long timer = 0;
for(int i = 0 ; i < 1000 ; ++i) {
long start = System.nanoTime();
Long value = callable.call();
accumulator += value.longValue();
long end = System.nanoTime();
long duration = end - start;
timer += duration;
}
System.out.println(name + " duration: " + timer + " nanoseconds");
System.out.println(name + " accumulator: " + accumulator);
return timer;
}

public static void main(String [] args) throws Exception
{
final Long [] candidates = { Long.valueOf(42), Long.valueOf(13) };

for(int i = 0 ; i < 3 ; ++i) {
long a = profile("Serial", new Callable<Long>() {
@Override
public Long call() throws Exception {
return Serial.run(candidates);
}
});

long b = profile("Parallel", new Callable<Long>() {
@Override
public Long call() throws Exception {
return Parallel.run(candidates);
}
});

if(a > b) {
System.out.println(a / (double)b);
} else {
System.out.println(b / (double)a);
}
}
}
}

Increasing the array size to 10,000 (and reducing the number of iterations) makes the performance difference about 40% faster. Obviously the thread creation / joining times skew the result much more heavily when the array is only 50 elements.

Of course, this is a very unrealistic scenario - the "multi-threaded" implementation is just as serial as the naive implementation - it would never be a candidate for production code. The purpose is to illustrate in actual numbers what the excessive communication overhead can cost you if you try to multi-thread a serial algorithm.

the performance difference is actually just over 50 times faster

Ok, just one order of magnitude.
But what I wanted to emphasize is that the serial version will be ALWAYS faster than the multi-threaded version - where in this particular scenario multi-threaded does not mean parallel ];)
Very interesting posts! Thanks!

From rip-off's post, I see that there's a Callable class, which reminded me that there's another implementation that may help me out on multithreading issues, and probably use.

I'm somewhat 50% sure that if I were to use a thread pool, my situation would get better, and possibly just let the thread pool itself handle the two threads intertwining together. If a thread pool is like a queue, and I design my threads to be "run once per call", and repeatedly place threads back into the queue, it's more likely to get alternating thread executions while the application is running.

This topic is closed to new replies.

Advertisement