Sign in to follow this  
Lithic

[.net] Multithreaded Function Queue (Source Included)

Recommended Posts

Heres a multithreaded queue class I wrote. Very useful in game development. These are good for expensive operations to smooth out framerate. I also like to use one Queue for all my rendering because all render calls need to be on the same thread in OpenGL. Enjoy!
using System;
using System.Collections;
using System.Threading;

namespace Decimation
{
	/// <summary>
	/// Creates a multithreaded queue to run asynchronous operations on.
	/// </summary>
	public class MultithreadedQueue
	{
		private class NotificationQueue : Queue 
		{
			public event MultithreadedQueue.QueueFunctionHandler Queued;
			public override void Enqueue(object obj)
			{
				base.Enqueue (obj);
				Queued();
			}
		}

		private Thread _TheThread;
		private Queue _Handlers;
		public delegate void QueueFunctionHandler();
		private event QueueFunctionHandler _HandlerBinder;
		private bool _Enabled = false;
		private bool _Asleep = false;

		public MultithreadedQueue() 
		{
			NotificationQueue nQueue = new NotificationQueue();
			_TheThread = new Thread(new ThreadStart(ProcessQueue));
			nQueue.Queued += new QueueFunctionHandler(Handlers_Queued);
			_Handlers = Queue.Synchronized(nQueue);
		}

		public MultithreadedQueue(ThreadPriority priority) 
		{
			NotificationQueue nQueue = new NotificationQueue();
			_TheThread = new Thread(new ThreadStart(ProcessQueue));
			_TheThread.Priority = priority;
			nQueue.Queued += new QueueFunctionHandler(Handlers_Queued);
			_Handlers = Queue.Synchronized(nQueue);
		}

		~MultithreadedQueue() 
		{
			Stop();
		}

		// Begins execution
		public void Start() 
		{
			_Enabled = true;
			_TheThread.Start();
		}

		// Puts the thread to sleep and sets the asleep member
		private void Suspend() 
		{
			_TheThread.Suspend();
		}

		// Wakes the thread
		private void Resume() 
		{
			_TheThread.Resume();
		}

		// Stops the execution until start is called.
		public void Stop() 
		{
			_Enabled = false;
			if (IsSleeping)
				Resume();
		}

		// The queue's thread does runs through this loop
		private void ProcessQueue()
		{
			while (_Enabled) 
			{
				while (_Handlers.Count > 0) 
				{
					QueueFunctionHandler dh = _Handlers.Dequeue() as QueueFunctionHandler;
					_HandlerBinder += dh;
					_HandlerBinder();
					_HandlerBinder -= dh;
				}
				if (_Enabled)
					Suspend();
			}
		}

		// Returns the queue for addition or removal of functions
		public Queue Queue 
		{
			get 
			{
				return _Handlers;
			}
		}

		// Allows outside code to see if the execution thread is sleeping
		public bool IsSleeping 
		{
			get 
			{
				return _TheThread.ThreadState == ThreadState.Suspended;
			}
		}

		// This awakes the execution thread whenever the queue is no longer empty
		private void Handlers_Queued() 
		{
			if (_Asleep)
				Resume();
		}

		// This function will not return until the queue is cleared.
		public void Finish() 
		{
			if (IsSleeping) 
			{
				Queue.Enqueue(new QueueFunctionHandler(Thread.CurrentThread.Resume));
				Thread.CurrentThread.Suspend();
			}
		}
	}
}

Edit: Class was quite buggy, this appears to have rectified all problems, namely the Exceptions thrown on Interrupt() calls and a problem in the Finish method where both threads would simultaneously stop execution. Feel free to comment ways to make this class better. [Edited by - Lithic on November 12, 2004 4:40:54 PM]

Share this post


Link to post
Share on other sites
System.Threading.ThreadPool is quite different. Firstly, ThreadPool is static, one cannot define instances of it. Secondly, this class operates on a single thread that automatically suspends itself when it has no more functions queued. This class also allows other threads to wait on its completion. The threadpool automatically designates a thread to work on a part rather than allowing the user to choose a specific thread. OpenGL operations MUST be run on the thread that the OpenGL was created on to not give error (At least with the Tao Lib). The ThreadPool class is very different indeed.

Share this post


Link to post
Share on other sites
The previous version was buggy as all hell. Often the threads would lock up, especially during Finish() calls.

This version is working excellently with my main code. The Finish code would lock up if called on the main thread because it would suspend itself and lock the whole app. Having the thread spinwait until it's free is a tiny bit slower, but it is more reliable.

using System;
using System.Collections;
using System.Threading;

namespace Decimation
{
/// <summary>
/// Creates a multithreaded queue to run asynchronous operations on.
/// </summary>
public sealed class MultithreadedQueue
{
private class NotificationQueue : Queue
{
public event MultithreadedQueue.QueueFunctionHandler Queued;
public override void Enqueue(object obj)
{
base.Enqueue (obj);
Queued();
}
}

private Thread _TheThread;
private Queue _Handlers;
public delegate void QueueFunctionHandler();
private event QueueFunctionHandler _HandlerBinder;
private bool _Enabled = false;
private Queue _WaitThreads = new Queue();

public MultithreadedQueue()
{
NotificationQueue nQueue = new NotificationQueue();
_TheThread = new Thread(new ThreadStart(ProcessQueue));
nQueue.Queued += new QueueFunctionHandler(Handlers_Queued);
_Handlers = Queue.Synchronized(nQueue);
}

public MultithreadedQueue(ThreadPriority priority)
{
NotificationQueue nQueue = new NotificationQueue();
_TheThread = new Thread(new ThreadStart(ProcessQueue));
_TheThread.Priority = priority;
nQueue.Queued += new QueueFunctionHandler(Handlers_Queued);
_Handlers = Queue.Synchronized(nQueue);
}

~MultithreadedQueue()
{
Stop();
}

// Begins execution
public void Start()
{
_Enabled = true;
_TheThread.Start();
}

// Puts the thread to sleep and sets the asleep member
private void Suspend()
{
_TheThread.Suspend();
}

// Wakes the thread
private void Resume()
{
_TheThread.Resume();
}

// Stops the execution until start is called.
public void Stop()
{
_Enabled = false;
if (IsSleeping)
Resume();
}

// The queue's thread does runs through this loop
private void ProcessQueue()
{
while (_Enabled)
{
while (_Handlers.Count > 0)
{
QueueFunctionHandler dh = _Handlers.Dequeue() as QueueFunctionHandler;
_HandlerBinder += dh;
_HandlerBinder();
_HandlerBinder -= dh;
}
if (_Enabled)
Suspend();
}
}

// Returns the queue for addition or removal of functions
public Queue Queue
{
get
{
return _Handlers;
}
}

// Allows outside code to see if the execution thread is sleeping
public bool IsSleeping
{
get
{
return _TheThread.ThreadState == ThreadState.Suspended || _TheThread.ThreadState == ThreadState.SuspendRequested;
}
}

// This awakes the execution thread whenever the queue is no longer empty
private void Handlers_Queued()
{
if (IsSleeping)
Resume();
}

// This function will not return until the queue is cleared.
public void Finish()
{
while (!IsSleeping)
Thread.SpinWait(1);
}
}
}

Share this post


Link to post
Share on other sites

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now

Sign in to follow this