More Crazy Infrastructure.

posted in Septopus for project Unsettled World
Published November 07, 2018
Advertisement

The past few days I've been working on figuring out how the "peer-to-peer" bits of my game network are going to work.  For now I'm testing with MQTT, because it's light and fast and reliable, but you could pretty much replace MQTT with ZeroMQ or just some raw TCP/UDP sockets code or anything else that fits the bill.  In order to build the most robust gaming environment I can I want to provide the capabilities to the player/community to help build out the games network infrastructure.  Allowing a player to opt-into hosting a relay node, or simply enabling the built in relay features in their clients.  In that spirit, I'm going for something of a distributed mesh type topology with some rudimentary "routing" code built into the nodes themselves.  For MQTT, I'm planning to use specific subscription topics that are universal throughout the system like node names or etc, so that the "Relays" or "Routers" can determine which channel(s) to put the data on based on latency/etc.  Obviously, if I'm going to use MQTT, there aren't really any out of the box solutions for what I'm trying to achieve.  Additionally, I'll be wanting to authenticate the MQTT network based on data in my game server cluster(probably via Redis), so it'll be a custom code solution using somebody else's brilliant netcode. ;)

Below you will see a screenshot of my first performance tests of my base functional MQTTnet server & a simple mqtt client hammering away at it.  The server is .net core running on a linux VM on my local machine.  The client is using MQTTnet as well, running on the windows client.

mqtttest.thumb.png.f422051317e057b54f095a2d5b2548b3.png

The client here is sending batches of 10000 messages to the server, it's also subscribed to the same topic, so it waits until it gets all the sent messages back and then outputs the time it took in ms.

The messages are set to a topic of "LoadTestinAllDayLong" and the message is just a random GUID.  So, it looks like right now it's pushing about 2000 messages per second, running over TCP, with reliability(on a 4core 4gb virt essentially over a loopback)..  Not too bad at all, can't wait to test it over a real network though.

Here's a quick sketch of the idea as it stands, broad strokes again..

mqttnetwork.png.83d018ed993edb6331b7f51bfd847692.png

Keep in mind now, this won't be the primary game network, it will be used for localized simulations, direct peer-to-peer activities, or alternative channels for transient data to be moved around the network.

Server Source:


using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Protocol;
using MQTTnet.Diagnostics;

namespace UWMqtt
{
    class Program
    {
        public static async Task Main(string[] args)
        {

            MqttServerOptions options = new MqttServerOptions();
            options.DefaultEndpointOptions.ConnectionBacklog = 10000;
            options.DefaultEndpointOptions.Port = 1883;
            options.DefaultCommunicationTimeout = new TimeSpan(0,0,5);
            
            options.ConnectionValidator = c =>
            {
                if (c.ClientId.Length < 10)
                {
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                    return;
                }
                if (c.Username != "yummyusername")
                {
                   c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;

               return;
                }
                if (c.Password != "chewypasswordstring")
                {
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    return;
                }
                c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
            };


            MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
            {
                if (e.TraceMessage.Level == MqttNetLogLevel.Info || e.TraceMessage.Level == MqttNetLogLevel.Error || e.TraceMessage.Level == MqttNetLogLevel.Warning)
               {
                    String trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
                    if (e.TraceMessage.Exception != null)
                    {
                        trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
                    }
                    Console.WriteLine(trace);
                }
            };

            IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
            await mqttServer.StartAsync(options);
            Console.WriteLine("Press any key to exit.");
            Console.ReadLine();
            await mqttServer.StopAsync();

        }
    }
}

Client Source:


using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

namespace mqttClientHammer
{
    class Program
    {
        public static bool running = true;
        static long mrec = 0;
        static long msen = 0;
        public static void Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            IMqttClient mqttClient = factory.CreateMqttClient();

            IMqttClientOptions options = new MqttClientOptionsBuilder()
                .WithClientId(Guid.NewGuid().ToString())
                .WithTcpServer("ser.ver.ip.add", 1883)
                .WithCredentials("yummyusername","chewypasswordstring")
            .Build();



            mqttClient.Disconnected += async (s, e) =>
            {
                Console.WriteLine("### DISCONNECTED FROM SERVER ###");
                await Task.Delay(TimeSpan.FromSeconds(1));

                try
                {
                    await mqttClient.ConnectAsync(options);
                }
                catch
                {
                    Console.WriteLine("### RECONNECTING FAILED ###");
                }
            };

            mqttClient.Connected += async (s, e) =>
            {
                Console.WriteLine("### CONNECTED WITH SERVER ###");

                // Subscribe to a topic
                await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("LoadTestinAllDayLong").Build());

                Console.WriteLine("### SUBSCRIBED ###");

                await Task.Factory.StartNew(() =>
                 {
                     publisher(mqttClient);
                 });
            };

            mqttClient.ApplicationMessageReceived += (s, e) =>
            {
                //Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                //Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
                //Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
                //Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                //Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
                //Console.WriteLine();
                mrec++;
            };
            // StartAsync returns immediately, as it starts a new thread using Task.Run, 
            // and so the calling thread needs to wait.
            mqttClient.ConnectAsync(options);
            Console.ReadKey();

        }

        static void publisher(IMqttClient mc)
        {
            long lmr = 0;
            Stopwatch sw = new Stopwatch();
            while (running)
            {

                if (mc.IsConnected)
                {
                    sw.Start();
                    Parallel.For(0, 10000, i =>
                    //for (int i = 0; i < 1000; i++)
                    {
                        try
                        {
                            mc.PublishAsync("LoadTestinAllDayLong", Guid.NewGuid().ToString());
                            msen++;
                        }
                        catch (Exception) { }
                    });
                }
                else
                {
                    Console.WriteLine("Load Test Waiting on Reconnect...");
                }
                if (mrec != msen)
                {
                    lmr = mrec;
                    Thread.Sleep(100);
                    while(mrec != lmr)
                    {
                        lmr = mrec;
                        Thread.Sleep(100);
                    }
                }
                Console.WriteLine("MRec: " + mrec.ToString() + " MSen: " + msen + " T: " + (sw.ElapsedMilliseconds).ToString());
                sw.Stop();
                sw.Reset();
                Thread.Sleep(1);
            }
        }
    }
}

 

1 likes 0 comments

Comments

masskonfuzion

Wow! 

November 08, 2018 12:00 AM
Septopus

Now with source code. ;)

The really really really, almost freaky thing about all this?..  When you're looking for a good tutorial on how to write a linux daemon in .net core, the best one I found has a part 2.  And in that part 2 they show you how to turn your vanilla hello world daemon into a functional daemon, and what do they use as their example?  MQTTnet.....  Blew my freakin mind.  And quite handy for me.

Part 1:

https://www.wintellect.com/creating-a-daemon-with-net-core-part-1/

Part 2:

https://www.wintellect.com/creating-a-daemon-with-net-core-part-2/

Great tutorials for writing daemons.  (not the best tutorial for MQTTnet though, for obvious reasons)

For that info I went here:

https://github.com/chkr1011/MQTTnet/wiki

It's kinda hard to find, but I guess it's the official wiki for MQTTnet, the code examples work, so that's all that mattered to me.

November 08, 2018 12:48 AM
Septopus

Also, some of the beauty of MQTTnet, that code above, handles all the disconnect and reconnect behavior without even blinking, set at whatever retry interval you like.  Pretty awesome.

November 08, 2018 01:02 AM
You must log in to join the conversation.
Don't have a GameDev.net account? Sign up!
Advertisement