Morxeton

Member Since 07 Jan 2009
Offline Last Active May 14 2013 07:35 AM
-----

Topics I've Started

C# .NET 4.0/4.5 UDP send issue

18 March 2013 - 11:08 AM

Hello,

 

I am writing a network library in C# and originally targeted the .NET Framework 3.5.  I recently decided to switch to .NET 4.5 but ran into an issue sending UDP packets: if packets are sent too quickly, the Socket.SendToAsync method completes with a SocketError of AddressFamilyNotSupported and the packets are never sent.  If I switch the project back to .NET 3.5, I cannot reproduce the issue no matter how hard I try.  The issue also reproduces in .NET 4.0.

 

I attached a project I put together to reproduce the issue.  If you spam the "ClientSnd" or "ServerSnd" buttons you'll see the error occur.  Switch the project to .NET 3.5 and spam all you want... no issues at all.

 

I haven't been able to find much useful information googling this issue.  Any ideas?

 

EDIT (added code from the sample project demoing the issue):

 

Here's where the binds are happening for both the client and server:

 

 

 

                
byte[] clientBuffer = new byte[32768];                
byte[] serverBuffer = new byte[32768];
 
                
IPEndPoint clientLocalEndPoint = GetLocalIPEndPoint(0, AddressFamily.InterNetwork);                
IPEndPoint serverLocalEndPoint = GetLocalIPEndPoint(6337, AddressFamily.InterNetwork);
 
                
m_ClientSocket.ExclusiveAddressUse = true;                
m_ServerSocket.ExclusiveAddressUse = true;
                
m_ClientSocket.Bind(clientLocalEndPoint);                
m_ServerSocket.Bind(serverLocalEndPoint);
 
                
m_ClientSendArgs.RemoteEndPoint = GetRemoteIPEndPoint("127.0.0.1", 6337, AddressFamily.InterNetwork);                
m_ClientRecvArgs.RemoteEndPoint = m_ClientSocket.LocalEndPoint;
                
m_ServerSendArgs.RemoteEndPoint = GetRemoteIPEndPoint("127.0.0.1", ((IPEndPoint)m_ClientSocket.LocalEndPoint).Port, AddressFamily.InterNetwork);                
m_ServerRecvArgs.RemoteEndPoint = m_ServerSocket.LocalEndPoint;
 
                
m_ClientSendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnClientCompletion);                
m_ClientRecvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnClientCompletion);                
m_ServerSendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnServerCompletion);                
m_ServerRecvArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnServerCompletion);
 
                
m_ClientRecvArgs.SetBuffer(clientBuffer, 0, clientBuffer.Length);                
m_ServerRecvArgs.SetBuffer(serverBuffer, 0, serverBuffer.Length);
 
                
ClientReceive();                
ServerReceive();
 

 

 

The GetRemoteIPEndPoint and GetLocalIPEndPoint methods:

 

 

 

private static IPEndPoint GetRemoteIPEndPoint(string address, int port, AddressFamily addressFamily)        
{            
	IPAddress[] ipAddresses = null;
            
	ipAddresses = Dns.GetHostAddresses(address);
	
	List<IPEndPoint> ipEndPointList = new List<IPEndPoint>();
 
	for (int i = 0; i < ipAddresses.Length; i++)            
	{                
		IPAddress ipAddress = ipAddresses[i];
 
		if (ipAddress.AddressFamily == addressFamily)                
		{                    
			IPEndPoint ipEndPoint = new IPEndPoint(ipAddress, port);

			ipEndPointList.Add(ipEndPoint);                
		}
	}

	return ipEndPointList.ToArray()[0];
}

private static IPEndPoint GetLocalIPEndPoint(int port, AddressFamily addressFamily)        
{            
	IPEndPoint localEndPoint = null;

	switch (addressFamily)            
	{                
		case AddressFamily.InterNetwork:                    
			{                        
				localEndPoint = new IPEndPoint(IPAddress.Any, port);

				break;                    
			}                
		case AddressFamily.InterNetworkV6:                    
			{                        
				localEndPoint = new IPEndPoint(IPAddress.IPv6Any, port);

				break;                    
			}
	}

	return localEndPoint;

} 

 

Since this happens regardless of who sends the data (client or server), I'll focus on the client being the sender:

 

Clicking the ClientSnd button:

 

private void Button_ClientSnd_Click(object sender, RoutedEventArgs e)        
{
	lock (SyncRoot)            
	{                
		byte[] buffer = Encoding.ASCII.GetBytes("Hello there.  Just testing.  Nothing to see here.  Move along.");

		m_ClientSendQueue.Enqueue(buffer);

		if (!m_ClientTransmitting)
		{                    
			m_ClientTransmitting = true;

			ClientSendBuffer();                
		}            
	}
} 

 

 

Sending methods for the client:

 

private void ClientSendBuffer()        
{
	lock (SyncRoot)            
	{                
		if (m_ClientSendQueue.Count > 0)                
		{                    
			byte[] buffer = m_ClientSendQueue.Dequeue();

			m_ClientSendArgs.SetBuffer(buffer, 0, buffer.Length);

			ClientSend();                
		}                
		else                
		{                    
			m_ClientTransmitting = false;                
		}            
	}
}
 
private void ClientSend()        
{
	if (!m_ClientSocket.SendToAsync(m_ClientSendArgs))            
	{                
		OnClientCompletion(this, m_ClientSendArgs);            
	}
} 

 

Completion callback for the client:

 

 

 

private void OnClientCompletion(object sender, SocketAsyncEventArgs e)        
{            
	SocketError socketError = e.SocketError;
 
	if (socketError != SocketError.Success)            
	{                
		ClientConsoleWrite("SocketError: {0}\r\n", socketError);            
	}

	switch (e.LastOperation)            
	{                
		case SocketAsyncOperation.SendTo:                    
			{                        
				if (socketError == SocketError.Success)                        
				{                            
					ClientConsoleWrite("Client message sent\r\n");                        
				}

				ClientSendBuffer();

				break;                    
			}                
		case SocketAsyncOperation.ReceiveFrom:                    
			{                        
				int bytesTransferred = e.BytesTransferred;

				byte[] buffer = new byte[bytesTransferred];

				Buffer.BlockCopy(e.Buffer, e.Offset, buffer, 0, bytesTransferred);

				string message = Encoding.ASCII.GetString(buffer);

				ClientConsoleWrite("Message received: {0}\r\n", message);

				ClientReceive();

				break;                    
			}            
	}
} 
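
For anyone who wants to try this without the WPF project, here is a rough console sketch of the same send pattern: one SocketAsyncEventArgs reused for every send, one send in flight at a time, and any non-success SocketError logged. The class name UdpSendRepro and the payload are made up for illustration, and this is not the attached project; whether it triggers the error on a given framework version is something to test, not something this sketch guarantees.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

// Hypothetical console repro; names are illustrative, not from the attached project.
static class UdpSendRepro
{
    // Sends `count` datagrams to a loopback socket, one at a time, reusing a single
    // SocketAsyncEventArgs. Returns the number of completions that reported an error.
    public static int Run(int count)
    {
        using (var server = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
        using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
        using (var args = new SocketAsyncEventArgs())
        using (var done = new ManualResetEvent(false))
        {
            server.Bind(new IPEndPoint(IPAddress.Loopback, 0));
            client.Bind(new IPEndPoint(IPAddress.Any, 0));

            byte[] payload = Encoding.ASCII.GetBytes("Hello there.  Just testing.");
            args.RemoteEndPoint = server.LocalEndPoint;
            args.SetBuffer(payload, 0, payload.Length);

            int remaining = count;
            int errors = 0;

            // Shared bookkeeping for synchronous and asynchronous completions.
            Action<SocketAsyncEventArgs> record = e =>
            {
                if (e.SocketError != SocketError.Success)
                {
                    errors++;
                    Console.WriteLine("SocketError: {0}", e.SocketError);
                }
                remaining--;
            };

            Action pump = null;
            pump = () =>
            {
                // Loop instead of recursing so synchronous completions cannot grow the stack.
                while (remaining > 0)
                {
                    if (client.SendToAsync(args))
                    {
                        return; // pending: the Completed event will continue the pump
                    }
                    record(args); // completed synchronously: Completed is not raised
                }
                done.Set();
            };

            args.Completed += (s, e) => { record(e); pump(); };

            pump();
            done.WaitOne();
            return errors;
        }
    }
}
```

Calling UdpSendRepro.Run(100000) under each target framework and comparing the returned error counts would be one way to narrow down whether the WPF click handling plays any part.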

Design for sending/receiving over async sockets for both UDP/TCP?

11 March 2012 - 10:53 AM

Hi everyone,

I'm working on a networking library written in C# that I plan to use in games that I will eventually write. The library supports both UDP and TCP. I'm trying to figure out the best design for sending and receiving data over asynchronous sockets. Here's a quick overview of my current design (using new async sockets methods in .NET 3.5):

UDP
For receiving, a call is made to ReceiveFromAsync. In the callback, the data is grabbed from the buffer and passed, along with the IPEndPoint, into a method that queues it up to be read by a dedicated thread loop, which dequeues and handles each packet. ReceiveFromAsync is then called again at the end of the callback.

For sending, a call is made to the "Send(Packet)" method on a custom class "SocketState" that contains the reference to the socket and end point to send to. This method will call into another method and enqueue the data in a send queue to be handled by the same dedicated thread loop that is handling receive.

Should I use a separate thread loop for sending, or am I complicating things and should just send the packet directly? I am not yet done with the implementation. I am going to be accounting for sequence numbers, ACKs, and speed control (packets per second), etc.
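
To make the UDP receive hand-off concrete, here is a minimal sketch of a queue that a ReceiveFromAsync callback could push into and a dedicated thread could drain. Datagram and DatagramQueue are hypothetical names, not types from the library, and it uses plain Monitor locking so it would also build against .NET 3.5.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;

// Hypothetical types for illustration only.
sealed class Datagram
{
    public readonly byte[] Data;
    public readonly IPEndPoint Sender;
    public Datagram(byte[] data, IPEndPoint sender) { Data = data; Sender = sender; }
}

sealed class DatagramQueue
{
    private readonly Queue<Datagram> m_Queue = new Queue<Datagram>();
    private bool m_Closed;

    // Called from the receive callback. The bytes are copied out of the shared
    // receive buffer so that buffer can be reused for the next receive right away.
    public void Enqueue(byte[] buffer, int offset, int count, IPEndPoint sender)
    {
        byte[] copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);

        lock (m_Queue)
        {
            m_Queue.Enqueue(new Datagram(copy, sender));
            Monitor.Pulse(m_Queue); // wake the worker if it is waiting
        }
    }

    // Signals the worker loop to exit once the queue has been drained.
    public void Close()
    {
        lock (m_Queue)
        {
            m_Closed = true;
            Monitor.PulseAll(m_Queue);
        }
    }

    // Called from the dedicated thread loop. Blocks until a datagram is available;
    // returns false once the queue is closed and empty.
    public bool TryDequeue(out Datagram datagram)
    {
        lock (m_Queue)
        {
            while (m_Queue.Count == 0 && !m_Closed)
            {
                Monitor.Wait(m_Queue);
            }

            if (m_Queue.Count > 0)
            {
                datagram = m_Queue.Dequeue();
                return true;
            }

            datagram = null;
            return false;
        }
    }
}
```

The worker thread would then just loop on `while (queue.TryDequeue(out datagram)) { ... }`. Whether sends should share that loop or go straight to the socket is the open question above; the sketch only covers the receive side.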

TCP
For receiving, as data is read from the async socket, it's passed into a method that reads each packet's header and uses it (or, for a static-length packet, the attributes of the packet type) to work out where that packet ends and the next begins. Each packet is handled during the callback rather than in a dedicated thread loop like UDP.
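
As an illustration of the framing step, here is a small sketch of a parser that splits a TCP byte stream into packets even when a packet straddles two reads. The wire layout is an assumption made for the example (1 byte id, then a 2-byte little-endian length, then the payload), and Frame and FrameParser are hypothetical names; the real library also has options bytes and static-length packets that this leaves out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types; the header layout is assumed for illustration.
sealed class Frame
{
    public readonly byte Id;
    public readonly byte[] Payload;
    public Frame(byte id, byte[] payload) { Id = id; Payload = payload; }
}

sealed class FrameParser
{
    private const int HeaderSize = 3; // 1 byte id + 2 byte little-endian length
    private readonly List<byte> m_Pending = new List<byte>();

    // Feed whatever a receive callback delivered. Returns every frame that is now
    // complete; bytes belonging to an unfinished frame are kept for the next call.
    public List<Frame> Feed(byte[] buffer, int offset, int count)
    {
        for (int i = 0; i < count; i++)
        {
            m_Pending.Add(buffer[offset + i]);
        }

        var frames = new List<Frame>();
        int pos = 0;

        while (m_Pending.Count - pos >= HeaderSize)
        {
            int length = m_Pending[pos + 1] | (m_Pending[pos + 2] << 8);

            if (m_Pending.Count - pos - HeaderSize < length)
            {
                break; // the payload has not fully arrived yet
            }

            byte[] payload = new byte[length];
            m_Pending.CopyTo(pos + HeaderSize, payload, 0, length);
            frames.Add(new Frame(m_Pending[pos], payload));

            pos += HeaderSize + length;
        }

        m_Pending.RemoveRange(0, pos); // drop consumed bytes, keep the unfinished tail
        return frames;
    }
}
```

A List<byte> keeps the sketch short; a real implementation would more likely use a pooled buffer to avoid per-byte overhead.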

For sending, the data is just sent directly and at the end of the send callback, it calls back to the send method again and checks if there is more data to be sent, and if so, continues to send data until there is nothing left in the queue.

So you can see that for UDP I am using a dedicated thread loop for receiving, handling and sending data, whereas for TCP a lot of the processing happens in the callbacks.

Does anyone see any problems with the design of one or both and have suggestions to make improvements? Need more information? If so, please let me know. I don't have a lot of experience with network design. I appreciate any advice/feedback/suggestions, etc.!

C# "Memory leak" issue

05 February 2010 - 01:58 PM

WARNING: This is a loooooooong post...

I am working on a network library written in C# using the new async socket methods in .NET 3.5 (SendAsync, ReceiveAsync, etc.). The library supports TCP and will eventually support UDP. I rewrote the project from scratch a few months ago and have now reached the point where I can begin some stress testing.

My stress test consists of a test client and a test server, where the client makes 200 connections to the server. The connections authenticate, then the server sends ping packets (custom TCP packets) to the client, and the client responds to each with a pong packet (which lets the server know the connection is alive and to stop the idle disconnect timer). The ping packets are sent across all 200 connections once every second.

After several minutes of testing, I noticed that the memory usage shown in Task Manager was exceptionally high. I know Task Manager isn't necessarily accurate for determining how much memory a .NET app is using, but if you watch a .NET process, you will usually see the memory climb and fall as the GC does its job. In my case, the memory started out at around 30 MB (in debug mode in VS 2008), and by the time I decided to stop the process and debug, it was at around 380 MB. I used WinDbg to take a peek at the heap, and I noticed an awful lot of thread objects on the heap:
              MT    Count    TotalSize Class Name
000007feefe5a8f0    57914      1389936 System.Threading.Overlapped
000007feefe27a90    59351      1899232 System.WeakReference
000007feee382ab8    59315      2372600 System.Windows.Forms.WindowsFormsSynchronizationContext
000007feefe393c8    57920      3706880 System.Threading.IOCompletionCallback
000007feefe28a28    59009      3776576 System.Threading.ContextCallback
000007feefa094d8    57600      6912000 System.Threading.OverlappedData
000007feefe32cb8   117514      8461008 System.Runtime.Remoting.Messaging.LogicalCallContext
000007feefe29c40   117569      8464968 System.Threading.ExecutionContext
000007feef820048    59006     24546496 System.Net.Sockets.SocketAsyncEventArgs

The 2nd column above is the number of objects. As you can see, that's A LOT of objects. Looking at the heap, it seems like the callbacks are not completing, but the application is not failing by any means: all 200 ping packets make it across, and 200 pong packets come back. Task Manager shows only 25 threads in use. I have googled all over the place for the past few days since discovering this issue and have not found any good answers yet.

I'll post the code from the library that is relevant to the issue, starting from the sender side. Here's how a packet is created (this is an extension method for the socketState):
        public static PacketWriter CreatePacket(this BaseSocketState socketState, Type type, params object[] data)
        {
            byte id = 0x00;

            if (!PacketInfo.GetPacketId(type, out id))
            {
                StatusHandler.GenerateMessage(0x0034, socketState, type.Name);

                return null;
            }

            BasePacket packet = (BasePacket)Factory.CreateInstance(type)();

            packet.PacketInfo = BaseController.GetPacketInfo(socketState, id);

            if (packet.IsStaticLength)
            {
                packet.Buffer = new byte[packet.PacketInfo.StaticLength];
            }

            PacketWriter writer = PacketWriterPool.Acquire();

            writer.Set(packet, socketState);

            if (data.Length > 0)
            {
                writer.MultiWrite(data);
            }

            return writer;
        }

A new instance of the packet is created from the type using a custom object creation method. A PacketWriter (which is just a wrapper for the packet's buffer) is acquired from a pool and then assigned the packet and the socket the packet will be sent over. The writer is returned and can then be used to write data to the buffer (i.e. writer.Write(object)). When the writer is closed, the packet is returned. Passing true into the Close method will send the packet upon closing. The Close method in PacketWriter:
        public BasePacket Close(bool transmitAfterClose)
        {
            if (m_Packet == null)
            {
                StatusHandler.GenerateMessage(0x0038, m_SocketState, "Close");

                this.Release();

                return null;
            }
            else if (m_Packet.Status == PacketStatus.Invalid)
            {
                StatusHandler.GenerateMessage(0x003D, m_SocketState, PacketInfo.ProtocolType, PacketInfo.Id);

                this.Release();

                return null;
            }
            else if (m_Packet.IsStaticLength && m_Packet.Buffer.Length != m_BufferStream.Length)
            {
                StatusHandler.GenerateMessage(0x003E, m_SocketState, PacketInfo.ProtocolType, PacketInfo.Id, m_BufferStream.Length, m_Packet.Buffer.Length);

                m_Packet.Status = PacketStatus.Invalid;

                this.Release();

                return null;
            }

            byte[] data = m_BufferStream.ToArray();

            m_BufferStream.SetLength(0);
            m_BufferStream.Seek(0, SeekOrigin.Begin);

            m_BufferStream.WriteByte(m_Packet.Id);

            bool encryptData = false, compressData = false;

            if (PacketInfo.UsePacketOptions)
            {
                m_BufferStream.WriteByte((byte)PacketOptions);

                encryptData = m_SocketState.HasValidCrypto() && EncryptionEnabled;
                compressData = CompressionEnabled;
            }
            else
            {
                encryptData = m_SocketState.HasValidCrypto() && PacketInfo.UseEncryption;
                compressData = PacketInfo.UseCompression;
            }

            if (encryptData)
            {
                data = Encryption.Encrypt(data, m_SocketState);
            }

            if (compressData)
            {
                data = Compression.Compress(data);
            }

            if (!m_Packet.IsStaticLength)
            {
                byte[] lengthBytes = Support.GetBytes((ushort)data.Length);

                m_BufferStream.Write(lengthBytes, 0, lengthBytes.Length);
            }

            m_BufferStream.Write(data, 0, data.Length);

            m_Packet.Buffer = m_BufferStream.ToArray();

            m_BufferStream.Release();
            m_BufferStream = null;

            m_Packet.Status = PacketStatus.Sealed;

            BasePacket packet = m_Packet;

            BaseSocketState socketState = m_SocketState;

            this.Release();

            if (transmitAfterClose)
            {
                socketState.Send(packet);
            }

            return packet;
        }

The m_BufferStream is a MemoryStream that was acquired from the MemoryStreamPool at time of construction. this.Release() calls to an extension method in the PacketWriterPool that simply resets the PacketWriter (returns m_BufferStream to the MemoryStreamPool, nulls all references, etc) and returns it to the PacketWriterPool. Now that we have our packet, it can be sent if it hasn't already. Now moving onto the socketState.Send(packet) method which is an extension method:
        public static void Send(this BaseSocketState socketState, BasePacket packet)
        {
            if (!socketState.Connected || socketState.IsDisposing)
            {
                socketState.Transmitting = false;

                return;
            }

            if (packet.Status != PacketStatus.Sealed)
            {
                StatusHandler.GenerateMessage(0x0036, socketState, packet.ProtocolType, packet.Id, packet.Status);

                return;
            }

            bool sendNow = false;

            lock (socketState.PacketQueue)
            {
                if (socketState.PacketQueue.Count == 0)
                {
                    sendNow = true;
                }

                socketState.PacketQueue.Enqueue(packet);
            }

            if (sendNow && !socketState.Transmitting)
            {
                socketState.Transmitting = true;

                socketState.Send();
            }
        }

The packet enters a queue and if the socket is not currently transmitting, Send() will be kicked off to start processing the queue. If it is currently transmitting, the queue will be handled after each send has completed. The Send() method:
        internal void Send()
        {
            if (!Connected || IsDisposing)
            {
                m_Transmitting = false;

                return;
            }

            SetSocketArgs(true);

            BasePacket packet = null;

            lock (m_PacketQueue)
            {
                if (m_PacketQueue.Count > 0)
                {
                    packet = m_PacketQueue.Peek();
                }
                else
                {
                    m_Transmitting = false;
                }
            }

            if (packet != null)
            {
                if (m_TransferSpeed == 0)
                {
                    m_LastPacketSent = packet;

                    Send(packet.Buffer);
                }
                else
                {
                    m_SpeedThrottleTimer.Packet = packet;
                    m_SpeedThrottleTimer.Start();
                }
            }
        }
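
Send() above peeks at the head of the queue and leaves the packet there until the send has completed. Stripped of everything else, that peek / confirm / recover idea could look like the sketch below. PendingSendQueue and its method names are hypothetical and only illustrate the pattern; they are not the library's actual types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating "peek now, dequeue only after a confirmed send".
sealed class PendingSendQueue<T>
{
    private readonly Queue<T> m_Queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(item);
        }
    }

    // Look at the next item without removing it; it stays queued until confirmed.
    public bool TryPeek(out T item)
    {
        lock (m_Queue)
        {
            if (m_Queue.Count > 0)
            {
                item = m_Queue.Peek();
                return true;
            }

            item = default(T);
            return false;
        }
    }

    // Call from the send-completed callback once the head item has been sent.
    public void Confirm()
    {
        lock (m_Queue)
        {
            if (m_Queue.Count > 0)
            {
                m_Queue.Dequeue();
            }
        }
    }

    // On disconnect: hand back everything that was never confirmed, in order.
    public List<T> DrainUnsent()
    {
        lock (m_Queue)
        {
            var unsent = new List<T>(m_Queue);
            m_Queue.Clear();
            return unsent;
        }
    }
}
```

The trade-off is that a packet whose send completed but whose callback never ran would be reported as unsent, so the receiving side may need to tolerate duplicates after a reconnect.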

I created a speed throttling mechanism that uses a custom timer system, which I won't go into since this stress test is not focused on it. As you can see, the PacketQueue is Peek()ed to grab the next packet, but the packet is not yet dequeued. That happens only after the packet has been sent successfully, so if the connection is lost, the remaining packets in the queue are repackaged into another queue that is then passed into an event. This allows the application to possibly transmit the packets after the connection is restored. SetSocketArgs(bool) method:
        internal void SetSocketArgs(bool sendArgs)
        {
            if (sendArgs)
            {
                m_SendArgs = m_ArgsPool.Acquire();

                if (m_SendArgs != null)
                {
                    m_SendArgs.UserToken = this;
                }
            }
            else
            {
                m_RecvArgs = m_ArgsPool.Acquire();

                if (m_RecvArgs != null)
                {
                    m_RecvArgs.AssignBuffer();
                    m_RecvArgs.UserToken = this;

                    if (ProtocolType == ProtocolType.Udp)
                    {
                        m_RecvArgs.RemoteEndPoint = m_RemoteEndPoint;
                    }
                }
            }
        }
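
AssignBuffer(), called in the receive branch above, is not shown in this post. For readers unfamiliar with the idea, one common way to give each pooled SocketAsyncEventArgs its own slice of a single preallocated array is sketched below. BufferSlab is a hypothetical name and this is only an assumption about the general technique, not the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;

// Hypothetical helper: carves one large array into fixed-size slices.
sealed class BufferSlab
{
    private readonly byte[] m_Slab;
    private readonly int m_SliceSize;
    private readonly Stack<int> m_FreeOffsets = new Stack<int>();

    public BufferSlab(int sliceCount, int sliceSize)
    {
        m_Slab = new byte[sliceCount * sliceSize];
        m_SliceSize = sliceSize;

        // Push in reverse so the lowest offset is handed out first.
        for (int i = sliceCount - 1; i >= 0; i--)
        {
            m_FreeOffsets.Push(i * sliceSize);
        }
    }

    // Points args at a free slice of the slab; returns false if none are left.
    public bool Assign(SocketAsyncEventArgs args)
    {
        lock (m_FreeOffsets)
        {
            if (m_FreeOffsets.Count == 0)
            {
                return false;
            }

            args.SetBuffer(m_Slab, m_FreeOffsets.Pop(), m_SliceSize);
            return true;
        }
    }

    // Returns the slice used by args to the free list and detaches the buffer.
    public void Release(SocketAsyncEventArgs args)
    {
        lock (m_FreeOffsets)
        {
            m_FreeOffsets.Push(args.Offset);
            args.SetBuffer(null, 0, 0);
        }
    }
}
```

Because every slice lives in one long-lived array, the buffers pinned by overlapped I/O stay in one place instead of being scattered across the heap.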

There is a pool for socket args as well. If they are receive args, the buffer is assigned a portion of a large preallocated byte array to reduce fragmentation and increase performance. The Send(byte[]) method:
        private void Send(byte[] buffer)
        {
            if (m_SendArgs != null)
            {
                m_SendArgs.SetBuffer(buffer, 0, buffer.Length);

                SendData();
            }
        }
The send buffer is assigned, then the SendData() method, which is the method that FINALLY sends the data to the other side, is executed. All previous Send methods except for the Send(packet) extension method reside in the BaseSocketState class. SendData is a virtual method which is overridden by TCPSocketState and UDPSocketState, since sending data via TCP differs from sending via UDP. SendData() from TCPSocketState:
        protected override void SendData()
        {
            try
            {
                if (!Socket.SendAsync(m_SendArgs))
                {
                    OnSent(this, m_SendArgs);
                }
            }
            catch { }
        }
SendAsync returns false if the operation completed synchronously, in which case the Completed event is not raised and you must call the handler directly for post-send handling. OnSent method:
        protected static void OnSent(object sender, SocketAsyncEventArgs e)
        {
            BaseSocketState socketState = (BaseSocketState)e.UserToken;

            if (socketState == null)
            {
                return;
            }

            socketState.m_OnSent = true;

            socketState.m_LastActivityTime = DateTime.Now;

            int bytesTransferred = e.BytesTransferred;

            socketState.m_TotalBytesSent += bytesTransferred;

            if (socketState.m_ClientState != null)
            {
                socketState.m_ClientState.TotalBytesSent += bytesTransferred;
            }

            if (socketState.m_NormalSendMode)
            {
                lock (socketState.m_PacketQueue)
                {
                    if (socketState.m_PacketQueue.Count > 0)
                    {
                        socketState.m_PacketQueue.Dequeue();
                    }
                }

                if (socketState.m_LastPacketSent != null)
                {
                    StatusHandler.GenerateMessage(0x801B, socketState, socketState.ProtocolType, socketState.m_LastPacketSent.Id, socketState.m_LastPacketSent.Buffer.Length -
                        socketState.m_LastPacketSent.PacketInfo.HeaderSize, socketState.m_LastPacketSent.Buffer.Length);

                    socketState.m_LastPacketSent.OnAfterSent(socketState);
                }

                socketState.m_OnSent = false;

                socketState.Send();
            }
            else
            {
                lock (socketState.m_SpeedThrottleTimer.SentPackets)
                {
                    while (socketState.m_SpeedThrottleTimer.SentPackets.Count > 0)
                    {
                        BasePacket packet = socketState.m_SpeedThrottleTimer.SentPackets.Dequeue();

                        StatusHandler.GenerateMessage(0x801B, socketState, socketState.ProtocolType, packet.Id, packet.Buffer.Length - packet.PacketInfo.HeaderSize, packet.Buffer.Length);

                        packet.OnAfterSent(socketState);
                    }
                }

                socketState.m_OnSent = false;

                socketState.ThrottledSend();
            }
        }

m_OnSent is set to true here in case a packet's OnAfterSent method calls to disconnect the socket. The SocketState's Dispose method will not wait for m_Transmitting to become false in this case; otherwise it would result in a deadlock. m_NormalSendMode just means that the socket is not currently throttling the speed at which packets are transmitted. The PacketQueue is dequeued to remove the packet that was just sent, then Send() is called again to see if there are more packets in the queue to be sent. That is pretty much the sending side of things without going into too much of the rest of the code, and it is a lot in itself already. Now here's the receiving side of things... Receive() method:
        public static void Receive(this BaseSocketState socketState)
        {
            socketState.SetSocketArgs(false);
            socketState.ReceiveData();
        }
After a connection is established, this method is called to wait for incoming data. ReceiveData() method in TCPSocketState:
        internal override void ReceiveData()
        {
            try
            {
                if (m_RecvArgs != null)
                {
                    if (!Socket.ReceiveAsync(m_RecvArgs))
                    {
                        OnReceived(this, m_RecvArgs);
                    }
                }
            }
            catch { }
        }
This is nearly identical to the SendData() method and simply starts the wait to receive process. OnReceived method:
        protected static void OnReceived(object sender, SocketAsyncEventArgs e)
        {
            BaseSocketState socketState = (BaseSocketState)e.UserToken;

            if (socketState == null)
            {
                return;
            }

            socketState.m_LastActivityTime = DateTime.Now;

            int bytesTransferred = e.BytesTransferred;

            if (bytesTransferred > 0)
            {
                MemoryStream memoryStream = MemoryStreamPool.Acquire();

                memoryStream.Write(e.Buffer, e.Offset, bytesTransferred);

                socketState.m_InboundBuffer = memoryStream.ToArray();

                memoryStream.Release();

                socketState.m_InboundBytesRead = bytesTransferred;
                socketState.m_TotalBytesRecv += bytesTransferred;

                if (socketState.m_ClientState != null)
                {
                    socketState.m_ClientState.TotalBytesRecv += bytesTransferred;
                }

                lock (socketState.lo_ReadData)
                {
                    socketState.Read();
                }

                socketState.Receive();
            }
            else
            {
                socketState.Disconnect(false);
            }
        }

The data is pulled from the buffer, temporarily stored in a MemoryStream from the pool, then stored in the InboundBuffer for processing. The object lo_ReadData (lo for lock object) is locked here and also in the Dispose method, so if a socket is in the middle of reading and Dispose is called, it won't throw an exception as Dispose will have to wait for the read to finish first. Read() method:
        internal static void Read(this BaseSocketState socketState)
        {
            if (!socketState.Connected || socketState.IsDisposing)
            {
                return;
            }

            int readAttempts = 0, maxReadAttempts = socketState.InboundBytesRead + 1;

            // Loop while the pointer is less than the amount of bytes to be read.
            while (socketState.Pointer < socketState.InboundBytesRead)
            {
                if (readAttempts > maxReadAttempts)
                {
                    StatusHandler.GenerateMessage(0x0037, socketState);

                    socketState.Disconnect(true);

                    break;
                }

                PacketReader packetReader = socketState.PacketReader;

                switch (packetReader.ReadStatus)
                {
                    case PacketReadStatus.Id:
                        {
                            // Read the first byte of incoming data which is to be the packet Id.
                            byte id = socketState.InboundBuffer[socketState.Pointer++];

                            packetReader.PacketInfo = BaseController.GetPacketInfo(socketState, id);

                            // The packet Id that was read is invalid.  An attempt to read the rest of the data will still be made but will not be processed.
                            if (packetReader.PacketInfo.IsNullPacketInfo)
                            {
                                StatusHandler.GenerateMessage(0x001C, socketState, socketState.ProtocolType, id);
                            }

                            // If a static packet is received with a length of zero, nothing else will be read so pass the packet to the handler.
                            if (packetReader.PacketInfo.IsSingleBytePacket)
                            {
                                packetReader.OnHandle(socketState);
                            }
                            else
                            {
                                if (socketState.ProtocolType == ProtocolType.Tcp)
                                {
                                    packetReader.ReadStatus = PacketReadStatus.Options;
                                }
                                else
                                {
                                    packetReader.ReadStatus++;
                                }
                            }

                            break;
                        }
                    case PacketReadStatus.Sequence:
                        {
                            // TODO: Finish UDP implementation.

                            break;
                        }
                    case PacketReadStatus.ACKs:
                        {
                            // TODO: Finish UDP implementation.

                            break;
                        }
                    case PacketReadStatus.Options:
                        {
                            if (packetReader.PacketInfo.UsePacketOptions)
                            {
                                // Read the next byte of incoming data which is to be the PacketOptions.
                                packetReader.PacketOptions = (PacketOptions)socketState.InboundBuffer[socketState.Pointer++];
                            }

                            packetReader.ReadStatus++;

                            break;
                        }
                    case PacketReadStatus.Length:
                        {
                            bool proceed = false;

                            if (!packetReader.PacketInfo.IsStaticLength)
                            {
                                byte[] result = null;

                                if (socketState.ReadBuffer(out result))
                                {
                                    packetReader.Length = BitConverter.ToUInt16(result, 0);

                                    proceed = true;
                                }
                            }
                            else
                            {
                                ushort length = packetReader.PacketInfo.StaticLength;

                                if (socketState.HasValidCrypto())
                                {
                                    bool encryptionEnabled = false;

                                    if (packetReader.PacketInfo.UsePacketOptions)
                                    {
                                        encryptionEnabled = packetReader.EncryptionEnabled;
                                    }
                                    else
                                    {
                                        encryptionEnabled = packetReader.PacketInfo.UseEncryption;
                                    }

                                    if (encryptionEnabled)
                                    {
                                        length = (ushort)socketState.CryptoInfo.GetEncryptedLength(length);
                                    }
                                }

                                packetReader.Length = length;

                                proceed = true;
                            }

                            if (proceed)
                            {
                                if (packetReader.Length > 0)
                                {
                                    packetReader.ReadStatus++;
                                }
                                else
                                {
                                    packetReader.OnHandle(socketState);
                                }
                            }

                            break;
                        }
                    case PacketReadStatus.Buffer:
                        {
                            byte[] result = null;

                            if (socketState.ReadBuffer(out result))
                            {
                                packetReader.OnHandle(result, socketState);
                            }

                            break;
                        }
                }

                readAttempts++;
            }

            socketState.Reset();
        }

This method first reads the packet id from the buffer and looks it up. A PacketReader instance, which is stored on the BaseSocketState, temporarily holds the data that will be handled once all of the data for the packet has been read. If packet options are in use (they turn encryption and compression on or off on the fly), that byte is read next. If the packet has a static length, no length is read. Finally, the buffer is read if there is anything to read. ReadBuffer method:
        private static bool ReadBuffer(this BaseSocketState socketState, out byte[] result)
        {
            int bytesToRead = socketState.PacketReader.BytesLeft;

            if (socketState.PacketReader.BufferStream == null)
            {
                socketState.PacketReader.BufferStream = MemoryStreamPool.Acquire();
            }

            MemoryStream memoryStream = socketState.PacketReader.BufferStream;

            result = null;

            if (socketState.Pointer > socketState.InboundBytesRead)
            {
                StatusHandler.GenerateMessage(0x001D, socketState, socketState.PacketReader.Id);

                socketState.Reset();

                return false;
            }

            int bytesLeft = socketState.InboundBytesRead - socketState.Pointer;

            if (bytesToRead > bytesLeft)
            {
                memoryStream.Write(socketState.InboundBuffer, socketState.Pointer, bytesLeft);

                socketState.Pointer += bytesLeft;

                return false;
            }
            else
            {
                memoryStream.Write(socketState.InboundBuffer, socketState.Pointer, bytesToRead);

                socketState.Pointer += bytesToRead;

                result = memoryStream.ToArray();

                memoryStream.Release();
                socketState.PacketReader.BufferStream = null;

                return true;
            }
        }

This method simply reads the buffer and stores the data on the BufferStream, which is a MemoryStream that was pulled from the pool. PacketReader.BytesLeft is determined by which part of the reading process is currently executing. If the read is complete for the packet, the BufferStream is released back to the pool and the method returns true. The result is returned via the out parameter. PacketReader.OnHandle method:
        internal void OnHandle(byte[] buffer, BaseSocketState socketState)
        {
            m_Buffer = buffer;

            m_SocketState = socketState;

            StatusHandler.GenerateMessage(0x801A, socketState, socketState.ProtocolType, socketState.PacketReader.Id, socketState.PacketReader.Length, socketState.PacketReader.TotalSize);

            if (m_Buffer != null)
            {
                bool decryptData = false, decompressData = false;

                if (PacketInfo.UsePacketOptions)
                {
                    decryptData = socketState.HasValidCrypto() && EncryptionEnabled;
                    decompressData = CompressionEnabled;

                }
                else
                {
                    decryptData = socketState.HasValidCrypto() && PacketInfo.UseEncryption;
                    decompressData = PacketInfo.UseCompression;
                }

                if (decompressData)
                {
                    m_Buffer = Compression.Decompress(m_Buffer);
                }

                if (decryptData)
                {
                    m_Buffer = Encryption.Decrypt(m_Buffer, socketState);
                }
            }

            PacketInfo.PacketOnHandle(socketState, this);

            Reset();
        }

Basically this takes the data and passes it into the PacketInfo.PacketOnHandle method. PacketInfo is a class that is created when the engine starts, as each packet is registered. It contains all of the non-changing information for a packet and also holds an instance of the packet so that the packet's OnHandle method can be executed. Compared with creating a new packet object to store the info on and calling its OnHandle method, this improves performance since no objects need to be created during the entire read process. PacketInfo.PacketOnHandle method:
        public void PacketOnHandle(BaseSocketState socketState, PacketReader reader)
        {
            if (!IsNullPacketInfo)
            {
                m_PacketInstance.OnHandle(socketState, reader);
            }
        }
As you can see, it just calls the packet's OnHandle method. Here's an example of what this looks like. PingPacket.OnHandle method:
        public override void OnHandle(BaseSocketState socketState, PacketReader reader)
        {
            socketState.SendPacket(typeof(TCPPackets.PongPacket));
        }
SendPacket is simply a wrapper for the CreatePacket method, which calls Close(true) to send the packet immediately. That is the extent of the sending and receiving methods. From what I can tell, nothing here would prevent any of the I/O callbacks from completing and keep them held in memory. In fact, if that were the case, the I/O would fail immediately. I can't figure out why all these objects remain on the heap. Any ideas? [Edited by - Zahlman on February 7, 2010 6:20:41 PM]
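
For readers following the read path described above, here is a minimal standalone sketch of the same idea: accumulate bytes across partial receives until each stage (id, length, payload) is complete. It is not the library's code. FrameAccumulator, Feed, and the wire layout (1-byte id, 2-byte little-endian length, payload) are assumptions made for illustration, and packet options, static-length packets, encryption, and stream pooling are left out.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical stand-in for the PacketReader/ReadBuffer pair in the post.
// Assumed wire format: [1-byte id][2-byte little-endian length][payload].
public sealed class FrameAccumulator
{
    private enum Stage { Id, Length, Payload }

    private readonly MemoryStream m_Pending = new MemoryStream();
    private Stage m_Stage = Stage.Id;
    private int m_Needed = 1; // bytes still required to finish the current stage
    private byte m_Id;

    // Feeds one received chunk and returns every packet it completed.
    public List<KeyValuePair<byte, byte[]>> Feed(byte[] chunk, int offset, int count)
    {
        var completed = new List<KeyValuePair<byte, byte[]>>();
        int pointer = offset;
        int end = offset + count;

        while (pointer < end)
        {
            int take = Math.Min(m_Needed, end - pointer);
            m_Pending.Write(chunk, pointer, take);
            pointer += take;
            m_Needed -= take;

            if (m_Needed > 0)
                break; // chunk exhausted mid-stage; keep the bytes for the next receive

            byte[] part = m_Pending.ToArray();
            m_Pending.SetLength(0);

            switch (m_Stage)
            {
                case Stage.Id:
                    m_Id = part[0];
                    m_Stage = Stage.Length;
                    m_Needed = 2;
                    break;
                case Stage.Length:
                    int length = part[0] | (part[1] << 8);
                    if (length == 0)
                    {
                        // Nothing to buffer: the packet is complete right away.
                        completed.Add(new KeyValuePair<byte, byte[]>(m_Id, new byte[0]));
                        m_Stage = Stage.Id;
                        m_Needed = 1;
                    }
                    else
                    {
                        m_Stage = Stage.Payload;
                        m_Needed = length;
                    }
                    break;
                case Stage.Payload:
                    completed.Add(new KeyValuePair<byte, byte[]>(m_Id, part));
                    m_Stage = Stage.Id;
                    m_Needed = 1;
                    break;
            }
        }

        return completed;
    }
}

public static class FrameDemo
{
    public static void Main()
    {
        // Packet 1: id 0x01 with 3 payload bytes. Packet 2: id 0x02 with no payload.
        byte[] wire = { 0x01, 0x03, 0x00, 0xAA, 0xBB, 0xCC, 0x02, 0x00, 0x00 };
        var accumulator = new FrameAccumulator();

        // Deliver the stream in two uneven pieces, as a TCP socket might.
        Console.WriteLine(accumulator.Feed(wire, 0, 4).Count);
        Console.WriteLine(accumulator.Feed(wire, 4, 5).Count);
    }
}
```

The first Feed call ends in the middle of the payload, so nothing is completed until the second call supplies the rest. The state that survives between calls is the pending stream plus the stage and byte counter, which is roughly what the post keeps on the PacketReader.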

C# networking library issue

01 March 2009 - 03:21 PM

I'm writing a network library in C# (.NET 3.5) that I will be using for various apps, including a RunUO application I'm working on. I'm having a problem when I throttle the transfer speed (I understand the way I'm throttling is probably not the most efficient). Throttling is done by calculating how much data to send per second, sending that much data, and then waiting an entire second before sending more. I understand this creates more overhead because it defeats coalescing.

The old problem I had was this: if I sent a bunch of 32KB packets (the max size I am allowing is 32KB) with the transfer speed throttled to, say, 20KB/sec, the old method would simply send 20KB, then 12KB, 20KB, 12KB, and so on, because it sent the rest of the current packet before reading the next packet, instead of merging the two to fill out 20KB.

The problem I have now is that when I combine the remains of one packet with the next packet to get the desired length to send, it does not function correctly. The way I have it done, it checks whether the packet length is less than the buffer size and, if so, stores the buffer in a class field called m_LeftOverBuffer and calls the SendData() method again, which reads the next chunk of data. If there is anything stored in m_LeftOverBuffer, it is written to a MemoryStream, and then the contents of the next chunk are written to the stream as well. The stream is then dumped into a new byte array. If the new byte array is longer than the buffer size, the first buffer-size bytes are copied into the buffer to be transmitted and what is left over is stored in m_LeftOverBuffer again. The result is incomplete or incorrect data being transmitted to the other side.

What I'm working on is a RunUO application (called MegaSpawner) that will basically control the spawning for the server from an external application. To administer the system, you need to connect to it via the MegaSpawner Client. As part of the connection process between the client and the RunUO server, the communication goes as follows:

1. After a successful login to the shard, the client sends a request packet for all spawning info (this includes all the data for all of the MegaSpawners, and all of the mobile and item type names).
2. Server receives the packet, gathers all of the info for the MegaSpawners, and sends it over, throttled at 1KB/sec (just for testing purposes).
3. Client receives the spawner data and sends a request packet for mobile type info.
4. Server receives the packet and sends all mobile type info, throttled at 1KB/sec.
5. Client receives the mobile type info and sends a request for all item type info.
6. Server receives the packet and sends all item type info, throttled at 1KB/sec.

If I don't throttle the transfer speed, there are never any problems. Most of the time, the failure happens between steps 2 and 3 above: the server reports that it sent the data, but the client never received all of it and is still waiting for more. Sometimes it gets past that and fails between steps 4 and 5, with similar results where the client doesn't receive all of the data. Sometimes the client does receive all of the data it was expecting, but out of order, so decryption fails. Here is the code:
        internal void Send(Packet packet, ushort transferSpeed, string socketName, bool commandQueueingEnabled)
        {
            DataStreamItem dataStreamItem = new DataStreamItem();

            dataStreamItem.Packet = packet;
            dataStreamItem.SocketName = Name;
            dataStreamItem.CommandQueueingEnabled = commandQueueingEnabled;
            dataStreamItem.SetSpeed(transferSpeed);
            dataStreamItem.ByteQueue.Enqueue(packet.Process());

            Send(dataStreamItem);
        }

        internal void Send(DataStreamItem dataStreamItem)
        {
            lock (m_LeftSyncRoot)
            {
                m_BytesLeftToSend += dataStreamItem.ByteQueue.TotalBytes;
            }

            bool sendNow = false;

            lock (m_SendQueue)
            {
                if (m_SendQueue.Count == 0)
                {
                    sendNow = true;
                }

                m_SendQueue.Enqueue(dataStreamItem);
            }

            if (sendNow && !m_Transmitting)
            {
                m_Transmitting = true;

                SendData();
            }
        }

        private void SendData()
        {
            DataStreamItem dataStreamItem = null;

            lock (m_SendQueue)
            {
                if (m_SendQueue.Count > 0)
                {
                    dataStreamItem = m_SendQueue.Peek();

                    if (dataStreamItem.ByteQueue.Count == 0)
                    {
                        m_SendQueue.Dequeue();

                        if (m_SendQueue.Count > 0)
                        {
                            dataStreamItem = m_SendQueue.Peek();
                        }
                        else
                        {
                            dataStreamItem = null;
                        }
                    }
                }
            }

            if (dataStreamItem != null)
            {
                // If throttling data to send at a specific rate, then enqueue into the throttle queue if CurrentSends exceeds SendsPerInterval.
                if (dataStreamItem.SendsPerInterval > 0 && m_CurrentSends >= dataStreamItem.SendsPerInterval)
                {
                    Enqueue(this);

                    return;
                }

                try
                {
                    if (Connected)
                    {
                        byte[] buffer = dataStreamItem.ByteQueue.Dequeue();

                        if (IsCoreSocket || dataStreamItem.TransferSpeed == 0)
                        {
                            Socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnSent), dataStreamItem);
                        }
                        else // The below code is responsible for merging of packets.
                        {
                            bool waitSend = false;

                            int bufferSize = dataStreamItem.ByteQueue.BufferSize;

                            if (m_LeftOverBuffer.Length > 0)
                            {
                                MemoryStream memoryStream = StreamPool.Acquire();

                                memoryStream.Write(m_LeftOverBuffer, 0, m_LeftOverBuffer.Length);
                                memoryStream.Write(buffer, 0, buffer.Length);

                                byte[] newBuffer = memoryStream.ToArray();

                                StreamPool.Release(memoryStream);

                                if (newBuffer.Length > bufferSize)
                                {
                                    buffer = new byte[bufferSize];

                                    int leftOver = newBuffer.Length - bufferSize;

                                    m_LeftOverBuffer = new byte[leftOver];

                                    Array.Copy(newBuffer, buffer, bufferSize);
                                    Array.Copy(newBuffer, bufferSize, m_LeftOverBuffer, 0, leftOver);
                                }
                                else
                                {
                                    m_LeftOverBuffer = new byte[0];

                                    buffer = newBuffer;
                                }
                            }

                            lock (m_SendQueue)
                            {
                                if (m_SendQueue.Count > 1 && buffer.Length < bufferSize) // SendQueue will still contain current DataStreamItem.
                                {
                                    m_LeftOverBuffer = buffer;

                                    waitSend = true;
                                }
                            }

                            if (!waitSend)
                            {
                                m_CurrentSends++;

                                Socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnSent), dataStreamItem);
                            }
                            else
                            {
                                CheckData(dataStreamItem, false);

                                SendData();
                            }
                        }
                    }
                    else // If connection is lost in the middle of sending and command queueing is enabled, queue the packets into the command queue.
                    {
                        if (GlobalSettings.CommandQueueing.UseCommandQueueing)
                        {
                            lock (m_SendQueue)
                            {
                                if (dataStreamItem.CommandQueueingEnabled)
                                {
                                    n_Connection.n_Controller.EnqueueCommand(n_Connection.Guid, dataStreamItem.Packet,
                                        dataStreamItem.TransferSpeed, dataStreamItem.SocketName);
                                }

                                if (m_SendQueue.Count > 0)
                                {
                                    while (m_SendQueue.Count > 0)
                                    {
                                        dataStreamItem = m_SendQueue.Dequeue();

                                        if (dataStreamItem.CommandQueueingEnabled)
                                        {
                                            n_Connection.n_Controller.EnqueueCommand(n_Connection.Guid, 
                                                dataStreamItem.Packet, dataStreamItem.TransferSpeed, dataStreamItem.SocketName);
                                        }
                                    }
                                }
                            }
                        }

                        return;
                    }
                }
                catch
                {
                    Disconnect(null);

                    return;
                }
            }
            else
            {
                m_Transmitting = false;
            }
        }




Hopefully the above code is sufficient to see what is going on. Otherwise I can post more of it. I'm probably making an easy mistake and just not realizing it. I appreciate any insight anybody has. EDIT: Forgot to mention that I'm using the TCP protocol. [Edited by - Morxeton on March 2, 2009 8:28:57 AM]
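
One way to keep the merged bytes in order is to treat everything queued for the socket as a single byte stream and cut fixed-size chunks from it, rather than juggling a separate leftover array. The sketch below is only an illustration under that assumption. ChunkCoalescer, Enqueue, and NextChunk are hypothetical names, not part of the posted library, and the class is not thread-safe, so callers would need the same kind of locking the post uses around m_SendQueue. It also returns a short final chunk when the queue drains, so the remainder always goes out. Whether the posted SendData does that depends on OnSent and CheckData, which are not shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not the posted library's API: cuts fixed-size chunks
// from a queue of packets while preserving byte order.
public sealed class ChunkCoalescer
{
    private readonly Queue<byte[]> m_Pending = new Queue<byte[]>();
    private readonly int m_ChunkSize;
    private byte[] m_Current = new byte[0]; // packet currently being consumed
    private int m_Offset;                   // read position inside m_Current

    public ChunkCoalescer(int chunkSize)
    {
        if (chunkSize <= 0)
            throw new ArgumentOutOfRangeException("chunkSize");

        m_ChunkSize = chunkSize;
    }

    public void Enqueue(byte[] packet)
    {
        m_Pending.Enqueue(packet);
    }

    // Returns the next chunk to send: exactly chunkSize bytes while enough data
    // is queued, a shorter final chunk when the queue drains, or null when empty.
    public byte[] NextChunk()
    {
        byte[] chunk = new byte[m_ChunkSize];
        int filled = 0;

        while (filled < m_ChunkSize)
        {
            if (m_Offset == m_Current.Length)
            {
                if (m_Pending.Count == 0)
                    break; // nothing left to merge; flush what we have

                m_Current = m_Pending.Dequeue();
                m_Offset = 0;
                continue;
            }

            int take = Math.Min(m_ChunkSize - filled, m_Current.Length - m_Offset);
            Buffer.BlockCopy(m_Current, m_Offset, chunk, filled, take);
            m_Offset += take;
            filled += take;
        }

        if (filled == 0)
            return null;

        if (filled < m_ChunkSize)
            Array.Resize(ref chunk, filled);

        return chunk;
    }
}

public static class CoalesceDemo
{
    public static void Main()
    {
        // Scaled-down version of the example in the post: 32-byte packets, 20-byte chunks.
        var coalescer = new ChunkCoalescer(20);
        coalescer.Enqueue(new byte[32]);
        coalescer.Enqueue(new byte[32]);

        byte[] chunk;
        while ((chunk = coalescer.NextChunk()) != null)
        {
            Console.WriteLine(chunk.Length);
        }
    }
}
```

With two 32-byte packets and a 20-byte chunk size, the chunk lengths come out as 20, 20, 20, 4 instead of the 20, 12, 20, 12 pattern described for the old method. Each chunk would then be passed to a single send, with the throttle deciding when the next NextChunk call happens.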
