C# "Memory leak" issue

Started by
6 comments, last by Pasparty 12 years, 3 months ago
WARNING: This is a loooooooong post... I am working on a network library written in C# using the new async sockets methods in .NET 3.5 (SendAsync, ReceiveAsync, etc.). The library supports TCP and will eventually support UDP. I've rewritten the project from scratch a few months ago and I've now gotten to a point to begin some stress testing. My stress test consists of a test client and a test server where the client makes 200 connections to the server. The connections authenticate then I just let the server send ping packets (custom TCP packets) to the client in which the client responds with a pong packet (lets the server know the connection is alive and to stop the idle disconnect timer). The ping packets are being sent across all 200 connections once every second. After several minutes of testing, I've noticed that the resource usage in task manager is exceptionally high. I know using task manager isn't necessarily accurate for determining how much memory a .NET app is using, but if you watch a .NET process, you will usually see the memory climb and fall as GC is doing its job. In my case, the memory started out at around 30 MB (in debug mode in VS 2008), and by the time I decided to stop the process and debug, it was at around 380 MB. I used WinDbg to take a peek at the heap, and I noticed an awful lot of thread objects on the heap:

000007feefe5a8f0    57914      1389936 System.Threading.Overlapped
000007feefe27a90    59351      1899232 System.WeakReference
000007feee382ab8    59315      2372600 System.Windows.Forms.WindowsFormsSynchronizationContext
000007feefe393c8    57920      3706880 System.Threading.IOCompletionCallback
000007feefe28a28    59009      3776576 System.Threading.ContextCallback
000007feefa094d8    57600      6912000 System.Threading.OverlappedData
000007feefe32cb8   117514      8461008 System.Runtime.Remoting.Messaging.LogicalCallContext
000007feefe29c40   117569      8464968 System.Threading.ExecutionContext
000007feef820048    59006     24546496 System.Net.Sockets.SocketAsyncEventArgs

The 2nd column above is the number of objects. As you can see, that's A LOT of objects. It seems like the callbacks are not completing when you take a look at the heap, but the application is not failing by any means. All 200 ping packets make it across and 200 pong packets back across. Task manager shows only 25 threads being used. I have googled all over the place the past few days after discovering this issue and I have not been able to find any good answers as of yet. I'll post code from the library that is relevant to the issue. I'll start from the sender side. Here's how a packet is created (this is an extension method for the socketState):

        public static PacketWriter CreatePacket(this BaseSocketState socketState, Type type, params object[] data)
        {
            byte id = 0x00;

            if (!PacketInfo.GetPacketId(type, out id))
            {
                StatusHandler.GenerateMessage(0x0034, socketState, type.Name);

                return null;
            }

            BasePacket packet = (BasePacket)Factory.CreateInstance(type)();

            packet.PacketInfo = BaseController.GetPacketInfo(socketState, id);

            if (packet.IsStaticLength)
            {
                packet.Buffer = new byte[packet.PacketInfo.StaticLength];
            }

            PacketWriter writer = PacketWriterPool.Acquire();

            writer.Set(packet, socketState);

            if (data.Length > 0)
            {
                writer.MultiWrite(data);
            }

            return writer;
        }

A new instance of the packet is created based off of the type using a custom object creation method. A PacketWriter (which is just a wrapper for the packet's buffer) is acquired from a pool and then assigned the packet and the socket in which the packet is to traverse. The writer is returned and therefore can be used to write data to the buffer (i.e. writer.Write(object)). When the writer is closed, the packet is returned. Passing in a boolean of true into the close method will send the packet upon closing. The Close method in PacketWriter:

        public BasePacket Close(bool transmitAfterClose)
        {
            if (m_Packet == null)
            {
                StatusHandler.GenerateMessage(0x0038, m_SocketState, "Close");

                this.Release();

                return null;
            }
            else if (m_Packet.Status == PacketStatus.Invalid)
            {
                StatusHandler.GenerateMessage(0x003D, m_SocketState, PacketInfo.ProtocolType, PacketInfo.Id);

                this.Release();

                return null;
            }
            else if (m_Packet.IsStaticLength && m_Packet.Buffer.Length != m_BufferStream.Length)
            {
                StatusHandler.GenerateMessage(0x003E, m_SocketState, PacketInfo.ProtocolType, PacketInfo.Id, m_BufferStream.Length, m_Packet.Buffer.Length);

                m_Packet.Status = PacketStatus.Invalid;

                this.Release();

                return null;
            }

            byte[] data = m_BufferStream.ToArray();

            m_BufferStream.SetLength(0);
            m_BufferStream.Seek(0, SeekOrigin.Begin);

            m_BufferStream.WriteByte(m_Packet.Id);

            bool encryptData = false, compressData = false;

            if (PacketInfo.UsePacketOptions)
            {
                m_BufferStream.WriteByte((byte)PacketOptions);

                encryptData = m_SocketState.HasValidCrypto() && EncryptionEnabled;
                compressData = CompressionEnabled;
            }
            else
            {
                encryptData = m_SocketState.HasValidCrypto() && PacketInfo.UseEncryption;
                compressData = PacketInfo.UseCompression;
            }

            if (encryptData)
            {
                data = Encryption.Encrypt(data, m_SocketState);
            }

            if (compressData)
            {
                data = Compression.Compress(data);
            }

            if (!m_Packet.IsStaticLength)
            {
                byte[] lengthBytes = Support.GetBytes((ushort)data.Length);

                m_BufferStream.Write(lengthBytes, 0, lengthBytes.Length);
            }

            m_BufferStream.Write(data, 0, data.Length);

            m_Packet.Buffer = m_BufferStream.ToArray();

            m_BufferStream.Release();
            m_BufferStream = null;

            m_Packet.Status = PacketStatus.Sealed;

            BasePacket packet = m_Packet;

            BaseSocketState socketState = m_SocketState;

            this.Release();

            if (transmitAfterClose)
            {
                socketState.Send(packet);
            }

            return packet;
        }

The m_BufferStream is a MemoryStream that was acquired from the MemoryStreamPool at time of construction. this.Release() calls to an extension method in the PacketWriterPool that simply resets the PacketWriter (returns m_BufferStream to the MemoryStreamPool, nulls all references, etc) and returns it to the PacketWriterPool. Now that we have our packet, it can be sent if it hasn't already. Now moving onto the socketState.Send(packet) method which is an extension method:

        public static void Send(this BaseSocketState socketState, BasePacket packet)
        {
            if (!socketState.Connected || socketState.IsDisposing)
            {
                socketState.Transmitting = false;

                return;
            }

            if (packet.Status != PacketStatus.Sealed)
            {
                StatusHandler.GenerateMessage(0x0036, socketState, packet.ProtocolType, packet.Id, packet.Status);

                return;
            }

            bool sendNow = false;

            lock (socketState.PacketQueue)
            {
                if (socketState.PacketQueue.Count == 0)
                {
                    sendNow = true;
                }

                socketState.PacketQueue.Enqueue(packet);
            }

            if (sendNow && !socketState.Transmitting)
            {
                socketState.Transmitting = true;

                socketState.Send();
            }
        }

The packet enters a queue and if the socket is not currently transmitting, Send() will be kicked off to start processing the queue. If it is currently transmitting, the queue will be handled after each send has completed. The Send() method:

        internal void Send()
        {
            if (!Connected || IsDisposing)
            {
                m_Transmitting = false;

                return;
            }

            SetSocketArgs(true);

            BasePacket packet = null;

            lock (m_PacketQueue)
            {
                if (m_PacketQueue.Count > 0)
                {
                    packet = m_PacketQueue.Peek();
                }
                else
                {
                    m_Transmitting = false;
                }
            }

            if (packet != null)
            {
                if (m_TransferSpeed == 0)
                {
                    m_LastPacketSent = packet;

                    Send(packet.Buffer);
                }
                else
                {
                    m_SpeedThrottleTimer.Packet = packet;
                    m_SpeedThrottleTimer.Start();
                }
            }
        }

I created a speed throttling mechanism making use of a custom timer system, in which I will not be going into since this stress test is not focused on that. As you can see the PacketQueue is Peek()ed to grab the next packet but does not yet Dequeue and remove the packet. That is done after the packet was successfully sent, so if the connection is lost the remaining packets in the queue will be repackaged into another queue that is then passed into an event. This allows the application to possibly transmit the packets after the connection is restored. SetSocketArgs(bool) method:

        internal void SetSocketArgs(bool sendArgs)
        {
            if (sendArgs)
            {
                m_SendArgs = m_ArgsPool.Acquire();

                if (m_SendArgs != null)
                {
                    m_SendArgs.UserToken = this;
                }
            }
            else
            {
                m_RecvArgs = m_ArgsPool.Acquire();

                if (m_RecvArgs != null)
                {
                    m_RecvArgs.AssignBuffer();
                    m_RecvArgs.UserToken = this;

                    if (ProtocolType == ProtocolType.Udp)
                    {
                        m_RecvArgs.RemoteEndPoint = m_RemoteEndPoint;
                    }
                }
            }
        }

There is a pool for socket args as well. If they are receive args, the buffer is assigned a portion of a large preallocated byte array to reduce fragmentation and increase performance. The Send(byte[]) method:
        private void Send(byte[] buffer)
        {
            if (m_SendArgs != null)
            {
                m_SendArgs.SetBuffer(buffer, 0, buffer.Length);

                SendData();
            }
        }
The send buffer is assigned then the SendData() method, which is the method that FINALLY sends the data to the other side, is executed. All previous Send methods except for the Send(packet) extension method reside in a BaseSocketState class. SendData is a virtual method which is overridden by TCPSocketState and UDPSocketState, since sending data via TCP and UDP are different. SendData() from TCPSocketState:
        protected override void SendData()
        {
            try
            {
                if (!Socket.SendAsync(m_SendArgs))
                {
                    OnSent(this, m_SendArgs);
                }
            }
            catch { }
        }
SendAsync returns false if it processed synchronously, in which then you must call the method directly for post-send handling. OnSent method:

        protected static void OnSent(object sender, SocketAsyncEventArgs e)
        {
            BaseSocketState socketState = (BaseSocketState)e.UserToken;

            if (socketState == null)
            {
                return;
            }

            socketState.m_OnSent = true;

            socketState.m_LastActivityTime = DateTime.Now;

            int bytesTransferred = e.BytesTransferred;

            socketState.m_TotalBytesSent += bytesTransferred;

            if (socketState.m_ClientState != null)
            {
                socketState.m_ClientState.TotalBytesSent += bytesTransferred;
            }

            if (socketState.m_NormalSendMode)
            {
                lock (socketState.m_PacketQueue)
                {
                    if (socketState.m_PacketQueue.Count > 0)
                    {
                        socketState.m_PacketQueue.Dequeue();
                    }
                }

                if (socketState.m_LastPacketSent != null)
                {
                    StatusHandler.GenerateMessage(0x801B, socketState, socketState.ProtocolType, socketState.m_LastPacketSent.Id, socketState.m_LastPacketSent.Buffer.Length -
                        socketState.m_LastPacketSent.PacketInfo.HeaderSize, socketState.m_LastPacketSent.Buffer.Length);

                    socketState.m_LastPacketSent.OnAfterSent(socketState);
                }

                socketState.m_OnSent = false;

                socketState.Send();
            }
            else
            {
                lock (socketState.m_SpeedThrottleTimer.SentPackets)
                {
                    while (socketState.m_SpeedThrottleTimer.SentPackets.Count > 0)
                    {
                        BasePacket packet = socketState.m_SpeedThrottleTimer.SentPackets.Dequeue();

                        StatusHandler.GenerateMessage(0x801B, socketState, socketState.ProtocolType, packet.Id, packet.Buffer.Length - packet.PacketInfo.HeaderSize, packet.Buffer.Length);

                        packet.OnAfterSent(socketState);
                    }
                }

                socketState.m_OnSent = false;

                socketState.ThrottledSend();
            }
        }

m_OnSent is set to true here in case a packet's OnAfterSent method calls to disconnect the socket. The SocketState's dispose method will not wait for m_Transmitting to become false in this case, otherwise it would result in a deadlock. m_NormalSendMode just means that the socket is not currently throttling the speed in which the packets are transmitted. The PacketQueue is Dequeued to remove the packet that was just sent then Send() is called again to see if there are more packets in the queue to be sent. That is pretty much from the sending side of things without going into too much of the rest of the code. That is a lot in itself already. Now here's the receiving side of things... Receive() method:
        public static void Receive(this BaseSocketState socketState)
        {
            socketState.SetSocketArgs(false);
            socketState.ReceiveData();
        }
After a connection is established, this method is called to wait for incoming data. ReceiveData() method in TCPSocketState:
        internal override void ReceiveData()
        {
            try
            {
                if (m_RecvArgs != null)
                {
                    if (!Socket.ReceiveAsync(m_RecvArgs))
                    {
                        OnReceived(this, m_RecvArgs);
                    }
                }
            }
            catch { }
        }
This is nearly identical to the SendData() method and simply starts the wait to receive process. OnReceived method:

        protected static void OnReceived(object sender, SocketAsyncEventArgs e)
        {
            BaseSocketState socketState = (BaseSocketState)e.UserToken;

            if (socketState == null)
            {
                return;
            }

            socketState.m_LastActivityTime = DateTime.Now;

            int bytesTransferred = e.BytesTransferred;

            if (bytesTransferred > 0)
            {
                MemoryStream memoryStream = MemoryStreamPool.Acquire();

                memoryStream.Write(e.Buffer, e.Offset, bytesTransferred);

                socketState.m_InboundBuffer = memoryStream.ToArray();

                memoryStream.Release();

                socketState.m_InboundBytesRead = bytesTransferred;
                socketState.m_TotalBytesRecv += bytesTransferred;

                if (socketState.m_ClientState != null)
                {
                    socketState.m_ClientState.TotalBytesRecv += bytesTransferred;
                }

                lock (socketState.lo_ReadData)
                {
                    socketState.Read();
                }

                socketState.Receive();
            }
            else
            {
                socketState.Disconnect(false);
            }
        }

The data is pulled from the buffer, temporarily stored in a MemoryStream from the pool, then stored in the InboundBuffer for processing. The object lo_ReadData (lo for lock object) is locked here and also in the Dispose method, so if a socket is in the middle of reading and Dispose is called, it won't throw an exception as Dispose will have to wait for the read to finish first. Read() method:

        internal static void Read(this BaseSocketState socketState)
        {
            if (!socketState.Connected || socketState.IsDisposing)
            {
                return;
            }

            int readAttempts = 0, maxReadAttempts = socketState.InboundBytesRead + 1;

            // Loop while the pointer is less than the amount of bytes to be read.
            while (socketState.Pointer < socketState.InboundBytesRead)
            {
                if (readAttempts > maxReadAttempts)
                {
                    StatusHandler.GenerateMessage(0x0037, socketState);

                    socketState.Disconnect(true);

                    break;
                }

                PacketReader packetReader = socketState.PacketReader;

                switch (packetReader.ReadStatus)
                {
                    case PacketReadStatus.Id:
                        {
                            // Read the first byte of incoming data which is to be the packet Id.
                            byte id = socketState.InboundBuffer[socketState.Pointer++];

                            packetReader.PacketInfo = BaseController.GetPacketInfo(socketState, id);

                            // The packet Id that was read is invalid.  An attempt to read the rest of the data will still be made but will not be processed.
                            if (packetReader.PacketInfo.IsNullPacketInfo)
                            {
                                StatusHandler.GenerateMessage(0x001C, socketState, socketState.ProtocolType, id);
                            }

                            // If a static packet is received with a length of zero, nothing else will be read so pass the packet to the handler.
                            if (packetReader.PacketInfo.IsSingleBytePacket)
                            {
                                packetReader.OnHandle(socketState);
                            }
                            else
                            {
                                if (socketState.ProtocolType == ProtocolType.Tcp)
                                {
                                    packetReader.ReadStatus = PacketReadStatus.Options;
                                }
                                else
                                {
                                    packetReader.ReadStatus++;
                                }
                            }

                            break;
                        }
                    case PacketReadStatus.Sequence:
                        {
                            // TODO: Finish UDP implementation.

                            break;
                        }
                    case PacketReadStatus.ACKs:
                        {
                            // TODO: Finish UDP implementation.

                            break;
                        }
                    case PacketReadStatus.Options:
                        {
                            if (packetReader.PacketInfo.UsePacketOptions)
                            {
                                // Read the next byte of incoming data which is to be the PacketOptions.
                                packetReader.PacketOptions = (PacketOptions)socketState.InboundBuffer[socketState.Pointer++];
                            }

                            packetReader.ReadStatus++;

                            break;
                        }
                    case PacketReadStatus.Length:
                        {
                            bool proceed = false;

                            if (!packetReader.PacketInfo.IsStaticLength)
                            {
                                byte[] result = null;

                                if (socketState.ReadBuffer(out result))
                                {
                                    packetReader.Length = BitConverter.ToUInt16(result, 0);

                                    proceed = true;
                                }
                            }
                            else
                            {
                                ushort length = packetReader.PacketInfo.StaticLength;

                                if (socketState.HasValidCrypto())
                                {
                                    bool encryptionEnabled = false;

                                    if (packetReader.PacketInfo.UsePacketOptions)
                                    {
                                        encryptionEnabled = packetReader.EncryptionEnabled;
                                    }
                                    else
                                    {
                                        encryptionEnabled = packetReader.PacketInfo.UseEncryption;
                                    }

                                    if (encryptionEnabled)
                                    {
                                        length = (ushort)socketState.CryptoInfo.GetEncryptedLength(length);
                                    }
                                }

                                packetReader.Length = length;

                                proceed = true;
                            }

                            if (proceed)
                            {
                                if (packetReader.Length > 0)
                                {
                                    packetReader.ReadStatus++;
                                }
                                else
                                {
                                    packetReader.OnHandle(socketState);
                                }
                            }

                            break;
                        }
                    case PacketReadStatus.Buffer:
                        {
                            byte[] result = null;

                            if (socketState.ReadBuffer(out result))
                            {
                                packetReader.OnHandle(result, socketState);
                            }

                            break;
                        }
                }

                readAttempts++;
            }

            socketState.Reset();
        }

This method first reads the packet id from the buffer and looks it up. A PacketReader instance, which is stored on the BaseSocketState, temporarily holds the data that will be handled after all data for the packet has been read. If using packet options (used for turning on/off encryption and compression on the fly), that byte is read. If the packet is a static packet, no length will be read. Finally the buffer is read if there is anything to read. ReadBuffer method:

        private static bool ReadBuffer(this BaseSocketState socketState, out byte[] result)
        {
            int bytesToRead = socketState.PacketReader.BytesLeft;

            if (socketState.PacketReader.BufferStream == null)
            {
                socketState.PacketReader.BufferStream = MemoryStreamPool.Acquire();
            }

            MemoryStream memoryStream = socketState.PacketReader.BufferStream;

            result = null;

            if (socketState.Pointer > socketState.InboundBytesRead)
            {
                StatusHandler.GenerateMessage(0x001D, socketState, socketState.PacketReader.Id);

                socketState.Reset();

                return false;
            }

            int bytesLeft = socketState.InboundBytesRead - socketState.Pointer;

            if (bytesToRead > bytesLeft)
            {
                memoryStream.Write(socketState.InboundBuffer, socketState.Pointer, bytesLeft);

                socketState.Pointer += bytesLeft;

                return false;
            }
            else
            {
                memoryStream.Write(socketState.InboundBuffer, socketState.Pointer, bytesToRead);

                socketState.Pointer += bytesToRead;

                result = memoryStream.ToArray();

                memoryStream.Release();
                socketState.PacketReader.BufferStream = null;

                return true;
            }
        }

This method simply reads the buffer and stores the data on the BufferStream, which is a MemoryStream that was pulled from the pool. PacketReader.BytesLeft is determined based on what part of the reading process is currently executing. If the read is complete for the packet, the BufferStream is released back to the pull and the method returns true. The result is returned via the out parameter. PacketReader.OnHandle method:

        internal void OnHandle(byte[] buffer, BaseSocketState socketState)
        {
            m_Buffer = buffer;

            m_SocketState = socketState;

            StatusHandler.GenerateMessage(0x801A, socketState, socketState.ProtocolType, socketState.PacketReader.Id, socketState.PacketReader.Length, socketState.PacketReader.TotalSize);

            if (m_Buffer != null)
            {
                bool decryptData = false, decompressData = false;

                if (PacketInfo.UsePacketOptions)
                {
                    decryptData = socketState.HasValidCrypto() && EncryptionEnabled;
                    decompressData = CompressionEnabled;

                }
                else
                {
                    decryptData = socketState.HasValidCrypto() && PacketInfo.UseEncryption;
                    decompressData = PacketInfo.UseCompression;
                }

                if (decompressData)
                {
                    m_Buffer = Compression.Decompress(m_Buffer);
                }

                if (decryptData)
                {
                    m_Buffer = Encryption.Decrypt(m_Buffer, socketState);
                }
            }

            PacketInfo.PacketOnHandle(socketState, this);

            Reset();
        }

Basically this takes the data and passes it into the PacketInfo.PacketOnHandle method. PacketInfo is a class that is created at the start of the engine as each packet is registered. It contains all non-changing information for a packet and also contains an instance of the packet so that the packet's OnHandle method can be executed. Instead of creating the packet to store the info on and calling to it's OnHandle method, this increases performance since no objects need to be created during the entire read process. PacketInfo.PacketOnHandle method:
        public void PacketOnHandle(BaseSocketState socketState, PacketReader reader)
        {
            if (!IsNullPacketInfo)
            {
                m_PacketInstance.OnHandle(socketState, reader);
            }
        }
As you can see it just calls the packet's OnHandle method. Here's an example of what this looks like... PingPacket.OnHandle method:
        public override void OnHandle(BaseSocketState socketState, PacketReader reader)
        {
            socketState.SendPacket(typeof(TCPPackets.PongPacket));
        }
SendPacket is just simply a wrapper for the CreatePacket method which calls to Close(true) to send it immediately. That is the extent of the sending and receiving methods. There is nothing from what I can tell that would prevent any of the I/O callbacks from completing and keeping them held in memory. In fact if that were the case, the I/O would fail immediately. I can't seem to figure out why all these objects remain on the heap. Any ideas? [Edited by - Zahlman on February 7, 2010 6:20:41 PM]
Advertisement
Since it could be caused from numerous things, it'd be easier to take a look at if you upload a working copy with the issue so we can test it out. :)
NetGore - Open source multiplayer RPG engine
I would suggest using windbg and getting familiar with it. WIll help you track down memory issues
The more applications I write, more I find out how less I know
I have been using WinDbg, but you're right I do need to familiarize myself with it a bit more. I figured out the problem, though. It was with the Send() method where it would call to SetSocketArgs and acquire another SocketAsyncEventArgs from the pool even if the SocketState was already in a transmitting state. I removed the call to SetSocketArgs in both Send() and ThrottledSend() and placed it in the Send(byte[]) method which is where the data actually gets pushed out over the socket. After that, no more leakage. I feel like a noob for overlooking this!

I also discovered another problem as well. If sending or receiving completed synchronously, they would call to OnSent or OnReceived directly and not call to OnCompletion, which is where the SocketAsyncEventArgs gets pushed back into the pool and if receiving, the buffer is released.
For future reference, using [source] tags will make your post much easier to read as it provides a scrolling, syntax-highlighted box.
[TheUnbeliever]
Quote:Original post by TheUnbeliever
For future reference, using source tags will make your post much easier to read as it provides a scrolling, syntax-highlighted box.

Thanks for the info. I wrapped tags around it and noticed it didn't work. I wasn't sure what the tags were for it.
Quote:Original post by Morxeton
Quote:Original post by TheUnbeliever
For future reference, using source tags will make your post much easier to read as it provides a scrolling, syntax-highlighted box.

Thanks for the info. I wrapped tags around it and noticed it didn't work. I wasn't sure what the tags were for it.


I fixed it for you. Get it right next time, OK?

(And in case you didn't realize, you can edit your posts here.)
[color=#333333][font=arial, sans-serif]

Try [/font][color=#333333][font=arial, sans-serif]

to use [/font][color=#333333][font=arial, sans-serif]

deleaker[/font][color=#333333][font=arial, sans-serif]

.[/font][color=#333333][font=arial, sans-serif]

if [/font][color=#333333][font=arial, sans-serif]

there [/font][color=#333333][font=arial, sans-serif]

a memory or gdi leak[/font][color=#333333][font=arial, sans-serif]

,[/font][color=#333333][font=arial, sans-serif]

deleaker [/font][color=#333333][font=arial, sans-serif]

quickly [/font][color=#333333][font=arial, sans-serif]

finds it ....[/font]

This topic is closed to new replies.

Advertisement