Sign in to follow this  

(C#) how to make this concurrent without destroying performance (now with code!)

This topic is 2785 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

If you intended to correct an error in the post then please contact us.

Recommended Posts

I have 1 callback being called from a thread on a regular basis, it specifies a buffer size that must be filled. This passes to the Renderer, which should just spit out a buffer. (We're talking, approx 300-1000 doubles) in order to calculate the buffer, I have to run all "elements" tick procedure, and then if the element is marked as "final" it needs to add its value to the current cell in the array. after all elements have done this, the index is advanced, and we loop through for the size of the buffer. --- the element output value itself is double buffered, and there's no locks or anything (Tick() will always be threadsafe) and so this seemed like a trivial thing to parallelize. Now, I don't know if it's because i'm dependant on the audio-driver or if that's causing any threading weirdness, but i've tried a few approaches and all of them seem to thrash performance. First, I tried manually creating Threads for each batch of work (split up across n cores). Overhead of creating the threads that frequently was what I assumed killed the performance of this one. So I tried seperate Threads created once, which should just wait on the work to be received; but this also shattered performance. I was wondering if this was related to using the spinwait method from Thread in between receiving batches of work. ThreadPool was what I tried next, for each element, Queuing a work item for its tick procedure, and using a WaitHandle.WaitAll() on all of them, prior to restarting its index. This didn't perform well either. I noticed that my program was spawning lots of threads and using 100% cpu consistently, although before i made it 'concurrent' it was maxing out at 100% on a single core. Also, the data generated (audio) was now all stuttery as a result of it not being able to keep up with filling the buffer at the rate the hardware wanted. I figured, rather than have lots of threads, I'd limit ThreadPool to one thread per processor, but didn't get anywhere with this. Finally, I tried making each Element have a single WorkItem in the threadpool, roughly this design:

[Work item   #1]                                  

loop
{
   -Wait for signal to advance
   -Element.Tick()
   -if (Element Final) results[index]=Element.Emit();
   -Signal done
}
["        " ...]  //same as above for N elements
["        " ...]
["        " ..N]


[Synchronizing Thread]
- Create all work items
loop
{
    Reset all WorkItem signals
    Signal that it is okay to advance
    Wait for all WorkItems to signal done
    Advance index value
}

I'm sure there's something wrong with how i'm solving this problem. So to sum up: we have N elements, which can tick in parallel, but each one must only tick once. After all elements have ticked, a mutually shared index value must be incrememented. After the index reaches the buffer size, all processing should stop. And it has to happen at a very fast rate, as determined by hardware that is emitting the callback for the buffer to be filled. Sorry if this was too long, I'm just stuck and could use some help!! Thanks for any who take the time to read! :) If you have any pointers regarding which datatypes for signalling, thread management etc. that I should be using in this scenario, that would also be greatly appreciated! [Edited by - djz on April 29, 2010 6:09:49 PM]

Share this post


Link to post
Share on other sites
There is a general tendency to believe that concurrency is always going to improve performance. Concurrency is not free. Performance improvement is going to be true in two cases only:

- There are enough available "cores". With the current hardware, where two cores is the norm, this is pretty difficult to be true.

- There are waiting times. It is there where you can get advantage of using concurrency. When one thread is waiting for data from maybe a network, UI device, HD or whatever, other threads can use cpu time instead of waste that time doing nothing.

It seems that in your case none of the above apply, so in order to get a real improvement in performance you will have to take a different approach.

By the way, have you "profiled" your code in order to discover real bottlenecks?

Share this post


Link to post
Share on other sites
One other thing to watch out for is that is that if two threads are updating the same cache line then you'll get a lot of thrashing as writes from one CPU evict things from the cache on the other one. A cache line is usually 64 or 128 bytes depending on the CPU.

You should also make sure each item of work is big enough that the overhead of setting up and retrieving the work item isn't higher than doing the work itself.

Share this post


Link to post
Share on other sites
Quote:
Original post by ppgamedev
There is a general tendency to believe that concurrency is always going to improve performance. Concurrency is not free. Performance improvement is going to be true in two cases only:

- There are enough available "cores". With the current hardware, where two cores is the norm, this is pretty difficult to be true.

- There are waiting times. It is there where you can get advantage of using concurrency. When one thread is waiting for data from maybe a network, UI device, HD or whatever, other threads can use cpu time instead of waste that time doing nothing.

It seems that in your case none of the above apply, so in order to get a real improvement in performance you will have to take a different approach.

By the way, have you "profiled" your code in order to discover real bottlenecks?


Yes, I have profiled the non-concurrent model extensively; it is quite efficient. As far as cache trashing goes, all of the elements are light-weight objects; mostly doing some arithmetic, but in some cases do have relatively expensive computations to perform; so one element might run for 4x as long as another element in order to compute the next state. Very rarely, an object might have a buffer that it maintains internally that it has to reference, normally only a few kb of data but sometimes it is large - that's its own problem as far as caching goes, but I can deal with that later. Number of cores isn't a huge consideration; I am targeting a minimum 4 core machine.

I am surprised I'm running into problems, since each element only depends on the previous state of all the other elements; the only "synchronization" that is required is to make sure that after each Element ticks it does not tick again, until all of the other elements have ticked.

Your point about waiting times gave me some ideas that I'll have to try out and see how they measure up. Maybe doing some precalculating while awaiting the next buffer request from the hardware.

Share this post


Link to post
Share on other sites

class ThreadedBufferFill
{
const int BufferSize = 1000;
double[] buffer = new double[BufferSize];
class WorkWrapper
{
public List<CalcObj> CalcObjs { get; set; }
public ManualResetEvent TickSignal {get;set;}
public ManualResetEvent[] AllTickSignals { get; set; }
public ManualResetEvent FinalSignal { get; set; }
public int MaxValue { get; set; }
public double[] Result { get; set; }
}

void Calcthread(object o)
{
int i = 0;
WorkWrapper Work = (WorkWrapper)o;
Work.Result = new double[BufferSize];
while (i < Work.MaxValue)
{
Work.TickSignal.Reset();
foreach (CalcObj CalcObj in Work.CalcObjs)
{
CalcObj.Tick();
Work.Result[i] += CalcObj.Emit();
}
Work.TickSignal.Set();
WaitHandle.WaitAll(Work.AllTickSignals);
i++;
}
Work.FinalSignal.Set();
}

ManualResetEvent[] AllTickSignals = new ManualResetEvent[2];
ManualResetEvent[] AllFinalSignals = new ManualResetEvent[2];

public void FillBuffer()
{
AllTickSignals[0] = new ManualResetEvent(false);
AllTickSignals[1] = new ManualResetEvent(false);
AllFinalSignals[0] = new ManualResetEvent(false);
AllFinalSignals[1] = new ManualResetEvent(false);
List<CalcObj> CalcObjs0 = new List<CalcObj>();
List<CalcObj> CalcObjs1 = new List<CalcObj>();

for (int i = 0; i < 100; ++i)
{
CalcObjs0.Add(new CalcObj());
CalcObjs1.Add(new CalcObj());
}

WorkWrapper Work1 = new WorkWrapper()
{
CalcObjs = CalcObjs0,
TickSignal = AllTickSignals[0],
AllTickSignals = AllTickSignals,
FinalSignal = AllFinalSignals[0],
MaxValue = BufferSize
};

WorkWrapper Work2 = new WorkWrapper()
{
CalcObjs = CalcObjs1,
TickSignal = AllTickSignals[1],
AllTickSignals = AllTickSignals,
FinalSignal = AllFinalSignals[1],
MaxValue = BufferSize
};

ThreadPool.QueueUserWorkItem(new WaitCallback(Calcthread), Work1);
ThreadPool.QueueUserWorkItem(new WaitCallback(Calcthread), Work2);
WaitHandle.WaitAll(AllFinalSignals);

for (int i = 0; i < BufferSize; ++i )
{
buffer[i] = Work1.Result[i] + Work2.Result[i];
}
System.Console.WriteLine("Complete!");
System.Console.ReadKey();
}
}





I wrote this (away from my main source-base as somewhat in transit at the moment) as a prototype for what might work. Obviously this one is just targetting 2 cores but won't be hard to make work for N. Are there any problems with this I should be identifying?

edit...the main problem with the original algo i think was that the way the threads were balancing didn't work correctly in terms of signalling each other. I was trying to do too much with a single ManualResetEvent. The second attempt, of having 1 thread in threadpool per element had too much threading overhead.

This setup seems to perform okay but I only have a single core machine for testing for the next 24 hours before I can plug this into my existing codebase. What I am most concerned about is whether or not I am using ManualResetEvent correctly for this type of signalling.

[Edited by - djz on April 29, 2010 6:01:14 PM]

Share this post


Link to post
Share on other sites
Still not working right! I made the threads spit out some debug information to the console, and while I'd expect to see:


Worker 1: Calculating
Worker 2: Calculating
Worker 1: Calculating
Worker 2: Calculating


I'm actually seeing

Worker 1: Calculating
Worker 1: Calculating
Worker 2: Calculating
Worker 1: Calculating
Worker 2: Calculating
Worker 2: Calculating
Worker 2: Calculating
Worker 1: Calculating
Worker 1: Calculating


So obviously my understanding of the signalling events is not correct. Grrrr.....

Share this post


Link to post
Share on other sites
Quote:
Original post by djz
I have 1 callback being called from a thread on a regular basis, it specifies a buffer size that must be filled. This passes to the Renderer, which should just spit out a buffer. (We're talking, approx 300-1000 doubles)

in order to calculate the buffer, I have to run all "elements" tick procedure, and then if the element is marked as "final" it needs to add its value to the current cell in the array.

after all elements have done this, the index is advanced, and we loop through for the size of the buffer.


Can you post the source to correct, single threaded version of the above?

Share this post


Link to post
Share on other sites
Quote:
Original post by Antheus
Quote:
Original post by djz
I have 1 callback being called from a thread on a regular basis, it specifies a buffer size that must be filled. This passes to the Renderer, which should just spit out a buffer. (We're talking, approx 300-1000 doubles)

in order to calculate the buffer, I have to run all "elements" tick procedure, and then if the element is marked as "final" it needs to add its value to the current cell in the array.

after all elements have done this, the index is advanced, and we loop through for the size of the buffer.


Can you post the source to correct, single threaded version of the above?


This is from memory, so there might be a syntax error or something - but I've proofread a couple times and it looks right:


public class RealTimeRenderer
{
public List<IElement> Elements { get;set; }
public double[] GetBuffer(int num_samples)
{
//If the client hasn't specified a list of Elements, return an
//empty buffer
if(Elements == null) return new double[num_samples];


double[] ret = new double[num_samples];

for(int i=0; i<num_samples; ++i)
{
foreach(IElement Element in Elements)
{
Element.ClearFrame(); // this moves the previously calculated
// CurrentValue to the BufferedValue
// that is actually returned by Element.Emit()
// so that when Elements Tick() they are
// reading the previous state
}
foreach(IElement Element in Elements)
{
Element.Tick(); //this may call other Element's Emit() method
if(Element.Final) ret[i] += Element.Emit();
}
}
return ret;
}
}


Pretty basic, as you can see.

Load balancing I can add later (i.e. have each Element record its CPU time, and then give 4 "heavy" Elements to one thread, and 16 "light" Elements to another thread) , but for now all I need to get working is one thread per core, each performing


foreach(IElement Element in Elements)
{
Element.Tick();
if(Element.Final) ret[i] += Element.Emit();
}


And not performing that loop again, until all elements have ticked and the buffer index advanced. Right now I'm reading through this trying to figure exactly how to signal this properly, but I'm getting stumped.

Share this post


Link to post
Share on other sites
Quote:
Right now I'm reading through this trying to figure exactly how to signal this properly, but I'm getting stumped.

That is no surprise, since that is a typical reference manual which explains the tools, it doesn't deal with distributing the work.

Quote:
This is from memory, so there might be a syntax error or something - but I've proofread a couple times and it looks right:


Let there be n entries in Elements.

The code then becomes:
double[][] ret = new double[n][num_samples];
for (int i = 0; i < num_samples; i++) {

for (int e = 0; e < n; e++) {
Elements.get(e).ClearFrame();
}
for (int e = 0; e < n; e++) {
Element el = Elements.get(e);
el.Tick();
if (el.Final) ret[e][i] += el.Emit(); // no conflict, (e,i) is unique
}
}


In above code, the second for loop (first one is trivial) would now be replaced with Parallel.For (which is conveniently part of new version of .Net.


A concurrent version would then look like this:
void threadFunc(...) {
// input parameters
// - left, start index
// - right, end index
// - i, current iteration
for (int e = left; e < right; e++) {
Element el = Elements.get(e);
el.Tick();
if (el.Final) ret[e][i] += el.Emit(); // no conflict, (e,i) is unique
}
// Signal completion here if doing manually, do nothing if using Parallel.For
}



However, to get most benefit from concurrency, it would be the actual state (stuff hidden inside ClearFrame(), Tick() and Emit()) that would be replicated. So there would be num_samples copies of that state.

This would then effectively result in a matrix, which would allow a single for loop without blocking, meaning that only one wait is needed, rather than num_samples of synchronizations.

Share this post


Link to post
Share on other sites
Oh man, that's the ticket. Took me a few readovers because I didn't think it would work initially, but I see now exactly why it works. Antheus, you seem to be especially knowledgable in regards to concurrency, you've helped me a couple times now.

Are there any resources you would recommend specifically on the topic of concurrency?

Share this post


Link to post
Share on other sites

This topic is 2785 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

If you intended to correct an error in the post then please contact us.

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now

Sign in to follow this