Coroutines: Simple HTTP Server

Posted by Net Gnome, 25 February 2014 · 475 views

Coroutine Concurrent Programming C# HTTP
Ok, what are we up to this time?
This time we're going to build a very simple HTTP server that serves static content and handles roughly 20,000 requests per second. (I know in earlier posts I stated ~30k, but I refined aspects of the server to be more standards-friendly and lost some performance as a result.) To do so, we'll need to expand the AsyncCS library we've been working on to include a couple of things: Coroutine Workers and a Resource Pool.

NOTE: Under no circumstances should you ever use this server in a production setting. It is buggy and should not be considered secure. You have been warned.

What is a Coroutine Worker?
Simply put, a Coroutine Worker is just a worker thread, and a worker thread is just a standalone thread that runs a specialized infinite loop and works only on the set of work given to it. In our case, it pulls its work from our Resource Pool.

What is a Resource Pool?
A Resource Pool is just that: a pool of resources. In our case, it's a set of concurrent collections and some access mechanisms surrounding them.

What is a Concurrent Collection?
Concurrent collections are thread-safe collections (i.e., generic containers) that multiple threads can use at the same time. They make our lives easy. If you want an idea of how .NET goes about implementing them, I suggest you take a gander over at the Mono project and view their source here.
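
To make that concrete, here is a minimal sketch of several threads sharing one ConcurrentQueue with no explicit locking. The class name ConcurrentQueueDemo and the thread and item counts are made up for illustration; they are not part of AsyncCS.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo class, not part of AsyncCS.
public static class ConcurrentQueueDemo
{
	// Four producer threads enqueue into one shared queue without any locks,
	// then the caller drains the queue and reports how many items it found.
	public static int Run()
	{
		var queue = new ConcurrentQueue<int>();
		var producers = new Thread[4];

		for (int t = 0; t < producers.Length; t++) {
			producers[t] = new Thread(() => {
				for (int i = 0; i < 1000; i++)
					queue.Enqueue(i);
			});
			producers[t].Start();
		}

		foreach (Thread producer in producers)
			producer.Join();

		// TryDequeue returns false instead of throwing when the queue is empty
		int drained = 0;
		int item;
		while (queue.TryDequeue(out item))
			drained++;

		return drained;
	}
}
```

Calling ConcurrentQueueDemo.Run() should count all 4,000 items. With a plain Queue<int> and no lock, the same code could lose items or throw.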

So how do we go about putting this all together?
Ok, this is going to be a mostly straightforward construction due to the amount of code involved, so no "as I figured it out" type posts, sorry :/ That said, first let's construct our Resource Pool. For it we'll need a few things: a FIFO structure (i.e., a queue) to contain coroutines as they are issued, a way to issue a new coroutine, and a way to place a coroutine back into the queue after we've done some work on it.
public static class ResourcePool{

	public static ConcurrentQueue<Coroutine> coroutine_queue = new ConcurrentQueue<Coroutine> ();

	public static void issue_coroutine(Coroutine coroutine){
		coroutine.initialize (null);
		ResourcePool.coroutine_queue.Enqueue (coroutine);
	}

	public static void issue_coroutine(Coroutine coroutine, object input){
		coroutine.initialize (input);
		ResourcePool.coroutine_queue.Enqueue (coroutine);
	}

	public static void enqueue_coroutine(Coroutine coroutine){
		ResourcePool.coroutine_queue.Enqueue (coroutine);
	}

	public static object[] parameterize(params object[] parameters){
		return parameters;
	}
}
What about the Worker?
The worker is pretty simple. It needs an interruptible infinite loop (i.e., while(bool_variable)), the ability to retrieve a coroutine from the queue, a max number of coroutines to work on before ending its loop-cycle, a period to remain dormant after its loop-cycle is complete, a way to identify it, a way to force it to shut down, and whatever other spurious info you may want (for debugging and testing I added a "tasks_complete" counter). My worker ended up looking like so:
public class Worker{

	public Worker(){
		RUNNING = true;
	}

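	//per-worker flag (not static, so shutdown() only stops this worker);
	//volatile so a shutdown() call from another thread is seen by run()
	private volatile bool RUNNING = true;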

	public int max_count = 10;

	public int tasks_complete = 0;

	public int ID = 0;

	public long sleep_time = 10000L;

	public void run(){

		Console.WriteLine ("Worker {0} Starting...",ID);

		while (RUNNING) {
			if (ResourcePool.coroutine_queue.Count > 0) {
				for (int i = 0; i < (ResourcePool.coroutine_queue.Count > max_count ? max_count : ResourcePool.coroutine_queue.Count); i++) {

					Coroutine coroutine;

					if (ResourcePool.coroutine_queue.TryDequeue (out coroutine)) {
						coroutine.next ();

						if (coroutine.can_move_next)
							ResourcePool.enqueue_coroutine (coroutine);
						else
							this.tasks_complete++;
					} else
						continue;
				}
			}

			Thread.Sleep (new TimeSpan(sleep_time));
		}
		Console.WriteLine ("Worker {0} Stopping... {1} Coroutines Completed...",ID, this.tasks_complete);
	}

	public void shutdown(){
		RUNNING = false;
	}
}
While this isn't perfect, it's a good way to show how a simple worker thread can be implemented. One possible improvement is to create a number of queues equal to the number of workers and distribute the workload among them, which could allow for more efficient processing. I just wanted to keep it simple and reduce the number of problem areas.
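
For the curious, the queue-per-worker idea could be sketched roughly as follows. ShardedPool and its members are hypothetical names, not part of AsyncCS, and the round-robin distribution is just one assumed policy.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical ShardedPool<T>: one queue per worker, filled round-robin.
public class ShardedPool<T>
{
	private readonly ConcurrentQueue<T>[] queues;
	private int next = -1;

	public ShardedPool(int workerCount)
	{
		queues = new ConcurrentQueue<T>[workerCount];
		for (int i = 0; i < workerCount; i++)
			queues[i] = new ConcurrentQueue<T>();
	}

	// Pick the next queue in rotation. Interlocked keeps the counter safe
	// across threads, and the uint cast keeps the slot valid if it wraps.
	public void Issue(T item)
	{
		int ticket = Interlocked.Increment(ref next);
		int slot = (int)((uint)ticket % (uint)queues.Length);
		queues[slot].Enqueue(item);
	}

	// Each worker only ever dequeues from the queue matching its ID.
	public bool TryTake(int workerId, out T item)
	{
		return queues[workerId].TryDequeue(out item);
	}

	public int CountFor(int workerId)
	{
		return queues[workerId].Count;
	}
}
```

A worker with ID n would then call TryTake(n, out coroutine) instead of hitting the single shared queue. The trade-off is that one slow queue can back up while other workers sit idle, so whether this actually wins would need measuring.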

Ok, so how do I use them?
Using them is fairly easy. You basically create a worker, or an array of workers, then start them running on separate threads. You then issue coroutines to be "worked" via the ResourcePool. That code would look like so (also found in my test cases):
public class AdderTask : Coroutine{
	private int val;

	public override IEnumerable<object> process ()
	{
		for (int i = 0; i < 100000; i++) {
			if (i % 2 == 0)
				val += 2;
			else
				val += 3;
		}

		yield return YieldComplete(val);
	}
}

public void do_stuff(){

	//create our worker and assign a thread to run it
	Worker worker = new Worker ();
	Thread thread = new Thread (worker.run);

	//add our coroutines to the queue
	for (int i = 0; i < 100; i++) {
		ResourcePool.issue_coroutine (new AdderTask ());
	}

	//start the thread, wait a second, then kill it
	thread.Start ();
	Thread.Sleep (1000);
	worker.shutdown ();
	thread.Join ();
}


Fairly simple, eh? One possible improvement would be a boolean stating that the coroutine is "done", set by the Worker when it "dumps" the coroutine.

Easy stuff, now what?
Now we'll work on creating a simple HTTP server. Since we're using coroutines, and .NET by default doesn't use them in its methods, we're going to need to work at a lower level. This means we will not get to use the nice friendly HTTP Server/Listener/Request/Response constructs. We get to use a grittier construct, a more evil construct, the infamous NETWORK SOCKET! MWAAA HAHAHHAHAHAH! err... umm... yea... Seriously though, the socket construct gives everything we need and lets us blow the doors off the other crap, err... I mean less flexible constructs.

Ok... so how does an HTTP server operate?
It 1) listens for requests on a given port; 2) when a request is received, determines what the request is for; 3) attempts to fulfill that request; and 4) responds as appropriate once the request has been attempted. It's a fairly simple 4-step process. Since we're going to handle static content, we need a few things: a main listen-loop, a way to read data from a Socket, a way to interpret a request, a way to handle a request, a way to read files from the file system, a way to handle a response, and a way to send data to a Socket.

From the top, where do we start?
We start by setting up our workers and listening on a port. If/when we get a request, we hand it off to a handler and go back to listening. That construct looks as follows:
class MainClass
{
	private static Socket listener;
	private static List<Worker> workers = new List<Worker>();
	static bool run = true;

	public static void Main (string[] args)
	{
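		//thread count comes from the first argument; fall back to the core count if none is given
		int numThreads = args.Length > 0 ? int.Parse(args[0]) : Environment.ProcessorCount;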

		Thread[] threads = new Thread [numThreads];

		for (int i = 0; i < numThreads; i++) {
			Worker worker = new Worker();
			worker.ID = i;
			worker.max_count = 10;
			worker.sleep_time = 10000L;//10,000 ticks of 100ns each = sleep for 1ms
			workers.Add (worker);
			threads [i] = new Thread (worker.run);
			threads [i].Start ();
		}

		listener = new Socket (AddressFamily.InterNetwork,
		                       SocketType.Stream,
		                       ProtocolType.Tcp);

		//just use standard localhost and http testing port
		IPAddress address = IPAddress.Parse("127.0.0.1");
		IPEndPoint endpoint = new IPEndPoint (address, 8080);
		listener.Bind (endpoint);			

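		//start listening once, with a backlog of up to 1000 pending connections
		listener.Listen (1000);

		Console.Out.WriteLine ("listening...");

		//run forever
		while (run) {
			try{
				Socket sock = listener.Accept ();

				ResourcePool.issue_coroutine(new RequestHandler(), sock);

			}catch(Exception){
				//ignore failed accepts and keep listening
			}
		}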

		listener.Close ();
	}
}
Pretty basic, eh? What's nice here is that listener.Accept() gives us a reference to a socket for that specific connection request.

How do we handle those requests?
We use a special Coroutine to handle them, and it is called RequestHandler. With it we want to read the data from the socket, then use that data to create a basic Request object. We'll then interrogate that object to determine what we need to do, then grab the data needed, build a response around that data, then send it over the socket to the computer that requested it. All that looks like this:
public class RequestHandler : Coroutine
{
	private Socket _socket = null;

	public override void initialize (object in_value)
	{
		_socket = (Socket)in_value;

		base.initialize (in_value);
	}

	public override IEnumerable<object> process ()
	{
		//read entire request
		SocketReader sock_reader = new SocketReader ();
		yield return YieldFrom(sock_reader,this._socket);

		string data = sock_reader.data;

		Request req = new Request (data);
		if (!req.parse ()) {
			this._socket.Close ();
			yield return YieldComplete ();
		}

		//double check that this was in fact a GET request
		if (req.method != "GET") {
			//not a GET request, terminate request
			this._socket.Close ();
			yield return YieldComplete ();
		}

		//if it's a valid resource, retrieve it
		if (req.uri != "") {
			//retrieve resource and then send
			Reader reader = new Reader();
			yield return YieldFrom(reader, req.uri);

			if (reader.data == null) {
				this._socket.Close ();
				yield return YieldComplete ();
			}

			Response response = new Response (reader.data);
			response.prepare_data ();

			if (response.data == null) {
				this._socket.Close ();
				yield return YieldComplete ();
			}

			object[] pkg = new object[2];
			pkg [0] = this._socket;
			pkg [1] = response.data;
			yield return YieldFrom (new SocketSender (), pkg);

		} else {
			//was not a valid resource...
			Console.Out.WriteLine ("Bad Request...");
		}

		//clean up and complete
		this._socket.Close();
		yield return YieldComplete();
	}

}
Nothing fancy, just the basics.
Ok, how do those smaller pieces work? Start with the SocketReader...
This is where coroutines start to show their strength. Their ability to tackle pieces of a larger task becomes valuable during I/O operations, especially during socket reading. I/O takes time... a... loooooonnnnngggg.... time.... and you could be doing something useful in that time. Coroutines allow you to do smaller I/O operations, then yield to do other work, then come back where you left off and continue onward. It's these aspects that allow you to achieve higher concurrency than other models when properly implemented. Anyway, enough with the proselytizing, time to show some code:
public class SocketReader : Coroutine
{
	private static int BUFFER_SIZE = 1024;
	private Socket _socket = null;
	public string data = "";

	public SocketReader ()
	{
	}

	#region implemented abstract members of Coroutine

	public override void initialize (object in_value)
	{
		_socket = (Socket)in_value;
		base.initialize (in_value);
	}

	public override IEnumerable<object> process ()
	{
		byte[] buffer = new byte[BUFFER_SIZE];
		int dataSize = 0;

		//read entire request
		while((dataSize = _socket.Receive (buffer)) == BUFFER_SIZE){

			data += Encoding.UTF8.GetString (buffer, 0, dataSize);

			yield return data;
		}

		//convert to a string
		data += Encoding.UTF8.GetString (buffer, 0, dataSize);

		yield return YieldComplete (data);
	}

	#endregion
}
While nice 'n neat, this code does have an edge case to beware of. If the incoming data is an exact multiple of BUFFER_SIZE, you end up calling _socket.Receive() one additional time. That is a blocking operation, and it will hang until the client sends more data, closes the connection, or a receive timeout (if one is set) expires... fun times... The reverse can also bite you: Receive() may return fewer than BUFFER_SIZE bytes before the whole request has arrived, in which case the loop stops early. Why did I choose 1024 as my buffer size? No particular reason; I like 1024, it's nice and power of two-y. Keeping it small-ish is to your benefit, though. Remember, more time spent on I/O is time you could be spending on OTHER stuff!
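
One way to sidestep the buffer-size guesswork is to stop reading at the blank line that ends the HTTP header block, which is enough for a body-less GET request. The sketch below is not what cHTTP does. It assumes a Stream (for a socket, a NetworkStream wrapping it), the HeaderReader name is hypothetical, and it reads in a plain loop, whereas a coroutine version would yield after each read.

```csharp
using System.IO;
using System.Text;

// Hypothetical helper, not part of cHTTP.
public static class HeaderReader
{
	// Reads in small chunks until the blank line ("\r\n\r\n") that ends an
	// HTTP header block has been seen, or until the stream reports end of data.
	public static string ReadHeaders(Stream stream, int bufferSize)
	{
		byte[] buffer = new byte[bufferSize];
		StringBuilder text = new StringBuilder();
		int read;

		while ((read = stream.Read(buffer, 0, buffer.Length)) > 0) {
			// ASCII is enough for the request line and header names
			text.Append(Encoding.ASCII.GetString(buffer, 0, read));

			// stop as soon as the terminator shows up, however many bytes arrived
			if (text.ToString().Contains("\r\n\r\n"))
				break;
		}

		return text.ToString();
	}
}
```

Because the loop ends on the terminator rather than on a short read, a request whose length is an exact multiple of the buffer size no longer triggers an extra read.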

So what does Request do?
Nothing but parse the data. It attempts to loosely follow the HTTP 1.0 spec and parse the basic request info (i.e., request command [GET], resource [/get/this/resource/located/here/dag/nabit], and HTTP version [HTTP/1.0]). It's nothing special; you can look at it at the cHTTP link at the end of the article.
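
If you just want the gist without digging through the repository, parsing the request line could look roughly like this. It is a sketch, not the actual Request class from cHTTP. RequestLine and TryParse are hypothetical names, and only the first line of the request is examined.

```csharp
using System;

// Hypothetical helper, not the Request class from cHTTP.
public static class RequestLine
{
	// Splits a request line such as "GET /index.html HTTP/1.0" into its three parts.
	// Returns false if the line does not have exactly three space-separated
	// fields or if the last field does not start with "HTTP/".
	public static bool TryParse(string raw, out string method, out string uri, out string version)
	{
		method = uri = version = null;
		if (string.IsNullOrEmpty(raw))
			return false;

		// only the first line matters here; headers are ignored
		int end = raw.IndexOf("\r\n", StringComparison.Ordinal);
		string line = end >= 0 ? raw.Substring(0, end) : raw;

		string[] parts = line.Split(' ');
		if (parts.Length != 3)
			return false;

		method = parts[0];
		uri = parts[1];
		version = parts[2];
		return version.StartsWith("HTTP/", StringComparison.Ordinal);
	}
}
```

RequestHandler would then check method against "GET" and hand uri to the file reader, much like it does with req.method and req.uri.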

Ok then, What about Reader?
Reader works a lot like SocketReader, except it grabs stuff from the file system. It looks like so:
public class Reader : Coroutine
{
	private string _file_name = "";
	public byte[] data;
	private int BUFFER_SIZE = 1024;

	#region implemented abstract members of Coroutine
	public override void initialize (object in_value)
	{
		_file_name = ((string)in_value).Remove(0,1);

		base.initialize (in_value);
	}

	public override IEnumerable<object> process ()
	{
		if(!File.Exists(_file_name))
		   yield return YieldComplete();

		//open file so that it can be read by multiple threads
		FileStream fs = new FileStream (_file_name, FileMode.Open, FileAccess.Read, FileShare.Read);
		BinaryReader br = new BinaryReader (fs);

		//get length and how many iterations will be required for BUFFER_SIZE
		long length = fs.Length;
		data = new byte[length];

		int index = 0;
		int block_size = data.Length > BUFFER_SIZE ? BUFFER_SIZE : data.Length;
		int read;

		//grab that file! advance by the bytes actually read, which may be fewer than requested
		while ((read = br.Read (data, index, block_size)) > 0) {
			index += read;
			block_size = data.Length - index > BUFFER_SIZE ? BUFFER_SIZE : data.Length - index;
			yield return data;
		}

		br.Close ();
		fs.Close ();

		yield return YieldComplete (data);
	}
	#endregion
}
Notice that we share read access to the file we are reading. This is due to file system read-locks: we tell the reader that we don't want to lock the file against other readers, which is important for static content given we are not using a cache. It allows other threads to read the file while another thread is reading it, yay! The rest just reads the file piecemeal and yields after each chunk.
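
A quick way to convince yourself of the sharing behavior is to open the same file twice at once. The snippet below is only an illustration; SharedReadDemo is a made-up name and the path is whatever file you point it at.

```csharp
using System.IO;

// Hypothetical demo class, not part of cHTTP.
public static class SharedReadDemo
{
	// Opens the same file twice at the same time. Both opens succeed because
	// each handle asks for read access and allows other readers (FileShare.Read).
	public static bool CanOpenTwice(string path)
	{
		using (var first = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read))
		using (var second = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read)) {
			// both handles see the same first byte (or -1 for an empty file)
			return first.ReadByte() == second.ReadByte();
		}
	}
}
```

Had the first stream been opened with FileShare.None instead, the second open would typically fail with an IOException, which is exactly what we don't want when many requests ask for the same index.html.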

What about Response?
It's kinda like Request. In its case, it just generates a very basic HTTP 1.0 compliant response. Some additional refactoring could be done to allow it to be created piecemeal in case the data payload is large, allowing more stuff to get done while it's being constructed.
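
As a rough idea of what "very basic" means, a response is a status line, a couple of headers, a blank line, and then the payload. The sketch below is not the actual Response class from cHTTP. ResponseBuilder is a hypothetical name, and the choice of headers is an assumption.

```csharp
using System;
using System.Text;

// Hypothetical helper, not the Response class from cHTTP.
public static class ResponseBuilder
{
	// Builds a minimal HTTP/1.0 "200 OK" response: status line, two headers,
	// a blank line, then the body bytes untouched.
	public static byte[] Build(byte[] body, string contentType)
	{
		string head = "HTTP/1.0 200 OK\r\n" +
		              "Content-Type: " + contentType + "\r\n" +
		              "Content-Length: " + body.Length + "\r\n" +
		              "\r\n";
		byte[] headBytes = Encoding.ASCII.GetBytes(head);

		// header block first, payload right after it
		byte[] packet = new byte[headBytes.Length + body.Length];
		Buffer.BlockCopy(headBytes, 0, packet, 0, headBytes.Length);
		Buffer.BlockCopy(body, 0, packet, headBytes.Length, body.Length);
		return packet;
	}
}
```

The resulting byte array is the kind of thing that gets handed to the sender as the second element of the pkg array.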

Ok, on to SocketSender!
SocketSender, like the other I/O classes, sends the data payload piecemeal over the given socket through successive yields. Again, this allows work to happen in between yields. It looks like so:
public class SocketSender : Coroutine
{
	private int BUFFER_SIZE = 1024;
	private Socket _socket = null;
	private byte[] _data;

	public SocketSender ()
	{
	}

	public override void initialize (object in_value)
	{
		object[] pkg = (object[])in_value;
		this._socket = (Socket)pkg [0];
		this._data = (byte[])pkg [1];
		base.initialize (in_value);
	}

	#region implemented abstract members of Coroutine

	public override IEnumerable<object> process ()
	{
		int index = 0;
		int block_size = _data.Length > BUFFER_SIZE ? BUFFER_SIZE : _data.Length;
		int sent;

		//advance by the bytes actually sent, which may be fewer than block_size
		while (block_size > 0 && (sent = _socket.Send(_data, index, block_size, SocketFlags.None)) > 0) {
			index += sent;
			block_size = _data.Length - index > BUFFER_SIZE ? BUFFER_SIZE : _data.Length - index;
			yield return Yield ();
		}

		yield return YieldComplete ();
	}

	#endregion
}
Very similar to our other classes.
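
If the "work happens in between yields" part still feels abstract, here is a tiny stand-alone sketch using plain C# iterators rather than the AsyncCS Coroutine class. InterleaveDemo and SendInChunks are made-up names, and nothing is actually sent anywhere; each job only reports the chunk sizes it would send.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo class using plain iterators, not AsyncCS coroutines.
public static class InterleaveDemo
{
	// A job that "sends" a payload in fixed-size chunks, yielding after each one.
	private static IEnumerable<string> SendInChunks(string name, int totalBytes, int chunkSize)
	{
		for (int sent = 0; sent < totalBytes; sent += chunkSize) {
			int size = Math.Min(chunkSize, totalBytes - sent);
			yield return name + ":" + size;
		}
	}

	// Round-robin over the jobs: step one, then requeue it if it produced a chunk.
	public static List<string> Run()
	{
		var log = new List<string>();
		var ready = new Queue<IEnumerator<string>>();
		ready.Enqueue(SendInChunks("A", 2500, 1024).GetEnumerator());
		ready.Enqueue(SendInChunks("B", 1024, 1024).GetEnumerator());

		while (ready.Count > 0) {
			IEnumerator<string> job = ready.Dequeue();
			if (job.MoveNext()) {
				log.Add(job.Current);
				ready.Enqueue(job);
			}
		}

		return log;
	}
}
```

Run() produces the log A:1024, B:1024, A:1024, A:452. The small job B finishes in between A's chunks instead of waiting for all of A to go out first, which is the same effect the Worker gets by requeueing a coroutine after each step.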

Anything Else?
Nope, that is all there is to it. To test it, I compiled it on Kubuntu 14.04 using Mono and ran it with 32 threads. I then had Apache Bench toss 100,000 requests its way using the command "ab -n 100000 -c 32 http://127.0.0.1:8080/index.html". The results were as follows:
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        
Server Hostname:        127.0.0.1
Server Port:            8080

Document Path:          /index.html
Document Length:        708 bytes

Concurrency Level:      32
Time taken for tests:   5.090 seconds
Complete requests:      100000
Failed requests:        0
Total transferred:      75200000 bytes
HTML transferred:       70800000 bytes
Requests per second:    19646.46 [#/sec] (mean)
Time per request:       1.629 [ms] (mean)
Time per request:       0.051 [ms] (mean, across all concurrent requests)
Transfer rate:          14427.87 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       2
Processing:     0    1   1.0      1      25
Waiting:        0    1   0.9      1      25
Total:          0    2   1.0      1      26

Percentage of the requests served within a certain time (ms)
  50%      1
  66%      2
  75%      2
  80%      2
  90%      2
  95%      3
  98%      5
  99%      6
 100%     26 (longest request)
Not too bad: 95% of the requests completed within 3 ms.

So what is next?
Well, first you can check out the full project, called cHTTP, over at GitHub. Next I'll talk abstractly about how coroutines could be used as the basis of a concurrent game engine. No promises and nothing fancy; it'll be more of a theory article. I'll try to whip up an SDL2-CS example, but we'll see. SDL2-CS is tricky to use as it is a wrapper around SDL2, but it would be a good place to start and would be familiar ground.

Anyway, that's all for now.



