Thanks for taking a look.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
namespace NameChangedToProtectTheInnocent
{
public class MessagePipeline
{
private ConcurrentDictionary<Module, ConcurrentQueue<Message>> pipeline;
private ConcurrentDictionary<Message, Bucket<dynamic>> resultBlockage;
public MessagePipeline()
{
int count = Enum.GetValues(typeof(Module)).Length;
pipeline = new ConcurrentDictionary<Module, ConcurrentQueue<Message>>(count, count);
resultBlockage = new ConcurrentDictionary<Message, Bucket<dynamic>>();
}
public void Add(Message newbie)
{
pipeline.GetOrAdd(newbie.TargetModule, m => new ConcurrentQueue<Message>()).Enqueue(newbie);
}
public Message? Dequeue(Module target)
{
Message result;
if (pipeline.GetOrAdd(target, m => new ConcurrentQueue<Message>()).TryDequeue(out result))
{
return (result);
}
return (null);
}
// TODO: configurable default timeout.
public T Fetch<T>(Message msg, Func<T> onTimeout = null, int? timeout = null)
{
if (!timeout.HasValue) { timeout = (int)TimeSpan.FromSeconds(180).TotalMilliseconds; }
if (onTimeout == null) { onTimeout = () => default(T); }
bool newbie = false;
var bucket = resultBlockage.GetOrAdd(msg, m => { newbie = true; return (new Bucket<dynamic>()); });
try
{
bucket.Barrier.AddParticipant();
if (newbie)
{
this.Add(msg);
}
if (!bucket.Barrier.SignalAndWait(timeout.Value))
{
resultBlockage.TryRemove(msg, out bucket);
return (onTimeout());
}
// if this is the incorrect type, let it toss.
return (bucket.Value);
}
finally
{
bucket.Barrier.Dispose();
}
}
public T Fetch<T>(Message msg, Func<T> onTimeout, TimeSpan timeout) { return Fetch<T>(msg, onTimeout, (int)timeout.TotalMilliseconds); }
public void SendResult(Message msg, dynamic value)
{
Bucket<dynamic> bucket;
if (resultBlockage.TryRemove(msg, out bucket))
{
bucket.Value = value;
bucket.Barrier.RemoveParticipant();
}
}
}
internal class Bucket<T>
{
private T storage = default(T);
public readonly Barrier Barrier = new Barrier(1);
public T Value
{
get
{
return (storage);
}
set
{
storage = value;
}
}
}
}