# Code Review Request

This topic is 2754 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

## Recommended Posts

I have a message queue class meant to provide threadsafe in-memory communication between a set number of modules. I've passed it around the office for a review, but with things like this it is important to get as many eyes as possible. Everything seems kosher, and we haven't been able to break it via automated testing. The code not included is Module which is an enum for the various logical pieces of the app, and Message which is a struct that contains the TargetModule for the message, a message name, and a dictionary of parameters/data.

Thanks for taking a look.

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Collections.Concurrent; using System.Threading; namespace NameChangedToProtectTheInnocent { public class MessagePipeline { private ConcurrentDictionary<Module, ConcurrentQueue<Message>> pipeline; private ConcurrentDictionary<Message, Bucket<dynamic>> resultBlockage; public MessagePipeline() { int count = Enum.GetValues(typeof(Module)).Length; pipeline = new ConcurrentDictionary<Module, ConcurrentQueue<Message>>(count, count); resultBlockage = new ConcurrentDictionary<Message, Bucket<dynamic>>(); } public void Add(Message newbie) { pipeline.GetOrAdd(newbie.TargetModule, m => new ConcurrentQueue<Message>()).Enqueue(newbie); } public Message? Dequeue(Module target) { Message result; if (pipeline.GetOrAdd(target, m => new ConcurrentQueue<Message>()).TryDequeue(out result)) { return (result); } return (null); } // TODO: configurable default timeout. public T Fetch<T>(Message msg, Func<T> onTimeout = null, int? timeout = null) { if (!timeout.HasValue) { timeout = (int)TimeSpan.FromSeconds(180).TotalMilliseconds; } if (onTimeout == null) { onTimeout = () => default(T); } bool newbie = false; var bucket = resultBlockage.GetOrAdd(msg, m => { newbie = true; return (new Bucket<dynamic>()); }); try { bucket.Barrier.AddParticipant(); if (newbie) { this.Add(msg); } if (!bucket.Barrier.SignalAndWait(timeout.Value)) { resultBlockage.TryRemove(msg, out bucket); return (onTimeout()); } // if this is the incorrect type, let it toss. return (bucket.Value); } finally { bucket.Barrier.Dispose(); } } public T Fetch<T>(Message msg, Func<T> onTimeout, TimeSpan timeout) { return Fetch<T>(msg, onTimeout, (int)timeout.TotalMilliseconds); } public void SendResult(Message msg, dynamic value) { Bucket<dynamic> bucket; if (resultBlockage.TryRemove(msg, out bucket)) { bucket.Value = value; bucket.Barrier.RemoveParticipant(); } } } internal class Bucket<T> { private T storage = default(T); public readonly Barrier Barrier = new Barrier(1); public T Value { get { return (storage); } set { storage = value; } } } } 

##### Share on other sites
I'll try and get the ball rolling a bit...

1. I don't get the use of Modules in there. For my taste, it appears that your class has one too many responsibilities. For example, why not simply give each module a MessageQueue which is independent of the Module class? In the current system I'd say that you've introduced unnecessary contention by insisting on everything going through the 'pipeline' ConcurrentDictionary member. Also, what if you want to send a message to something other than a Module? There might be a good reason for this design that I'm not seeing, however.

2. So is it a pipeline or a message queue? These typically mean different things, but you seem to me to be conflating the terms here.

3. Zero documentation. What are the semantics you're aiming for with each method? Nobody can check you've got things right if we don't know what 'right' is supposed to be.

FWIW, I'm not at all clued up on C#, so may not be be able to give much further feedback. But because of the above points, it makes it hard for anyone to review in any concrete fashion. This *might* go some way to explaining the dearth of responses.

##### Share on other sites
Woo, feedback!

1. Sure, this might be better broken out into two classes. The initial motivation in not making multiples is that A - how would you lookup which queue to put the message into? The piece that pulls/processes the message will change (test apps might mock some of the pieces for example). To provide that sort of resolution you'd end up with something like this I'd think. and B - the initial use case is for new code. I have a decent idea what modules will exist, but I also know that it will change with requirements. And I know that modules will often talk with multiple others. At the time it seemed like it would be far less trouble to have one reference that could be passed into all the modules and not deal with the refactoring for new modules/references needed than the potential trouble of having stuff be able to send/get messages to/from any module.

2. It's a bunch of queues technically so that name doesn't apply... couldn't think of a better name. And perhaps a little of #1 causing that.

3. I'd hoped that was more clear, but yes, some of the naming is not fantastic.

In general, there's two modes of communication this thing provides. One where you don't care about a result and one where you do.

Add and Dequeue provide the functionality for when you don't care. Add puts a message on the queue that the message targets and returns. Dequeue returns the top message off the requested queue (null if none). If nothing is on the other end pulling off messages, they'll pile up.

Fetch and SendResult provide the functionality for when you do care about the result. Fetch puts a message on the queue that the message targets and blocks. The code that is pulling messages off of that queue via Dequeue is expected to call SendResult (using the instigating message as the parameter). Any blocking Fetches will then wake up; grab the result that was sent and return it. Fetch has a timeout for cases when the other thread takes too long to respond. If the timeout is triggered, the optional Func is called (or default(T) if none).

...and most importantly, all of the operations should be threadsafe as seen by consuming code.

##### Share on other sites

Woo, feedback!

1. Sure, this might be better broken out into two classes. The initial motivation in not making multiples is that A - how would you lookup which queue to put the message into?

The same way you know which Module constant to use as the "key". What am I missing here?

B - the initial use case is for new code. I have a decent idea what modules will exist, but I also know that it will change with requirements. And I know that modules will often talk with multiple others. At the time it seemed like it would be far less trouble to have one reference that could be passed into all the modules and not deal with the refactoring for new modules/references needed than the potential trouble of having stuff be able to send/get messages to/from any module.
[/quote]
I just don't understand why you need this Module concept at all. If it was used in a game for example, you might have physicsSolver.getMessageQueue(), animator.getMessageQueue(), or whatever where physicsSolver and animator are objects or actual C# modules. What's the advantage of using enums instead? Seems like you're restricting the usage of the class for no reason.

2. It's a bunch of queues technically so that name doesn't apply... couldn't think of a better name. And perhaps a little of #1 causing that.
[/quote]
IMHO, it really muddies the intent of the code. But I'd encourage you not to use pipeline because as I said, that's really something else.

3. [...]

Fetch and SendResult provide the functionality for when you do care about the result. Fetch puts a message on the queue that the message targets and blocks.
[/quote]

In my experience, this can cause trouble e.g. deadlock. It's often better to send a message back as a response. I don't understand your intended usage so you might be fine, but take care.

Example: Thread 1 sends a message to thread 2 via a queue. Thread 1 is blocking, waiting for a result. In processing the message, thread 2 calls some opaque callback that does who-knows-what, but quite possible tries to send a blocking message back to thread 1. So thread 1 is now stuck waiting on thread 2 and vice versa. (Replace "thread" with "subsystem" or "module" if it makes more sense to your particular implementation -- the problem remains).

The code that is pulling messages off of that queue via Dequeue is expected to call SendResult (using the instigating message as the parameter). Any blocking Fetches will then wake up; grab the result that was sent and return it.
[/quote]
Relying on the other Module/thread/system to do something is again something that can cause the system to lock-up if protocol isn't strictly obeyed. What should happen if a thread throws an exception before it manages to call SendResult? Expecting developers to be *really careful* is one answer, of course, but it makes the system harder to use. There might be some nifty way of using C#'s "using" statement which would send the exception back to the thread that enqueued the message.

Incidentally, what you're doing here is reinventing something close to the existing concept of a "future" or "promise". I'd be a little surprised if C# didn't already provide something like that.

...and most importantly, all of the operations should be threadsafe as seen by consuming code.
[/quote]
It's a good idea to be more specific about what you mean by "threadsafe".

1. Do you mean in the sense that multiple MessagePipelines can be used at once, but by different threads?
2. Or that one thread can be enqueuing messages while another is dequeuing?
3. Or that you can have an arbitrary number of threads doing either enqueue or dequeue operations?

##### Share on other sites

Add and Dequeue provide the functionality for when you don't care. Add puts a message on the queue that the message targets and returns. Dequeue returns the top message off the requested queue (null if none). If nothing is on the other end pulling off messages, they'll pile up.

Names should form meaningful pairs: Add/Remove, Enqueue/Dequeue, Push/Pop.

A non-trivial Add should have ability to notify that queue is full or cannot accept messages. "They'll pile up" is really bad for such a system. Due to strange scheduling, one thread can overwhealm the consumers, causing a death spiral by producing more than consumers can handle in given time.

In asynchronous systems this typically results in cascading failure. Blocking doesn't solve this either, it's prone to same failure, but at least allows for submissions to be limited proactively, rather than after the system failed.

Fetch and SendResult provide the functionality for when you do care about the result. Fetch puts a message on the queue that the message targets and blocks. The code that is pulling messages off of that queue via Dequeue is expected to call SendResult (using the instigating message as the parameter).[/quote]
This is known as "Future", but I had no clue from reading the code. It's an established pattern, the implementation should mimic it. Or just use the built-in Future, which is apparently called Task these days.

##### Share on other sites

[quote name='Telastyn' timestamp='1299787802' post='4784113']
Woo, feedback!

1. Sure, this might be better broken out into two classes. The initial motivation in not making multiples is that A - how would you lookup which queue to put the message into?

The same way you know which Module constant to use as the "key". What am I missing here?

*snip*
I just don't understand why you need this Module concept at all. If it was used in a game for example, you might have physicsSolver.getMessageQueue(), animator.getMessageQueue(), or whatever where physicsSolver and animator are objects or actual C# modules. What's the advantage of using enums instead? Seems like you're restricting the usage of the class for no reason.
[/quote]

*nod* maybe so. During the initial design discussion, it was not clear if the modules would be able to have references to the others. physicsSolver (and its kin) would need to be IoC'd into place. At least for prototyping, that seemed like overkill. It's still not entirely clear, but it looks like they will be able to if needed.

3. [...]

Fetch and SendResult provide the functionality for when you do care about the result. Fetch puts a message on the queue that the message targets and blocks.
[/quote]

In my experience, this can cause trouble e.g. deadlock. It's often better to send a message back as a response. I don't understand your intended usage so you might be fine, but take care.
[/quote]

Absolutely, deadlocks are a concern. The dataflow (and the size of the app) make this relatively unlikely, but the timeout mechanisms are there as a failsafe to prevent the entire thing from locking up permanently. I'll have to think about it, but I'm inclined to think that the logic to re-start/pause work while something is waiting for a message would be fairly onerous. The blocking mechanism seems like it would make the consuming code cleaner.

The code that is pulling messages off of that queue via Dequeue is expected to call SendResult (using the instigating message as the parameter). Any blocking Fetches will then wake up; grab the result that was sent and return it.
[/quote]
Relying on the other Module/thread/system to do something is again something that can cause the system to lock-up if protocol isn't strictly obeyed. What should happen if a thread throws an exception before it manages to call SendResult?
[/quote]

The timeout will get hit. The system is likely to have a number of normal operational timeouts (network outages, delays due to load) so should deal with those cases gracefully.

Incidentally, what you're doing here is reinventing something close to the existing concept of a "future" or "promise". I'd be a little surprised if C# didn't already provide something like that.
[/quote]

There was a future class in the 4.0 CTP, but iirc it got removed. This seems like it might be a solid use case for the await stuff in C# 5, but...

[edit: apparently it got renamed; though I'll need to research how to integrate that when the callsite doesn't know/care what is being invoked]

It's a good idea to be more specific about what you mean by "threadsafe".

1. Do you mean in the sense that multiple MessagePipelines can be used at once, but by different threads?
2. Or that one thread can be enqueuing messages while another is dequeuing?
3. Or that you can have an arbitrary number of threads doing either enqueue or dequeue operations?
[/quote]

3.

Assuming Message is readonly, arbitrary number of threads should be able to call any methods on an arbitrary number of instances without data corruption, data loss, or permanent deadlock - without explicit locks.

1. 1
2. 2
3. 3
Rutin
18
4. 4
JoeJ
14
5. 5

• 14
• 10
• 23
• 9
• 32
• ### Who's Online (See full list)

There are no registered users currently online

×