Trying to get my IOCP code on par with expected performance



#1 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 29 March 2009 - 01:13 PM

The more I learn, the less I realize I really know. I've finally gotten around to learning the IOCP model over the past few weeks. I've done extensive research in that time and have consulted the archives here at GameDev a lot. After reaching a reasonable minimal code base that could be tested, I ran into some issues while stress testing it. I didn't quite understand how to test my IOCP code, so I ended up writing dozens of tests that failed. I thought the code was wrong at first, but after doing more research, that does not seem to be the case. Long story short, I was throwing too much data at it, expecting it to "just work" since it was IOCP. What I've done is gather 20 very important quotes from different threads over the years, all from hplus0603 since I needed accurate and reliable information, that best describe game programming on a massive scale. I've linked each quote back to the original thread for future reference. Here they are.
Quote:
(Link) IOCP solves the problem of "if I'm using TCP, how can I simultaneously serve 1,000 sockets with some fairness and performance?"
Quote:
(Link) There is no need to have more than one read request (or write request) outstanding for a single socket. The point of IOCP is that you can have many read requests outstanding for many sockets, and efficiently receive data from the sockets that do have data. ... you should just do a non-blocking write/send without IOCP, and let the kernel-side outgoing buffer take care of the asynchronizing.
Quote:
(Link)
Quote:
More than 2000 TCP connections is hard for the OS.
These days, if you use the appropriate I/O primitive (IOCP on Windows, various poll/epoll variants on UNIX), 2000 TCP connections isn't so bad. However, if each of them sends commands that require collision testing, 30 times per second, that's hard for the server :-)
Quote:
(Link)
Quote:
WoW has an order of magnitude more players
Be careful to separate number of players per shard ("server") from number of players for the game, total. Even a successful indie usually only runs a single shard, and can probably get the same number of players on that shard as you'd get on a single shard for a commercial game.
Quote:
(Link)
Quote:
Nor is it possible at this scale to have just one thread serving all clients
I'd like to take exception to that particular statement. You can serve all the clients in a single thread just fine, given the proper software architecture. ... (combined replies) ... Ergo, it's often most cost-effective, AND highest performance, to design for single-CPU single-threaded nodes. If you want to scale, add more nodes.
Quote:
(Link) Note that there are two pieces to "clustering" here: 1) How do objects on the servers talk to each other? You need to be custom here, IMO. 2) How do you manage the actual cluster servers? I.e., how is software deployed, how do you monitor load, how do you fail-over when a host dies? You can leverage existing stuff here. We built ours on nagios, perl, rsync and ssh, although something more integrated like Beowulf would probably work, too.
Quote:
(Link)
Quote:
I'm looking at doing scaleable networking
Note that all the things you've been talking about -- IOCP, bit packing, etc -- all have to do with simple efficiency improvements. They do not improve the scalability of the system. A system that is scalable has a known path to add capacity to meet load. For example, if one system is totally optimized, and can do 1,000 connected clients per machine, but won't be helped by adding more machines, then that system is not scalable. A system that does 100 connected clients per machine, but you can add as many machines as you want to meet load, is scalable.
Quote:
(Link) One thread per zone allows you to scale to multi-CPU systems for larger worlds. This is assuming that zones are computationally independent (no locking required). One process per zone might be even better, as it allows you to scale to multiple physical machines. It requires the client to re-connect to a different machine when zoning, though.
Quote:
(Link) Regarding players per server, that depends on your degree of security, how much simulation you do, how interactive it is, and all that. If all you're doing is bouncing a few packets a second between players that are close to each other, you can easily go to 2,000 on a single piece of modern hardware. After all, most routers are just regular computers, and do many more packets than that. If you do secure server-authenticated simulation at high simulation rates where each tool, attachment, vehicle, weapon, etc is a fully simulated object, and fully replicated to neighboring servers and clients, and have expensive rules for who can see what, when, and where, then you can cut that in ten.
Quote:
(Link) When it comes to "servers" vs "clusters," I've given up on trying to correct that terminology. 90% of the professionals in the business, and 100% of the customers, say "server" when they mean "shard", and I just translate to "cluster" in my head.
Quote:
(Link) When it comes to CPU load, a highly optimized FPS that doesn't allow you to affect the environment (similar to Planetside) may allow you 300-500 players per server CPU. That's making some assumptions on server physics rate, collision complexity, etc. If you allow players to affect the environment, have very complex geometry, a richer movement model, a higher simulation rate, etc, then that number will go down. From what little I've played WoW (a one-month subscription that came with a copy I got), I would say that they do not run the full simulation on the servers.
Quote:
(Link) A game like EverQuest runs about 100-200 players per piece of hardware (at least at the time of shipping -- when a 1 GHz CPU was the top of the line). They have maybe 50 zones in the game. They could put very heavily loaded zones onto their own machines, but put several zones that seldom have that many people all on a single machine to save on hardware cost. Thus, an initial EverQuest "server" (really, a world instance, or "shard") would be run by something like 30 machines. ... snip ... If you want all of everything, at high simulation rates, you won't be able to do 2,000 entities on a single machine, for much of the same reasons that you don't have 2,000 simultaneous active NPCs in a single-player game of Half-Life. The easiest way to load balance between physical servers within a single shard is to require the user to "zone" between areas of the game, and put different zones on different machines -- the EQ model. ... snip ...
Quote:
(Link)
Quote:
they could only handle around 200 clients per server, which would increase the hardware costs of your average MMOG by a factor of 10
You don't expect that, just because the server "Little Big" has 2,000 players active, that means it's a single machine, right? EQ runs one process per zone, with sparsely populated zones sharing machines, but heavily populated zones having a machine of their own (this may have changed if they've upgraded the hardware). The Wood-elf starting zone (which is also the Elf starting zone) used to get server-based lag around 150 people or so. (This was years back -- don't know the status these days) As more technology is put into these games, faster machines will mean better collisions and things of that nature, rather than more simultaneous players per machine. 200 players per machine is just fine.
Quote:
(Link) Regarding number of players per server, vs number of players per server machine, such data IS available for some of the games out there. For example, in Game Developer, issue May 2004, article "The Business of EverQuest Revealed," Sony spokespeople say there are 47 open "servers" (== shards) and about 1,500 server machines running those shards. That's a little over 30 machines per shard. They also say their peak simultaneous player volume is 100,000, which, if you divide it on 1,500 PCs is about 70 players per server machine. Yes, a developer may tell you they have 3,000 players per "server," but, as my original point said, that's not 3,000 players per machine. Each "server" in an MMO (more accurately called "shard") is a cluster of multiple server machines; no single server machine does 3,000 simultaneous players for an interactive, real-time, online game. (Text-based MUDs are something different, of course)
Quote:
(Link) Can you serve 10,000 "connections" on a single machine, and somehow claim success? I'm sure. For example, just echoing a position/velocity packet per second per player works fine that way. Also, a text-based MUD, where each player issues a command once every ten seconds probably could get away with that. Can you do that while running object interaction simulation, collision detection, weapons statistics, inventory, and all the other things that go into a game? Nope! No-where near. The game I know of that came closest so far (publicly available, that is), was Planetside, which did something like 500 players per machine (== island). Even the Half-Life 2 engine, brand spanking new, doesn't claim more than 200 players per machine.
Quote:
(Link) Anyway, depending on physics demands, a current single machine can run between 100 and 1,000 players before running out of steam, where "steam" means kernel ability to service connections, and memory bandwidth ability to serve player physics. If you put 200 patches per server, that means 5,000 server machines to cover your area, which is certainly reasonable for a cluster intended to serve a million simultaneous players.
Quote:
(Link)
Quote:
If you're planning to make an indy mmog this is a good strategy IMHO. When we have to bring the server back up after a crash it's at the maximum a 20 minutes rollback but it's often something in between. Of course when we do bring the server down ourselves there's no rollback.
The real problem is when there's not "a" server, but a distributed cluster of them. You have to make sure that all operations that affect more than one physical data store commit together somehow. (The classic way is to use a transaction monitor and two-phase commit)
Quote:
(Link) We use a model where the client connects to all zone servers within view, and zone servers tell the client about objects within its borders only. An alternative is to make the client connect to only one zone server (except when transitioning), and make each zone server tell the client about all objects within client range (this means that zone servers need to forward their updates to their neighbors). A third option is to go with the "central server" approach, but to have more than one.
Quote:
(Link)
Quote:
Is there any techniques that can be used for most MMOGs?
What most MMOs do is sharding. Their game world is only designed to handle 1,000 - 5,000 simultaneous online users, and gets too full if more people are playing. Thus, they replicate the same content across many shards, giving each one a separate name ("Darkbane," "Brell Serilis" etc).
Quote:
(Link) Sockets limitation: The number of sockets really shouldn't be a limitation to how many players you have on a single machine. ... snip ... 10,000 users on a server: "server" != "machine". Typically, MMORPG companies build clusters of machines that work together, where each machine will serve one or more zones, or one or more areas of the world, or one or more roles (NPC, guild, chat, etc), or some other such load balancing scheme. ... snip ...

Now, as the thread title implies, my particular question is how to make all the aforementioned concepts work together. I have all this information in front of me (the quotes) telling me how it's done in practice, but the pieces all seem interdependent when forming a working solution, and I don't know what direction to go in now. Before putting together all those quotes for this thread, I thought that if I made an IOCP server, I'd be able to process heavy network data for around 1000 clients without any "problems", per se, and simply be done. I find that I can barely handle traffic for 200 clients, and I've not even added any data processing or other game-related technologies. I thought my code was wrong at first, but after checking it over and trying other configurations it seems right. Regardless, the IOCP code I've written doesn't really matter here. It could be wrong, it could be right. Either way, the bigger problem at hand is that I'd still have to code something to handle the distributed aspect that makes the server scalable by adding more machines. That is what I do not know how to do. Likewise, the whole shard/zone business seems "game specific" and I don't have a game! I'm only trying to work on the network architecture.

I'm looking for advice, design ideas, and a general direction I should go in to learn first-hand how to make all these things work together to form a scalable networking system that is applicable to games. I'm requesting more "practical" advice rather than "theoretical", but beggars can't be choosers [wink]. My main goal is a simple "working" system that is not overly complicated but is realistic enough to be applicable in a simple game. I want something "real world", but I'm not trying to create the next big commercial middleware system. The resulting performance metrics aren't as important to me as learning the practical design of how it's done. This is for my own learning experience, as I want to build up experience in this area. Here's what I'm looking for, specific to the four areas of interest:

IOCP - I've pretty much exhausted online resources on IOCP. I don't need much here, barring someone who has written working code to review my own [lol]. I've not ruled out the necessity of understanding a custom UDP-based protocol as well, rather than TCP with IOCP. It's just that all my experience has been with TCP, and IOCP is the last tier.

MMO Architecture - Having studied many F2P MMOs, I understand their components, but I don't know how to design them (types, classes, etc.). Shame on me, since I've got a degree in Software Engineering. I need to really learn UML and how to class-diagram a simple game. Any resources on game-specific modeling would help here, or any other advice on building up experience to get effective at this.

Cluster Computing - I'll be looking at Windows-specific technology. The type of cluster or distributed system I use does not really matter as long as I learn how to make it scalable. I think I should grab a few cheap 1U racks from Geek.com to help test in this area, but I've got no idea what type of system to code for such a setup.

Shards & Zones - I realize this is really game specific, but I'd like to find a way to make a simplified abstraction of an API used for implementing these concepts.

I know this post is really long, sorry. Thanks for any help though!
I do understand how specialized these domains are and there are a lot of trade secrets people don't want to give away. That's why I'm just looking for practical advice on what I should pursue to accomplish my goals. [smile] [Edited by - Drew_Benton on March 30, 2009 12:42:23 PM]


#2 chairthrower   Members   -  Reputation: 436


Posted 29 March 2009 - 06:56 PM

I have not written any MMOs and am not a networking expert, and have never done hardware load balancing. That's the disclaimer out of the way, so you can choose to apply whatever weight you wish to this post ;-).

I have tested Boost's Asio, which wraps the Windows completion ports stuff, and epoll etc. on *nix.

I tested against an external server (in another country - actually HTTP, no NAT routing), going through a full DNS resolve, connection setup, pulling a few hundred bytes, and tearing down. I benchmarked this with 1000 simultaneous connections and got a benchmark time of 10 seconds, with maybe an average of 700 concurrently active TCP connections (the others were resolving or had finished). This was done on a three-year-old laptop, and the aggregated traffic basically saturated our 200Mbit external link. CPU was untaxed, perhaps 10%. This was more than ample for my specific needs, so I did not try scaling up, either against a local server on our gigabit LAN, or to more than 1000 connections, or to connections that did anything more than service a response. I didn't have real-time servicing constraints (the opposite of games), but I believe most of the time was dominated by connection management and bandwidth-induced latency rather than by reading and writing the sockets. It is subjective, but I felt this would scale upwards (3000 connections) without problem. I only tried this on Linux, so I cannot comment on the completion ports side of things. I avoided any use of threads. I did use a now-orphaned C++ library for coroutines (mostly to satisfy my curiosity), but this doesn't affect the basic networking library's operation.

It took me only a couple of days to code this test. My advice would be to write some tests using an existing library (Asio or something else) that holds out the promise of robust performance and offers a bit more abstraction than the raw completion ports API. The advantages I see are twofold: 1. an existing library will hopefully be using the OS API (IOCP, epoll, etc.) correctly and efficiently, at least for default usage (for various values of "default"), and this will enable you to attain some baseline real-world benchmarking; 2. you can far more quickly develop code to do performance testing that addresses your specific needs.
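To make that concrete, the following is roughly the shape such an Asio test client takes (a quick untested sketch, not my actual code; the address, port, and connection count are placeholders) - one io_service, many asynchronous connections, no explicit threads:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <vector>

using boost::asio::ip::tcp;

// One of N simultaneous test connections: connects, then repeatedly reads and
// discards whatever the server sends ("data churn" style).
class TestConnection
{
public:
    TestConnection(boost::asio::io_service & io, const tcp::endpoint & endpoint)
        : socket_(io)
    {
        socket_.async_connect(endpoint,
            boost::bind(&TestConnection::OnConnect, this, boost::asio::placeholders::error));
    }

private:
    void OnConnect(const boost::system::error_code & error)
    {
        if(!error) PostRead();
    }

    void PostRead()
    {
        socket_.async_read_some(boost::asio::buffer(buffer_),
            boost::bind(&TestConnection::OnRead, this, boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
    }

    void OnRead(const boost::system::error_code & error, std::size_t /*bytes*/)
    {
        if(!error) PostRead(); // discard the data and keep reading
    }

    tcp::socket socket_;
    char buffer_[512];
};

int main()
{
    boost::asio::io_service io;

    // Placeholder address/port; point this at whatever server is being tested.
    tcp::endpoint endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 15779);

    std::vector<TestConnection *> connections;
    for(int i = 0; i < 1000; ++i)
        connections.push_back(new TestConnection(io, endpoint));

    io.run(); // single-threaded event loop; IOCP/epoll are used under the hood

    for(std::size_t i = 0; i < connections.size(); ++i)
        delete connections[i];
    return 0;
}

Everything runs inside io_service::run() on the calling thread; Asio dispatches to IOCP on Windows and epoll on Linux without the calling code changing.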

#3 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 29 March 2009 - 08:30 PM

Thanks for your reply, chairthrower. Let me add some more details, since I've posted about the IOCP-related stuff. I found a few serious flaws in some extra code I had added that were definitely hurting the performance of the IOCP code. In particular, I had a critical section and an STL map being accessed on each 'event', which explained why I had such performance issues. I took both of those out, fixed the code to work the way IOCP code is supposed to be designed, and noticed significantly better performance. Furthermore, I had been testing the server (which runs on my desktop) using my desktop, so I think that too led to some significant performance degradation I was overlooking.

That leads me to explaining my various tests for my code, which I know is part of the problem since I don't know how to set up real-world tests that simulate expected behaviors. Originally, I was simply spawning Telnet sessions to test connections only, but I hit a wall on Windows resources; it doesn't seem to like several hundred windows open at once very much [lol]. So I wrote a simple IOCP test client program that I would launch, passing in how many connections to create. That worked great and got past all the problems I had with Telnet. The next step was to add client data sending to the server in the test program, so I added in some simple send logic. My original expectations were to handle 1000 clients, so I created 1000 connections and was sending 512 bytes each 100ms.

Ouch. Windows really didn't like that test. The server "worked" fine, but there was such a delay between the read processing times of the requests that it was impractical. I.e. the server was not able to churn through all of the data I was spamming it with, which was mostly due to my router taking that hit and then propagating it to my desktop since I was doing LAN testing. Lesson learned.

I scaled down the amount of data and the frequency at which I was sending it, but then I hit the next problem: I was sending a lot of data at once in a synchronized fashion. Since I was conserving resources on Windows, I simply created all the sockets in a loop, sent in a loop, and slept, but that was equivalent to several hundred sends being executed per second with a decent amount of data, which pretty much resulted in the same behavior as before. I tried adding variable sleeps, but then I was serializing the sends, which is not real world.

Finally, I wrote a second IOCP client program that simply used CreateProcess with variable Sleeps to launch the first IOCP client testing program with one connection, to simulate more "real world" conditions. So far that seems to be the "right" thing to do, as I can test the server and see results. I'm using a combination of Task Manager, Process Explorer, and NetMeter just to watch traffic and resource usage and make sure nothing is going weird on me. I think I'm set on using that as a testing strategy.

The real problem, though, is figuring out how to simulate traffic for various types of games without having any numbers to work with. That's the real problem I'm having in coming up with this "testing" stuff. I can't use a web server test with static content, since my goal is games. I guess I'll have to scour the net for the average bytes per second that various genres of games send, to adequately test my code and make sure it's right.

I'll describe the tests as I am running them right now to give an idea of what I'm looking at. Perhaps I'm just doing the tests all wrong, which is why I think my code is not right when in fact it's the tests that are flawed. I really think that is the case, but until someone corrects me, I'm at a disadvantage.

Testing benchmark: track the times between 'read' events on each socket. If the elapsed time goes above some threshold (10s in this case), the test fails. The reason for this approach is that it shows whether the server can keep up with the amount of data the clients are sending it. These tests are more "performance over-estimators", but that's fine with me. The test mode is simply "data churn": the server processes the IOCP events and discards the data; no processing is done on it. All data sent by the clients is randomly generated as well (to make sure no caching or other filtering is done).
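For reference, the benchmark itself is nothing fancier than a per-connection timestamp check along these lines (a simplified sketch, not the actual server code; the 10s value is the threshold mentioned above):

#include <windows.h>
#include <vector>
#include <cstdio>

// Each connection stores the tick count of its last completed read.
struct ConnectionTimes
{
    DWORD dwLastReadTime; // set to GetTickCount() whenever a read completes
};

const DWORD READ_TIMEOUT_MS = 10000; // the 10 second failure threshold

// Called periodically (e.g. from a watchdog/scavenger pass) over all live connections.
// Returns false if any connection has gone longer than the threshold without a read.
bool CheckReadTimes(const std::vector<ConnectionTimes> & connections)
{
    DWORD now = GetTickCount();
    bool passed = true;
    for(std::size_t i = 0; i < connections.size(); ++i)
    {
        DWORD elapsed = now - connections[i].dwLastReadTime; // unsigned math handles wraparound
        if(elapsed > READ_TIMEOUT_MS)
        {
            printf("Connection %u: %lu ms since last read - test failed.\n", (unsigned)i, elapsed);
            passed = false;
        }
    }
    return passed;
}

int main()
{
    std::vector<ConnectionTimes> connections(2);
    connections[0].dwLastReadTime = GetTickCount();         // just read
    connections[1].dwLastReadTime = GetTickCount() - 15000; // stale for 15s
    printf("Benchmark %s\n", CheckReadTimes(connections) ? "passed" : "failed");
    return 0;
}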

In all tests, CPU usage rarely went above 1-2%, and then only once every few minutes. Total CPU time was really negligible: after hours of testing various methods, the total CPU time was only at 15s, so that is definitely a plus.

Select test results:
* 1000 x connections sending 32 bytes a second - failed; the time between reads grew above 10 seconds. When the client count hit around 600, noticeable delays were reported in the read processing and clients began auto-dropping as designed. The system would have self-corrected down to around 600 or so connections, I believe.

* 500 x connections sending 32 bytes a second - succeeded; the time between reads stayed under 5s, and connections only failed after the power-saving features on my laptop happened to kick in while I wasn't watching.

* 1000 x connections sending 32 bytes every 2 seconds - succeeded; the time between reads stayed under 5s. The server was pulling in about 47KiB/s on average according to NetMeter. Each set of 500 clients was uploading at half that rate, as expected.

With the new changes in the code and having learned how to set up more realistic tests, I feel much better about the IOCP code. However, I know I'm too inexperienced in the other areas I'm writing about in this thread to just "trial and error it", since I won't know the wrongs from the rights. That's basically what I'm after.

Bah, yet another long post, and this is probably the easiest portion of the 4 areas I'm trying to get into. I'll just stop here; I could see myself writing a bit more.

#4 Washu   Senior Moderators   -  Reputation: 6268


Posted 29 March 2009 - 09:06 PM

First off, try and reduce the number of synchronizations to zero. Things like heap allocation management (new/delete) will quickly bog you down as each acquires and releases a lock.

Also, try attaching a profiler and seeing where you're spending the majority of your time. One critical thing to understand is that Windows XP/Vista does not behave the same as, say, Windows Server 2003. A prime example is the thread quantum, which on Windows XP is set to 2 and on Windows Server 2003 is set to 12. In other words, a Windows XP thread gets around 20-30ms of processor time, while a Server 2003 thread gets around 120-180ms of processor time. Note that you can change your PC to use long quantums, but doing so will impact the general performance of foreground applications such as VS.

Check memory usage too, if you're paging frequently then you'll find things will slow down significantly. There are ways to manage this, such as maintaining locality of data. Try and keep everything relevant to an event nearby, rather than scattered all over the place.

See how you pass around messages and process them. Avoid large loops or other similar structures that take a while to run through. Something as simple as finding all players within a certain radius of a point can be quite time consuming unless you use a proper data structure to reduce the amount of searching you do.
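As a rough illustration of what I mean by a proper data structure (only a sketch, not tuned code), a uniform grid turns a radius query from a scan over every player into a scan over a handful of cells:

#include <map>
#include <vector>
#include <cmath>
#include <utility>

struct Player { float x, y; };

// Players are bucketed by cell; a query only scans the cells overlapping the radius.
class UniformGrid
{
public:
    explicit UniformGrid(float cellSize) : cellSize_(cellSize) {}

    void Insert(Player * p)
    {
        cells_[Key(p->x, p->y)].push_back(p);
    }

    // Collect every player within 'radius' of (x, y).
    void Query(float x, float y, float radius, std::vector<Player *> & out) const
    {
        int minX = (int)std::floor((x - radius) / cellSize_);
        int maxX = (int)std::floor((x + radius) / cellSize_);
        int minY = (int)std::floor((y - radius) / cellSize_);
        int maxY = (int)std::floor((y + radius) / cellSize_);
        for(int cx = minX; cx <= maxX; ++cx)
        {
            for(int cy = minY; cy <= maxY; ++cy)
            {
                std::map<std::pair<int, int>, std::vector<Player *> >::const_iterator it =
                    cells_.find(std::make_pair(cx, cy));
                if(it == cells_.end()) continue;
                for(std::size_t i = 0; i < it->second.size(); ++i)
                {
                    Player * p = it->second[i];
                    float dx = p->x - x, dy = p->y - y;
                    if(dx * dx + dy * dy <= radius * radius) out.push_back(p);
                }
            }
        }
    }

private:
    std::pair<int, int> Key(float x, float y) const
    {
        return std::make_pair((int)std::floor(x / cellSize_), (int)std::floor(y / cellSize_));
    }

    float cellSize_;
    std::map<std::pair<int, int>, std::vector<Player *> > cells_;
};

Bucket players by cell as they move, and each "who is near this point" question only touches the cells the radius overlaps, plus a cheap distance check per candidate.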

32 bytes per second per client is not that realistic of a number unless you're looking at a mostly event driven RPG of some kind. Eve-Online can get away with that (or less) in many cases, but once combat begins, you can expect surges of packets as players (or macros) attempt to perform operations fairly rapidly. Also, chat text is generally longer than 32 bytes, and may appear at any time (generally).

Avoid using sleep or other methods to surrender your time-slice. Design your system away from calls like that, as long lasting completion calls tend to bog down the system. Look at using lock free containers if you need something like tasklet message passing.
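For example, Windows already ships an interlocked singly-linked list (the SList API) that works as a simple lock-free hand-off between IOCP worker threads and a processing thread. A minimal sketch follows (the message layout here is made up for illustration, and note that an SList pops in LIFO order, so it behaves as a stack rather than a strict queue):

#include <windows.h>
#include <malloc.h>
#include <cstdio>

struct tMessage
{
    SLIST_ENTRY entry; // first member, so the entry address is the aligned allocation address
    DWORD connectionId;
    DWORD byteCount;
    BYTE data[256];
};

SLIST_HEADER g_messageStack;

tMessage * AllocateMessage()
{
    // SLIST entries must sit on MEMORY_ALLOCATION_ALIGNMENT boundaries.
    return (tMessage *)_aligned_malloc(sizeof(tMessage), MEMORY_ALLOCATION_ALIGNMENT);
}

// Producer side: an IOCP worker thread pushes a completed read without taking a lock.
void PushMessage(tMessage * msg)
{
    InterlockedPushEntrySList(&g_messageStack, &msg->entry);
}

// Consumer side: the processing thread pops messages (LIFO order) without a lock.
tMessage * PopMessage()
{
    SLIST_ENTRY * node = InterlockedPopEntrySList(&g_messageStack);
    return node ? CONTAINING_RECORD(node, tMessage, entry) : 0;
}

int main()
{
    InitializeSListHead(&g_messageStack);

    tMessage * msg = AllocateMessage();
    if(msg == 0) return 1;
    msg->connectionId = 1;
    msg->byteCount = 0;
    PushMessage(msg);

    tMessage * popped = PopMessage();
    if(popped != 0)
    {
        printf("Popped message for connection %lu\n", popped->connectionId);
        _aligned_free(popped);
    }
    return 0;
}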

#5 Antheus   Members   -  Reputation: 2397


Posted 30 March 2009 - 02:03 AM

Quote:
Original post by Drew_Benton
My original expectations were to handle 1000 clients, so I created 1000 connections and was sending 512 bytes each 100ms.

Ouch. Windows really didn't like that test. The server "worked" fine, but there was such a delay between the read processing times of the requests that it was impractical. I.e. the server was not able to churn through all of the data i was spamming it with, which was mostly due to my router taking that hit and then propagating to my desktop since I was doing LAN testing. Lesson learned.


That's still under 100Mbit (1000 clients x 512 bytes every 100ms is about 5MB/s, or roughly 41Mbit/s), so it shouldn't be a problem.

What probably happened is TCP throttling kicking in, causing a cascading failure. On a LAN, and with such simulated tests, I ran into the problem of sending being done too fast and too accurately. Clients would send data every 100ms with such accuracy, and in such bursts, that they'd flood the network buffers, resulting in packet loss. Under TCP, packet loss kills throughput. For the test, changing the send time to 50+rand(100) ms fixed the issue.

How are you throttling the sends? Did you try increasing socket receive buffer?
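For reference, bumping the receive buffer is just a setsockopt call made before the heavy traffic starts; something along these lines (the 64KB figure is only an example value):

#include <winsock2.h>
#include <cstdio>

#pragma comment(lib, "ws2_32.lib")

// Enlarge a socket's receive buffer; call right after the socket is created.
bool SetReceiveBuffer(SOCKET s, int bytes)
{
    if(setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&bytes, sizeof(bytes)) == SOCKET_ERROR)
    {
        printf("setsockopt(SO_RCVBUF) failed: %d\n", WSAGetLastError());
        return false;
    }

    // Read it back; the OS may round or clamp the requested size.
    int actual = 0;
    int length = sizeof(actual);
    if(getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&actual, &length) == 0)
        printf("Receive buffer is now %d bytes.\n", actual);
    return true;
}

// Usage: SetReceiveBuffer(socket_, 64 * 1024);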

#6 hplus0603   Moderators   -  Reputation: 6024


Posted 30 March 2009 - 04:53 AM

First: That's an awesome list of quotes! I'm flattered :-)

Getting a service time of 5 seconds for each client at 500 clients with 32 bytes per second sounds too long, unless your machine is an old single-core machine with too little memory. Or if you're running the test clients on the same machine as the server, that would also severely impact the numbers.

Just do the math: if you get one read every 5 seconds, for 500 clients, that means 100 clients per second, or 20 milliseconds per client if you're using a dual-core server. What on Earth are you doing for 20 milliseconds when you're processing 32 bytes of data!? That sounds like the time needed to serve several page faults. Are you running out of physical memory?

When it comes to designing the architecture, I think you need to think carefully about the kind of game you're building. A Virtual World is different from an RPG. An RPG is different from an FPS. Generally, VWs are the most expensive per player/entity, because you have full physics and can't optimize for a lot of static cases; FPSes are in the middle, because they still have physics but a static/optimized environment; and RPGs are the cheapest because they have very rudimentary physics, if any at all.

For an RPG, it's mostly about replicating changes in properties on entities, making sure that rules are fair, and that you don't start losing hitpoints to some monster which you can't even see yet on your machine.
For an FPS, you typically get all of the players and quick updates on position/velocity, but there really isn't much in the way of game rules or property updates.
For a Virtual World, you have both problems: properties for user-customizable objects AND physics updates. Additionally, you have the problem that most content will be user-generated, so introducing a new entity isn't simply "add a new instance of entity type 53"; you often have to download models, textures, and other large assets to be able to show the entity.

However, I think this thread is too sprawling. Let's focus on one thing per thread :-)
I'd be interested in seeing what Intel VTune or a similar tool says about the CPU usage on your server when you're getting 20 ms per client processing times. 20 ms on a 2 GHz machine is 40,000,000 instructions. Wooah! You can probably ray-trace a sphere-over-checkerboard scene in that amount of computation :-)


#7 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 30 March 2009 - 06:37 AM

Thanks a lot for the replies Washu and Antheus.

Quote:
First off, try and reduce the number of synchronizations to zero. Things like heap allocation management (new/delete) will quickly bog you down as each acquires and releases a lock.


This was one of the first mistakes I made at the time of the post. I had not only a critical section but also a map I was accessing in debug mode. I removed both of those, since they were not needed, as well as a second critical section I was using to manage the connection pool (list). I changed my design to not allow any more connections to be added to the pool once the server is running, which is fine for me. As of right now, I don't think I have any additional synchronizations in the code. I'll keep that in mind for the future as I make improvements to the code.

Quote:
Also, try attaching a profiler and seeing where you're spending the majority of your time. One critical thing to understand is that Windows XP/Vista does not behave the same as, say, Windows Server 2003. A prime example is the thread quantum, which on Windows XP is set to 2 and on Windows Server 2003 is set to 12. In other words, a Windows XP thread gets around 20-30ms of processor time, while a Server 2003 thread gets around 120-180ms of processor time. Note that you can change your PC to use long quantums, but doing so will impact the general performance of foreground applications such as VS.


Thanks for pointing out those numbers for the two OSes. I was thinking a real server would have a better network card than I have, but I wasn't considering the software implications. I'll spend some time today checking out profilers, as I've not done any profiling yet, just looking at the results I'm generating. I'm not trying to optimize everything before having profiled; I'm just trying to understand the results I'm getting, so I don't feel too bad about not having profiled yet. Those numbers do help a lot in understanding why my testing setup is a bit inadequate for this type of code.

Quote:
Check memory usage too, if you're paging frequently then you'll find things will slow down significantly. There are ways to manage this, such as maintaining locality of data. Try and keep everything relevant to an event nearby, rather than scattered all over the place.


As connections are accepted by the server (I ran 700 total), the Page Fault Delta was between 1-9, with no real average except that it sat more on the low side than the high most of the time. As soon as the connections were done accepting, the page fault delta remained at 0. From server start to before accepting the first connection, it generated a few thousand page faults, but since that was only on startup I assume that's 'ok'.

I should also mention that the design I'm using is based on preallocating the maximum number of connection resources at startup. Once the server is running, the only real resource usage has to do with sockets being closed and recreated for reuse and the new IOCP association on the new socket. The network connection data is never deallocated and reallocated; it's simply reused.
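In sketch form the idea is simply this (a simplified illustration of the preallocate-and-reuse design, not lifted from the actual code further down in this post; it assumes WSAStartup has already been called):

#include <winsock2.h>
#include <vector>

#pragma comment(lib, "ws2_32.lib")

struct ConnectionSlot
{
    SOCKET socket_;
    char recvBuffer[4096]; // fixed per-connection buffer, never freed while running

    ConnectionSlot() : socket_(INVALID_SOCKET) {}
};

class ConnectionPool
{
public:
    explicit ConnectionPool(std::size_t maxConnections)
        : slots_(maxConnections) // one allocation at startup, nothing after
    {
    }

    // Recycle a slot: close the old socket and create a fresh one (ready for a new
    // AcceptEx/IOCP association), leaving the slot's memory untouched.
    bool Recycle(ConnectionSlot & slot)
    {
        if(slot.socket_ != INVALID_SOCKET)
            closesocket(slot.socket_);
        slot.socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        return slot.socket_ != INVALID_SOCKET;
    }

private:
    std::vector<ConnectionSlot> slots_;
};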

Quote:
See how you pass around messages and process them. Avoid large loops or other similar structures that take a while to run through. Something as simple as finding all players within a certain radius of a point can be quite time consuming unless you use a proper data structure to reduce the amount of searching you do.


I'll keep this in mind for when I add more game-related code. Originally I was using Boost's Circular Buffer, Singleton Pool, and Windows APCs to pass raw network data into a single network-processing thread that did all the work. The reason for that was to keep the networking system doing only network event handling and not have the worker threads do any data processing themselves (which I was reading is something to avoid).

Quote:
32 bytes per second per client is not that realistic of a number unless you're looking at a mostly event driven RPG of some kind. Eve-Online can get away with that (or less) in many cases, but once combat begins, you can expect surges of packets as players (or macros) attempt to perform operations fairly rapidly. Also, chat text is generally longer than 32 bytes, and may appear at any time (generally).


Good point; I'll write a few more tests that send more data at shorter intervals to try and stress those conditions. I don't actually have a target game in mind, but I'd like to be able to say: you can expect acceptable results if you are handling X players sending, on average, Y bytes every Z time interval. I want to understand how to test such code and be able to explain the results in a meaningful way, as opposed to how it's mostly done nowadays, without context. For example, Torque3D claims up to 1000+ players per server, and I know that's more of a marketing gimmick as it has no context.

Quote:
Avoid using sleep or other methods to surrender your time-slice. Design your system away from calls like that, as long lasting completion calls tend to bog down the system. Look at using lock free containers if you need something like tasklet message passing.


I'll definitely avoid Sleep. Right now I do not have any such calls in the server. I do not have any other functions that "wait" aside from the GetQueuedCompletionStatus in each worker thread and a WaitForSingleObject in the main thread that lets me run a console server. I had been looking into lock-free containers, but only briefly. I started with ApochPiQ's thread "Double-check my lock-free voodoo" and ventured into Win32 Asynchronous Procedure Calls. For this testing though, I went ahead and removed all that stuff and only did data churning.

Quote:
Original post by Antheus
How are you throttling the sends? Did you try increasing socket receive buffer


I experienced that throttling and then the cascading failure as the clients obliterated the network with my traffic (trying to send 512 bytes each 100ms). I was originally running a couple of exes that each created a whole bunch of connections, but that resulted in the bursting you mentioned. At the time I had also tried adding the random Sleep between sends, but with a high number of connections it just didn't seem to work like it should.

I switched over to simply creating one process per connection on two laptops via CreateProcess (to cut back on the resources used compared to launching via a batch script), and that seemed to give me a more "natural" testing case.

I do understand the problems of having 500 processes per PC all trying to do nothing but send network data continuously, but since it's all on the LAN, I was thinking it should work out well since the data does not have to travel far. This is the complete testing code for reference; it's pretty simple:

#include <winsock2.h>
#include <windows.h>
#include <conio.h>
#include <stdio.h>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

bool bExit = false;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
switch (ConsoleEvent)
{
case CTRL_LOGOFF_EVENT:
case CTRL_C_EVENT:
case CTRL_BREAK_EVENT:
case CTRL_CLOSE_EVENT:
case CTRL_SHUTDOWN_EVENT:
{
bExit = true;
return TRUE;
}
}
return FALSE;
}

int main(int argc, char * argv[])
{
srand(GetTickCount());

int connectionCount = 1;
if(argc > 1) // Limit connections to [1, 4096]
{
connectionCount = atoi(argv[1]);
if(connectionCount < 1) connectionCount = 1;
if(connectionCount > 4096) connectionCount = 4096;
}

sockaddr_in server;
server.sin_addr.s_addr = inet_addr(""); // hard coded my internet IP so I can run on another network too
server.sin_family = AF_INET;
server.sin_port = htons((u_short)15779); // hard code port

WSADATA wsadata = {0};
WSAStartup(MAKEWORD(2,2), &wsadata);

SetConsoleCtrlHandler(ConsoleHandler, TRUE);

SOCKET * sockets = new SOCKET[connectionCount];
for(int x = 0; x < connectionCount; ++x)
{
sockets[x] = INVALID_SOCKET;
}

for(int x = 0; x < connectionCount && !bExit; ++x)
{
sockets[x] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(sockets[x] != INVALID_SOCKET)
{
if(connect(sockets[x], (struct sockaddr*)&server, sizeof(server)))
{
closesocket(sockets[x]);
sockets[x] = INVALID_SOCKET;
printf("Could not establish a connection on socket %i / %i\n", x + 1, connectionCount);
continue; // Don't report "Adding connection" for a failed connect
}
}
else
{
printf("Could not create socket %i / %i\n", x + 1, connectionCount);
continue;
}
printf("Adding connection %i / %i\n", x + 1, connectionCount);
//Sleep(50); // Delay a little so we do not flood the AcceptEx calls on the server
// This was taken out to further stress test the server in processing many
// AcceptExs at once; it seems to be able to handle it just fine.
}

const int MAX_SEND_BYTES = 32;
char buffer[MAX_SEND_BYTES] = {0};
while(!bExit)
{
// Even though I've coded for multiple connections, I am only running
// one connection per program
for(int x = 0; x < connectionCount && !bExit; ++x)
{
if(sockets[x] != INVALID_SOCKET)
{
int sendBytes = MAX_SEND_BYTES; // Sometimes made this 'rand() % ' to simulate different actions
for(int q = 0; q < sendBytes; ++q) // Randomize data to make it look more legitimate across network
buffer[q] = rand() % 256;
int result = send(sockets[x], buffer, sendBytes, 0);
if(result == -1) { printf("Socket %i failed to send.\n", x); bExit = true; }
//Sleep(100 + rand() % 100); // Originally I was trying to slow down sends this way, but
// that resulted in a serial behavior and not a concurrent send behavior.
}
}
Sleep(1000); // This is the sleep I use now between sends
}

for(int x = 0; x < connectionCount; ++x)
{
if(sockets[x] != INVALID_SOCKET)
{
shutdown(sockets[x], SD_BOTH);
closesocket(sockets[x]);
sockets[x] = INVALID_SOCKET;
}
printf("Removing connection %i / %i\n", x + 1, connectionCount);
}
delete [] sockets;

WSACleanup();

SetConsoleCtrlHandler(ConsoleHandler, FALSE);

return 0;
}



In my previous post I also mentioned a second testing program. Here is that one I use to launch the previous program to simulate load:

#include <windows.h>
#include <stdio.h>

int main(int argc, char * argv[])
{
srand(GetTickCount());
int count = 1;
if(argc >= 2)
{
count = atoi(argv[1]);
if(count < 1) count = 1;
}
char curDir[MAX_PATH + 1] = {0};
GetCurrentDirectoryA(MAX_PATH, curDir);
printf("Creating %i processes.\n", count);
for(int x = 0; x < count; ++x)
{
STARTUPINFOA si = {0};
PROCESS_INFORMATION pi = {0};
si.cb = sizeof(STARTUPINFO);
CreateProcessA(0, "IocpTestClient.exe", 0, NULL, FALSE, 0, NULL, curDir, &si, &pi);
Sleep(rand() % 500);
}
return 0;
}




The second program lets me pump out new connections while the connections already launched are sending data to the server, so I get a test case of "the server just opened, the gamers are flooding in to play". I just run a batch file that passes the desired program count down the line (I'm only spawning 1 connection per program launched). Even on my dual-core laptops (one XP via ethernet, one 64-bit Vista via wireless) it seems to handle the process load fine. I have run into some slowdowns from Nod32 and SpyBot checking data, so I try to turn those off at times, but sometimes I just leave them on.

As for the receive buffers on the server, I've not touched the internal TCP buffers; I just left them at their defaults. On my machine, 8KB is reported by getsockopt for both. My actual receiving buffer that I pass to WSARecv is 4KB. I didn't try making it 8KB simply because I didn't think it'd help, since I'm only sending out 32b/s now and 320b/s in the older saturating test.

Based on what you see here, is my test setup "alright"? I know the results will be somewhat skewed since it's all on the local network, but I viewed that as a perfect test case of assuming everyone on the server had low latency to the server and was able to pump out a decent amount of data at once.

Quote:
Original post by hplus0603
First: That's an awesome list of quotes! I'm flattered :-)


Saying "thanks" isn't enough for all the advice you've given people over the years, but thanks! Likewise to Washu and Antheus, whom have contributed greatly to this section with their real world experiences. I actually added all those quotes to have them in one thread to refer people to in the future when they ask me about specifics in this area. But anyways,

Quote:
Getting a service time of 5 seconds for each client at 500 clients with 32 bytes per second sounds too long, unless your machine is an old single-core machine with too little memory. Or if you're running the test clients on the same machine as the server, that would also severely impact the numbers.


I thought the same thing. As mentioned above (I append my replies as I write them, so I know you've not had a chance to read what I hadn't yet posted), I was indeed running the test clients on the same machine as the server, bad me, I know. I moved them over to the laptops and noticed better performance. That is, after fixing some serious mistakes in my code that I had added before making the post and forgot to remove.

Quote:
Just do the math: if you get one read every 5 seconds, for 500 clients, that means 100 clients per second, or 20 milliseconds per client if you're using a dual-core server. What on Earth are you doing for 20 milliseconds when you're processing 32 bytes of data!? That sounds like the time needed to serve several page faults. Are you running out of physical memory?


That's the funny thing about it, and I can't figure it out. After moving the test clients to the laptops, I can get to about 600 before the long service times kick in; 500 stay under 5s processing. I know because I have code that checks whether the time between reads is over 50% of the timeout, and it only triggers once I start approaching 600+ (spread across the 2 laptops, not all running on one). Memory usage stays at ~13MB, which is how much space is needed to set up the server to handle 1500 connections. Each connection structure is pretty light, only 112 bytes, but each has a 4KB send and recv buffer for data processing, which runs up the total. The page fault delta, as mentioned in my response to Washu, is also constantly 0 (well, as reported by Process Explorer in the Virtual Memory section). The only time page faults happen, it seems, is when I'm accepting connections.

I don't think I'm over-stressing my network. I'm on an 8/2 Comcast cable plan and have a decent router that has yet to fail on me (the 60->600 one featured on Lifehacker). I can upload 300-350kb/s on average, and downloads usually cap out around 2MB/s depending on the server. I am still running all those clients across 2 dual-core laptops, so perhaps the problem is that I'm just over-taxing them? I don't think I am: watching Task Manager, the CPUs never max out while creating all the processes, and they never get fully utilized once the clients are all in their send loops. In fact, CPU usage drops to 2-4% total when running 500 clients on that laptop while the service time stays under 5s (I'm not sure of the exact values, as I've not made a visual client that reports the data, only consoles, which scale better).

This desktop computer is of server grade; it's just running XP. Quad-core Xeon 3350 @ 2.66GHz (it can go to 3.0 stably) and 3.5GB of RAM usable by Windows (8GB actually installed). The RAM is running at 400MHz with 5-5-5-15 timings. The motherboard is an X38-DS4. I know the system is under-utilized with XP, but due to other development reasons I've had to stay on 32-bit Windows through the end of 2008. I will probably upgrade to 64-bit Windows 7 in the next couple of months when the RC comes out.

Quote:
However, I think this thread is too sprawling. Let's focus on one thing per thread :-)
I'd be interested in seeing what Intel VTune or a similar tool says about the CPU usage on your server when you're getting 20 ms per client processing times. 20 ms on a 2 GHz machine is 40,000,000 instructions. Wooah! You can probably ray-trace a sphere-over-checkerboard scene in that amount of computation :-)


I agree, I stuffed too much into it. I'll edit the title to reflect the IOCP-related stuff and save the rest for later. I'll get VTune soon and post those results after getting it set up. Intel has a 30-day trial I'll check out.

In addition, I'll go ahead and just post the code, since I'm beginning to think it's my testing setup that is the problem now rather than the code itself. I added in the references used in the code; it is pretty much minimal and all self-contained in one file. I chose to go with the 0-byte posted recv to cater towards more connections, although after learning the natural connection limit, I might switch to the other method as discussed in Network Programming for Microsoft Windows.

I'm not asking anyone to waste their time checking my code, but if you are interested and see anything that just looks 'wrong', I'd love to know. I've been following the advice presented in the Network Programming book, as well as consulting other resources and threads on how it should be done, and I didn't see anything that looked bad, but I am still new to all this.


/*
A lot of resources were consulted and used in this code. Major resources
used include:
MSDN
http://win32.mvps.org/network/sockhim.html
Network Programming for Microsoft Windows
CodeProject's IOCP articles
http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx
http://www.codeproject.com/KB/IP/iocp.aspx
Larger blocks of comments are mostly from the second reference.
I used comments from that project to help understand the particulars
of IOCP.
*/


#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <cstdio>  // printf
#include <cstring> // memset, memcpy, strlen
#include <list>
#include <map>
#include <vector>
#include <algorithm>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

// Logical states for the overlapped structure
const int HPS_CONNECTION_STATE_CLOSED = 0;
const int HPS_CONNECTION_STATE_ACCEPT = 1;
const int HPS_CONNECTION_STATE_READ = 2;
const int HPS_CONNECTION_STATE_WRITE = 3;

// Max bytes for the recv buffer
const int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 4096;

// Max bytes for the send buffer
const int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 4096;

// The size of the sockaddr_in parameter
const int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);

struct tHighPerformanceServerData;
struct tWorkerThreadData;
struct tWorkerThreadWrapperData
{
tHighPerformanceServerData * serverData;
tWorkerThreadData * threadData;
};

struct tConnectionGlobalData
{
LPFN_ACCEPTEX lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
SOCKET listenSocket;
HANDLE hCompletionPort;
DWORD dwNumberOfConcurrentThreads;
DWORD dwReadTimeTimeout;
DWORD dwWriteTimeTimeout;
DWORD dwAcceptTimeTimeout;
int initialReceiveSize;
LONG * plUidBase;

tConnectionGlobalData() :
lpfnAcceptEx(NULL),
listenSocket(INVALID_SOCKET),
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(0),
lpfnGetAcceptExSockaddrs(NULL),
dwReadTimeTimeout(-1),
dwWriteTimeTimeout(-1),
dwAcceptTimeTimeout(5000),
plUidBase(0),
initialReceiveSize(0)
{
}
};

struct tConnectionData
{
public:
OVERLAPPED overlapped;

DWORD dwUid;

SOCKET socket_;

sockaddr_in address;

WORD sendBufferSize;

BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE];
BYTE sendBufferData[HPS_OVERLAPPED_BUFFER_SEND_SIZE];

INT connectionState;

sockaddr_in localPeer;
sockaddr_in remotePeer;

DWORD dwLastReadTime;
DWORD dwLastWriteTime;

tConnectionGlobalData * globalDataPtr;

public:
tConnectionData() :
socket_(INVALID_SOCKET),
connectionState(HPS_CONNECTION_STATE_CLOSED),
sendBufferSize(0),
dwLastReadTime(0),
dwLastWriteTime(0),
dwUid(-1),
globalDataPtr(0)
{
memset(&overlapped, 0, sizeof(overlapped));
memset(&address, 0, sizeof(address));
memset(&localPeer, 0, sizeof(localPeer));
memset(&remotePeer, 0, sizeof(remotePeer));
}

~tConnectionData()
{
}

bool Initialize()
{
int result = 0;
connectionState = HPS_CONNECTION_STATE_CLOSED;
memset(&overlapped, 0, sizeof(overlapped));
memset(&localPeer, 0, sizeof(localPeer));
memset(&remotePeer, 0, sizeof(remotePeer));

socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(socket_ == INVALID_SOCKET)
{
// TODO: Handle error
return false;
}

// Set the socket to non-blocking so we can post 0 byte recvs and handle more connections
u_long iMode = 1;
ioctlsocket(socket_, FIONBIO, &iMode);

DWORD numberOfBytes = 0; // Not used in this mode
if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE)
{
DWORD dwError = GetLastError();
if(dwError != ERROR_IO_PENDING)
{
closesocket(socket_);
socket_ = INVALID_SOCKET;

// TODO: Handle error
return false;
}
}

// Update the state the connection is in
connectionState = HPS_CONNECTION_STATE_ACCEPT;

// Success
return true;
}

void Close(bool force, bool reuse)
{
if(socket_ != INVALID_SOCKET)
{
struct linger li = {0, 0};
if(force == true) // Default: SO_DONTLINGER
{
li.l_onoff = 1; // SO_LINGER, timeout = 0
}
setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));
closesocket(socket_);
socket_ = INVALID_SOCKET;
}
connectionState = HPS_CONNECTION_STATE_CLOSED;
if(reuse == true)
{
if(Initialize() == false)
{
// TODO: Handle error
__asm nop
}
}
}

void ProcessIO(DWORD numberOfBytes)
{
switch(connectionState)
{
case HPS_CONNECTION_STATE_CLOSED:
{
// We won't need to do anything since the scavenger thread
// will handle this for us.
__asm nop
} break;

case HPS_CONNECTION_STATE_ACCEPT:
{
sockaddr_in *plocal = 0;
sockaddr_in *premote = 0;
int locallen = 0;
int remotelen = 0;

globalDataPtr->lpfnGetAcceptExSockaddrs(recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, (sockaddr **)&plocal, &locallen, (sockaddr **)&premote, &remotelen);

memcpy(&localPeer, plocal, sizeof(sockaddr_in));
memcpy(&remotePeer, premote, sizeof(sockaddr_in));

setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket));

// we still need to associate the newly connected socket to our IOCP:
HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads);
if(hResult != globalDataPtr->hCompletionPort)
{
// TODO: Handle error
return;
}

//printf("Incoming Connection - [%s:%i] to [%s:%i]\n", inet_ntoa(remotePeer.sin_addr), ntohs(remotePeer.sin_port), inet_ntoa(localPeer.sin_addr), ntohs(localPeer.sin_port));

dwLastReadTime = dwLastWriteTime = GetTickCount();

if(globalDataPtr->initialReceiveSize == 0)
{
// if we get here, then the initial receive size must have been 0, therefore n must be 0, with nothing at all in our receive buffer.
// we can therefore blithely allow SendReply() to overwrite n (and other things).
dwUid = InterlockedIncrement(globalDataPtr->plUidBase);

_snprintf_s((char*)sendBufferData, HPS_OVERLAPPED_BUFFER_SEND_SIZE, HPS_OVERLAPPED_BUFFER_SEND_SIZE - 1, "Hello, world.\r\n");
sendBufferSize = 1 + strlen((char*)sendBufferData);
PostWrite();
}
else
{
// We received something during AcceptEx()
if(numberOfBytes != 0)
{
dwUid = InterlockedIncrement(globalDataPtr->plUidBase);
dwLastReadTime = GetTickCount();
//
// TODO: Process Data
//
PostRead();
return;
}

// we should never get here: if SEND_BANNER_FIRST was undefined,
// then AcceptEx() was told to receive at least one byte, and it
// would not have returned without that byte (unless the scavenger
// force-closed the socket, in which case the AcceptEx() result
// would have been an error, handled before ever calling DoIo().

// TODO: Handle error
__asm nop
}
} break;

case HPS_CONNECTION_STATE_READ:
{
int result = 0;
DWORD byteCount = 0;
DWORD recvFlags = 0;
WSABUF recvBufferDescriptor = {0};
while(result != -1) // Empty out the recv buffers
{
recvFlags = 0;
byteCount = 0;
recvBufferDescriptor.len = HPS_OVERLAPPED_BUFFER_RECV_SIZE - numberOfBytes;
recvBufferDescriptor.buf = (char*)(recvBufferData + numberOfBytes);
result = WSARecv(socket_, &recvBufferDescriptor, 1, &byteCount, &recvFlags, 0, 0);
if(byteCount == 0) break; // No more data to read
numberOfBytes += byteCount;
}
if(WSAGetLastError() != WSAEWOULDBLOCK)
{
// TODO: Handle error
__asm nop
}
dwLastReadTime = GetTickCount();
if(numberOfBytes == SOCKET_ERROR)
{
// TODO: Log error
Close(true, true);
return;
}
else if(numberOfBytes == 0) // connection closing?
{
// TODO: Log error
Close(false, true);
return;
}
//
// TODO: Process Data
//
PostRead();
} break;

case HPS_CONNECTION_STATE_WRITE:
{
dwLastWriteTime = GetTickCount();
if(numberOfBytes == SOCKET_ERROR)
{
// TODO: Log error
Close(true, true);
return;
}
else if(numberOfBytes == 0) // connection closing?
{
// TODO: Log error
Close(false, true);
return;
}
PostRead();
} break;
}
}

void PostRead()
{
connectionState = HPS_CONNECTION_STATE_READ;
WSABUF recvBufferDescriptor = {0};
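// Zero-byte receive: the WSABUF above stays empty (len = 0, buf = NULL), so this
// completion only signals that data has arrived; ProcessIO() then drains the socket
// with non-blocking WSARecv calls into recvBufferData.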
DWORD numberOfBytes = 0;
DWORD recvFlags = 0; // needed by WSARecv()
BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, 0);
if(result != SOCKET_ERROR) // everything is OK
{
return;
}
else
{
if(GetLastError() != ERROR_IO_PENDING)
{
// TODO: Handle error
Close(true, true);
}
else
{
// (else branch intentionally empty)
// if we get here, gle == ERROR_IO_PENDING, which is fine by me
}
}
}

void PostWrite() // TODO: Rework this logic
{
BOOL result;
DWORD numberOfBytes = 0;
WSABUF sendBufferDescriptor;
connectionState = HPS_CONNECTION_STATE_WRITE;
sendBufferDescriptor.len = sendBufferSize;
sendBufferDescriptor.buf = (char*)sendBufferData;
result = WSASend(socket_, &sendBufferDescriptor, 1, &numberOfBytes, 0, &overlapped, 0);
result = (result != SOCKET_ERROR); // WSASend() uses inverted logic wrt/WriteFile()
if(!result)
{
DWORD err = WSAGetLastError();
if(err != ERROR_IO_PENDING)
{
// TODO: Handle error
Close(true, true);

// the fall-through does nothing because the caller
// shouldn't post another read -- the reinitialized
// socket has an AcceptEx() pending
}
else
{
// (else branch intentionally empty)

// if we get here, gle == ERROR_IO_PENDING; nothing
// left to do but return. Caller loops back to GQCS().
}
}
else // WriteFile()
{
// the write completed immediately
// this doesn't bother us -- we will still
// get the completion packet
}
}
};


struct tWorkerThreadData
{
public:
HANDLE hThread;
DWORD dwThreadId;

public:
tWorkerThreadData() :
hThread(INVALID_HANDLE_VALUE),
dwThreadId(0)
{
}

~tWorkerThreadData()
{
}
};

struct tHighPerformanceServerData
{
public:
WORD wPort;

int backLog;

HANDLE hCompletionPort;

DWORD dwNumberOfConcurrentThreads;
DWORD dwNumberOfWorkerThreads;

LONG lRunningWorkerThreadCount;

SOCKET sListenSocket;

SOCKADDR_IN saInternetAddr;

GUID GuidAcceptEx;
LPFN_ACCEPTEX lpfnAcceptEx;

GUID GuidGetAcceptExSockaddrs;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

CRITICAL_SECTION workerThreadCS;
std::list<tWorkerThreadData *> workerThreads;

DWORD dwInitialConnectionPoolCount;
std::list<tConnectionData *> connectionPool;

HANDLE hScavengerThread;
DWORD dwScavengerThreadId;
DWORD dwScavengerDelay; // milliseconds between runs of the idle socket scavenger
HANDLE hScavengerExitEvent; // tells scavenger thread when to die

DWORD dwWorkerThreadScaleValue;

LONG lUidBase;

tConnectionGlobalData globalData;

public:
tHighPerformanceServerData() :
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(1),
dwNumberOfWorkerThreads(2),
lRunningWorkerThreadCount(0),
sListenSocket(INVALID_SOCKET),
wPort(0),
lpfnAcceptEx(NULL),
lpfnGetAcceptExSockaddrs(NULL),
dwInitialConnectionPoolCount(1500),
dwScavengerDelay(1000),
hScavengerExitEvent(NULL),
hScavengerThread(INVALID_HANDLE_VALUE),
dwScavengerThreadId(0),
dwWorkerThreadScaleValue(1),
lUidBase(0),
backLog(SOMAXCONN)
{
GUID guidAcceptEx = WSAID_ACCEPTEX;
memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx));

GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs));

InitializeCriticalSection(&workerThreadCS);
}

~tHighPerformanceServerData()
{
DeleteCriticalSection(&workerThreadCS);
}

int WorkerThread()
{
BOOL result = 0;
DWORD numberOfBytes = 0;
ULONG key = 0;
OVERLAPPED * lpOverlapped = 0;
InterlockedIncrement(&lRunningWorkerThreadCount);
while(true)
{
tConnectionData * connectionData = 0;
InterlockedDecrement(&lRunningWorkerThreadCount);
result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
if(key == -1)
{
break; // Time to exit the worker thread
}
connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped);
if(connectionData == 0)
{
// TODO: Handle error
continue;
}
InterlockedIncrement(&lRunningWorkerThreadCount);
if(result == TRUE)
{
// We have an I/O to process
connectionData->ProcessIO(numberOfBytes);
}
else
{
// Close this socket and make space for a new one if we are still listening
connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? false : true));
}
}
return 0;
}

int ScavengerThread()
{
while(true)
{
int count[4] = {0};
std::list<tConnectionData *>::iterator itr = connectionPool.begin();
while(itr != connectionPool.end())
{
if(WaitForSingleObject(hScavengerExitEvent, 0) != WAIT_TIMEOUT)
{
printf("ScavengerThread requested to quit during logic loop.\n");
break;
}

tConnectionData * connection = (*itr);
count[connection->connectionState]++;

// AcceptEx() called, but no completion yet
if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT)
{
// determine if the socket is connected
int seconds = 0;
int length = sizeof(seconds);
if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length))
{
if(seconds != -1)
{
seconds *= 1000;
if(seconds > (int)globalData.dwAcceptTimeTimeout)
{
printf("[%i][Accept] idle timeout after %i ms.\n", connection->dwUid, seconds);

// closesocket() here causes an immediate IOCP notification with an error indication;
// that will cause our worker thread to call Close().
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}

// No connection made on this socket yet
else if(seconds == -1)
{
// Nothing to do
}
}
}

// At times, a connection can fail and not be able to be reconnected due to connection
// flooding. This check will reclaim our connections that are locked in the closed state
// and never get a chance to recover.
else if(connection->connectionState == HPS_CONNECTION_STATE_CLOSED)
{
connection->Close(true, true);
}

// The client is in a read or write state, doesn't matter which. We want to make sure
// activity still exists as desired.
else
{
bool doClose = false;
DWORD tick = GetTickCount();
DWORD dwLastTime;

dwLastTime = tick - connection->dwLastReadTime;
if(dwLastTime > globalData.dwReadTimeTimeout)
{
printf("[%i][Read] idle timeout after %i ms.\n", connection->dwUid, dwLastTime);
doClose = true;
}
else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5))
{
printf("[%i][Read] %i ms\n", connection->dwUid, dwLastTime);
}

dwLastTime = tick - connection->dwLastWriteTime;
if(dwLastTime > globalData.dwWriteTimeTimeout)
{
printf("[%X][Write] idle timeout after %i ms.\n", connection->dwUid, dwLastTime);
doClose = true;
}
else if(dwLastTime > ((float)globalData.dwWriteTimeTimeout * .5))
{
printf("[%i][Write] %i ms\n", connection->dwUid, dwLastTime);
}

if(doClose)
{
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}
itr++;
}
printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]);

// Pause until next run due
DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay);
if(result != WAIT_TIMEOUT)
{
break;
}
}
return 0;
}

DWORD AddConnectionsToPool(long count)
{
// We cannot add more connections once the server has started
if(hScavengerThread != INVALID_HANDLE_VALUE)
{
return 0;
}
DWORD total = 0;
for(long index = 0; index < count; ++index)
{
tConnectionData * connection = new tConnectionData;
connection->globalDataPtr = &globalData;
bool result = connection->Initialize();
if(result == true)
{
connectionPool.push_back(connection);
total++;
}
else
{
// TODO: Handle error
delete connection;
}
}
return total;
}
};

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam)
{
tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam;
DWORD dwResult = data->serverData->WorkerThread();

LPCRITICAL_SECTION pCS = &data->serverData->workerThreadCS;

EnterCriticalSection(pCS);
std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin();
while(itr != data->serverData->workerThreads.end())
{
tWorkerThreadData * td = (*itr);
if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread)
{
printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId);
data->serverData->workerThreads.erase(itr);
break;
}
itr++;
}

delete data->threadData;
delete data;

LeaveCriticalSection(pCS);

return dwResult;
}

DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam)
{
return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();
}

bool InitializeWinsock()
{
WSADATA wd = { 0 };
if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
{
// TODO: Handle error
return false;
}
if(LOBYTE( wd.wVersion ) < 2)
{
WSACleanup();
// TODO: Handle error
return false;
}
return true;
}

void DeinitializeWinsock()
{
WSACleanup();
}


// Our high performance server :)
class cHighPerformanceServer
{
private:
tHighPerformanceServerData * internalData;

public:
cHighPerformanceServer()
{
internalData = new tHighPerformanceServerData;
}

~cHighPerformanceServer()
{
delete internalData;
}

bool Create(unsigned short port)
{
// Get the system information
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);

// Try to create an I/O completion port
internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads);
if(internalData->hCompletionPort == NULL)
{
// TODO: Log error
Destroy();
return false;
}

// Calculate how many worker threads we should create to process IOCP events
DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads;
if(internalData->dwNumberOfWorkerThreads == 0)
{
if(internalData->dwNumberOfConcurrentThreads == 0)
{
dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue;
}
else
{
dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue;
}
}

// Create the worker threads!
DWORD dwWorkerTotal = AddWorkerThreads(dwNumberOfWorkerThreads);
if(dwWorkerTotal != dwNumberOfWorkerThreads)
{
// TODO: Log error
Destroy();
return false;
}

internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(internalData->sListenSocket == INVALID_SOCKET)
{
// TODO: Log error
Destroy();
return false;
}

// Bind the socket to the port
internalData->wPort = port;
internalData->saInternetAddr.sin_family = AF_INET;
internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
internalData->saInternetAddr.sin_port = htons(internalData->wPort);
int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr));
if(bindResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

int listenResult = listen(internalData->sListenSocket, internalData->backLog);
if(listenResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

DWORD dwBytes = 0;
int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx,
sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

dwBytes = 0;
ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs,
sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

// Assign the global data for our connections
internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx;
internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs;
internalData->globalData.listenSocket = internalData->sListenSocket;
internalData->globalData.hCompletionPort = internalData->hCompletionPort;
internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads;
internalData->globalData.plUidBase = &internalData->lUidBase;
internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable
internalData->globalData.dwWriteTimeTimeout = -1; // TODO: Variable
internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable
internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx
// If we wanted to accept data from AcceptEx
//internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2);

DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount);
if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount)
{
// TODO: Log error
Destroy();
return false;
}

// Connect the listener socket to IOCP
if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0)
{
// TODO: Log error
Destroy();
return false;
}

internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0);
if(internalData->hScavengerExitEvent == NULL)
{
// TODO: Log error
Destroy();
return false;
}
internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId);
if(internalData->hScavengerThread == NULL)
{
// TODO: Log error
Destroy();
return false;
}
DWORD dwResult = ResumeThread(internalData->hScavengerThread);
if(dwResult == (DWORD)-1)
{
// TODO: Log error
Destroy();
__asm nop
}

// Success!
return true;
}

void Destroy()
{
if(internalData->hScavengerExitEvent != NULL)
{
SetEvent(internalData->hScavengerExitEvent);
if(internalData->hScavengerThread != INVALID_HANDLE_VALUE)
{
int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2);
if(result != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
CloseHandle(internalData->hScavengerThread);
internalData->hScavengerThread = INVALID_HANDLE_VALUE;
}
CloseHandle(internalData->hScavengerExitEvent);
internalData->hScavengerExitEvent = NULL;
}

if(internalData->sListenSocket != INVALID_SOCKET)
{
closesocket(internalData->sListenSocket);
internalData->sListenSocket = INVALID_SOCKET;
}

std::vector<HANDLE> workerThreadHandles;
std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin();
while(itr != internalData->workerThreads.end())
{
workerThreadHandles.push_back((*itr)->hThread);
itr++;
}

// Tell each worker thread to exit by posting a completion packet with key == -1
if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
EnterCriticalSection(&internalData->workerThreadCS);
size_t count = internalData->workerThreads.size();
for(size_t x = 0; x < count; ++x)
{
PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
}
LeaveCriticalSection(&internalData->workerThreadCS);
}

// Wait for all worker threads to close
for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS)
{
DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x);
DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000);
if(dwResult != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
}

if(internalData->workerThreads.size())
{
// TODO: Log error
printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size());
}

if(internalData->connectionPool.size())
{
std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin();
while(itr != internalData->connectionPool.end())
{
closesocket((*itr)->socket_);
delete (*itr);
itr++;
}
internalData->connectionPool.clear();
}

if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
CloseHandle(internalData->hCompletionPort);
internalData->hCompletionPort = INVALID_HANDLE_VALUE;
}
}

DWORD AddWorkerThreads(DWORD count)
{
DWORD total = 0;
for(DWORD index = 0; index < count; ++index)
{
tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData;
tWorkerThreadData * threadData = new tWorkerThreadData;
threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId);
if(threadData->hThread != NULL)
{
total++;
EnterCriticalSection(&internalData->workerThreadCS);
internalData->workerThreads.push_back(threadData);
LeaveCriticalSection(&internalData->workerThreadCS);

workerThreadData->serverData = internalData;
workerThreadData->threadData = threadData;

DWORD dwResult = ResumeThread(threadData->hThread);
if(dwResult == (DWORD)-1)
{
// TODO: Handle error
__asm nop
}
}
else
{
delete workerThreadData;
delete threadData;
}
}
return total;
}

void RemoveWorkerThreads(DWORD count)
{
EnterCriticalSection(&internalData->workerThreadCS);
count = min(count, internalData->workerThreads.size());
for(DWORD index = 0; index < count; ++index)
{
// Signal one worker to exit, whomever is unlucky enough to process this request dies
PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
}
LeaveCriticalSection(&internalData->workerThreadCS);
}
};

HANDLE exitEvent = 0;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
switch (ConsoleEvent)
{
case CTRL_LOGOFF_EVENT:
case CTRL_C_EVENT:
case CTRL_BREAK_EVENT:
case CTRL_CLOSE_EVENT:
case CTRL_SHUTDOWN_EVENT:
{
if(exitEvent != 0)
{
SetEvent(exitEvent);
return TRUE;
}
}
}
return FALSE;
}

int main(int argc, char * argv[])
{
printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData));

if(InitializeWinsock() == false)
return 0;

cHighPerformanceServer server;
if(server.Create(15779) == false)
{
return 0;
}

exitEvent = CreateEvent(0, TRUE, FALSE, 0);

SetConsoleCtrlHandler(ConsoleHandler, TRUE);

WaitForSingleObject(exitEvent, INFINITE);
SetConsoleCtrlHandler(ConsoleHandler, FALSE);

server.Destroy();

DeinitializeWinsock();

CloseHandle(exitEvent);

return 0;
}



Thanks again everyone [smile]

#8 hplus0603   Moderators   -  Reputation: 6024

Posted 30 March 2009 - 08:28 AM

Your code looks really weird. You are calling WSARecv() in the STATE_READ switch, but you are also calling it in the PostRead() function. You don't use WSARecv() to read data out of the socket once the data arrives; the data is already in the buffer that you submitted when you called WSARecv() with an OVERLAPPED structure.

If this is the problem, then what's happening is that you're blocking on some read operations, which means that the thread pool has no threads, which means that you get long service times.

In general, what you do with IOCP is:

0) Tie the socket to an IOCP when it is first accepted.
1) Call WSARecv() asynchronously, pointing it at a buffer.
2) Wait for GetQueuedCompletionStatus().
3) When complete, the data you requested for the read is now in the buffer, and you can process it however you wish.
4) Goto 1).
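
A minimal sketch of that loop, reusing tConnectionData and the constants from the code above (PostRecv(), WorkerLoop() and OnData() are placeholder names for this sketch, not part of the posted server):

void OnData(BYTE * data, DWORD length); // stand-in for whatever processing you do

void PostRecv(tConnectionData * conn)
{
    // 1) hand the kernel a buffer; the completion will arrive through the IOCP
    WSABUF buf = { HPS_OVERLAPPED_BUFFER_RECV_SIZE, (char *)conn->recvBufferData };
    DWORD flags = 0;
    if(WSARecv(conn->socket_, &buf, 1, NULL, &flags, &conn->overlapped, NULL) == SOCKET_ERROR &&
       WSAGetLastError() != ERROR_IO_PENDING)
    {
        conn->Close(true, true);
    }
}

void WorkerLoop(HANDLE iocp)
{
    for(;;)
    {
        DWORD bytes = 0;
        ULONG_PTR key = 0;
        OVERLAPPED * ov = NULL;
        // 2) wait for any outstanding operation on any socket to complete
        BOOL ok = GetQueuedCompletionStatus(iocp, &bytes, &key, &ov, INFINITE);
        if(ov == NULL)
            continue; // nothing was dequeued; skip
        tConnectionData * conn = CONTAINING_RECORD(ov, tConnectionData, overlapped);
        if(!ok || bytes == 0)
        {
            conn->Close(true, true);
            continue;
        }
        // 3) the requested data is already in conn->recvBufferData[0..bytes) -- no second WSARecv() needed
        OnData(conn->recvBufferData, bytes);
        // 4) post the next read and go back to waiting
        PostRecv(conn);
    }
}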


#9 Drew_Benton   Crossbones+   -  Reputation: 1729

Posted 30 March 2009 - 12:18 PM

Quote:
Your code looks really weird.


The strategy I was originally using was to post a 0-byte read to all connections to prevent any non-paged memory buffers from being locked. I won't paste the entire quote since it takes up some space, but it's available here on MSDN; it's the second paragraph on the page.
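
For reference, the PostRead() in the listing above boils down to a zero-byte overlapped read -- a minimal sketch, assuming the same members (socket_, overlapped, Close) as tConnectionData:

// Zero-byte overlapped read: len == 0 and buf == NULL, so no user buffer gets locked
// for the duration of the request; the completion is purely a "data has arrived"
// notification, and the bytes then have to be pulled out with a separate WSARecv().
WSABUF zeroBuf = { 0, NULL };
DWORD bytes = 0;
DWORD flags = 0;
if(WSARecv(socket_, &zeroBuf, 1, &bytes, &flags, &overlapped, NULL) == SOCKET_ERROR &&
   WSAGetLastError() != ERROR_IO_PENDING)
{
    Close(true, true);
}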

Although, I think I need more throughput right now than concurrent connections, so I'll make those changes. I had also left in a call to WaitForSingleObject on each iteration of the ScavengerThread's inner loop, so I took that out as well, but it didn't change much. I hadn't noticed it until I was checking the code in the thread for wide page breaks.

Quote:
If this is the problem, then what's happening is that you're blocking on some read operations, which means that the thread pool has no threads, which means that you get long service times.


The setup I had should have prevented that, since I (usually) had two worker threads created per "processor". I was using auto-detect, so 4 was passed as the concurrency value to CreateIoCompletionPort. The configuration I had posted used only 1 concurrent thread for the IOCP and 2 worker threads total, but I've done lots of testing with various settings and they all yielded the same results.

Likewise, I played with the scavenger thread configuration to cut down on overly frequent update runs. I could see iterating through 1000 connections being a decent amount of work, but the processor doesn't seem to mind in terms of reported CPU, and the troubles I'm seeing are outside that thread. I even took out the accept-timeout detection, which did take up a lot of time in the ScavengerThread since getsockopt was being called excessively (I was never accepting as many connections as I had capacity for), but that didn't seem to matter either. That whole thread seems to be CPU-bound anyway.

I've changed the IOCP model to how you described (I've been testing both models, but only had the code represent one at a time) and I get about the same results. As soon as I get to around 550+ clients, each sending 32 bytes/second, there is a longer service time between reads, which steadily grows until clients auto-drop off (I disconnect them after a 10s delay).

At this point, I don't think there's much I can do code wise to make it more efficient; I only have the minimal networking code done. What I'll spend this week doing is learning the basics of using VTune so I can make sure I'm not overlooking anything. Right now, looking at the call graph, it looks as expected. I'll put up some results later.

It's not that I'm worried about hitting a specific performance metric; I just didn't want to design the code wrong, build everything else on top of it, and then have to redo it all because I hadn't made sure things were OK first. I'll work from here and start adding in the normal operations associated with data processing. I'll then retest later on, run some more VTune analysis, and look for areas of improvement if there are any.

I'll make another reply in a day or so after playing with some more logic added. Thanks for the help and replies so far everyone, I'm feeling much better now [smile] I'll put together some more data soon. On another side note, I see now I might have been too concerned with the small details in terms of performance. I ran 100 clients each sending 1kb every 100ms and it churned through all the data fine -- just about 1mb/s, and CPU usage was 0 the whole time. In addition, only a few clients staggered and disconnected due to the time between reads. While that amount of data is really "extreme", perhaps my scaling issue is with Windows and not the code itself (as Washu mentioned).

Here's the latest code after the changes, for anyone following the thread who might also be interested in IOCP (note I'll be posting better code later on; this is just to keep the thread from becoming too much of a wall of text, as I so often tend to do).


/*
A lot of resources were consulted and used in this code. Major resources
used include:
MSDN
http://win32.mvps.org/network/sockhim.html
Network Programming for Microsoft Windows
CodeProject's IOCP articles
http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx
http://www.codeproject.com/KB/IP/iocp.aspx
Larger blocks of comments are mostly from the second reference.
I used comments from that project to help understand the particulars
of IOCP.
*/


#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <list>
#include <map>
#include <vector>
#include <algorithm>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

// Logical states for the overlapped structure
const int HPS_CONNECTION_STATE_CLOSED = 0;
const int HPS_CONNECTION_STATE_ACCEPT = 1;
const int HPS_CONNECTION_STATE_READ = 2;
const int HPS_CONNECTION_STATE_WRITE = 3;

// Max bytes for the recv buffer
const int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 4096;

// Max bytes for the send buffer
const int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 4096;

// The size of the sockaddr_in parameter
const int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);

struct tHighPerformanceServerData;
struct tWorkerThreadData;
struct tWorkerThreadWrapperData
{
tHighPerformanceServerData * serverData;
tWorkerThreadData * threadData;
};

struct tConnectionGlobalData
{
LPFN_ACCEPTEX lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
SOCKET listenSocket;
HANDLE hCompletionPort;
DWORD dwNumberOfConcurrentThreads;
DWORD dwReadTimeTimeout;
DWORD dwWriteTimeTimeout;
DWORD dwAcceptTimeTimeout;
int initialReceiveSize;
LONG * plUidBase;

tConnectionGlobalData() :
lpfnAcceptEx(NULL),
listenSocket(INVALID_SOCKET),
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(0),
lpfnGetAcceptExSockaddrs(NULL),
dwReadTimeTimeout(-1),
dwWriteTimeTimeout(-1),
dwAcceptTimeTimeout(5000),
plUidBase(0),
initialReceiveSize(0)
{
}
};

struct tConnectionData
{
public:
OVERLAPPED overlapped;

DWORD dwUid;

SOCKET socket_;

sockaddr_in address;

WORD sendBufferSize;

BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE];
BYTE sendBufferData[HPS_OVERLAPPED_BUFFER_SEND_SIZE];

INT connectionState;

sockaddr_in localPeer;
sockaddr_in remotePeer;

DWORD dwLastReadTime;
DWORD dwLastWriteTime;

tConnectionGlobalData * globalDataPtr;

public:
tConnectionData() :
socket_(INVALID_SOCKET),
connectionState(HPS_CONNECTION_STATE_CLOSED),
sendBufferSize(0),
dwLastReadTime(0),
dwLastWriteTime(0),
dwUid(-1),
globalDataPtr(0)
{
memset(&overlapped, 0, sizeof(overlapped));
memset(&address, 0, sizeof(address));
memset(&localPeer, 0, sizeof(localPeer));
memset(&remotePeer, 0, sizeof(remotePeer));
}

~tConnectionData()
{
}

bool Initialize()
{
connectionState = HPS_CONNECTION_STATE_CLOSED;
if(socket_ != INVALID_SOCKET) // Prevent resource leaks
{
return Close(true, true);
}

socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(socket_ == INVALID_SOCKET)
{
// TODO: Handle error
return false;
}

DWORD numberOfBytes = 0; // Not used in this mode
if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE)
{
DWORD dwError = GetLastError();
if(dwError != ERROR_IO_PENDING)
{
closesocket(socket_);
socket_ = INVALID_SOCKET;

// TODO: Handle error
return false;
}
}

// Update the state the connection is in
connectionState = HPS_CONNECTION_STATE_ACCEPT;

// Success
return true;
}

bool Close(bool force, bool reuse)
{
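// force == true : hard close -- the zero-timeout SO_LINGER set below makes closesocket()
//                 discard any unsent data and reset the connection instead of a graceful FIN.
// reuse == true : recycle this pooled slot via Initialize(), which creates a new socket
//                 and posts a fresh AcceptEx() on it.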
if(socket_ != INVALID_SOCKET)
{
struct linger li = {0, 0};
if(force == true) // Default: SO_DONTLINGER
{
li.l_onoff = 1; // SO_LINGER, timeout = 0
}
setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));
closesocket(socket_);
socket_ = INVALID_SOCKET;
}
connectionState = HPS_CONNECTION_STATE_CLOSED;
if(reuse == true)
{
return Initialize();
}
return true;
}

void ProcessIO(DWORD numberOfBytes)
{
switch(connectionState)
{
case HPS_CONNECTION_STATE_CLOSED:
{
// We won't need to do anything since the scavenger thread
// will handle this for us.
__asm nop
} break;

case HPS_CONNECTION_STATE_ACCEPT:
{
sockaddr_in *plocal = 0;
sockaddr_in *premote = 0;
int locallen = 0;
int remotelen = 0;

globalDataPtr->lpfnGetAcceptExSockaddrs(recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, (sockaddr **)&plocal, &locallen, (sockaddr **)&premote, &remotelen);

memcpy(&localPeer, plocal, sizeof(sockaddr_in));
memcpy(&remotePeer, premote, sizeof(sockaddr_in));

setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket));

// we still need to associate the newly connected socket to our IOCP:
HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads);
if(hResult != globalDataPtr->hCompletionPort)
{
// TODO: Handle error
return;
}

//printf("Incoming Connection - [%s:%i] to [%s:%i]\n", inet_ntoa(remotePeer.sin_addr), ntohs(remotePeer.sin_port), inet_ntoa(localPeer.sin_addr), ntohs(localPeer.sin_port));

dwLastReadTime = dwLastWriteTime = GetTickCount();
if(globalDataPtr->initialReceiveSize == 0)
{
// if we get here, then the initial receive size must have been 0, therefore numberOfBytes must be 0, with nothing at all in our receive buffer.
// we can therefore safely let PostWrite() below reuse the send buffer for the greeting.
dwUid = InterlockedIncrement(globalDataPtr->plUidBase);

_snprintf_s((char*)sendBufferData, HPS_OVERLAPPED_BUFFER_SEND_SIZE, HPS_OVERLAPPED_BUFFER_SEND_SIZE - 1, "Hello, world.\r\n");
sendBufferSize = 1 + strlen((char*)sendBufferData);
PostWrite();
}
else
{
// We received something during AcceptEx()
if(numberOfBytes != 0)
{
dwUid = InterlockedIncrement(globalDataPtr->plUidBase);
dwLastReadTime = GetTickCount();
//
// TODO: Process Data
//
PostRead();
return;
}

// we should never get here: if initialReceiveSize was non-zero,
// then AcceptEx() was told to receive at least one byte, and it
// would not have completed without that byte (unless the scavenger
// force-closed the socket, in which case the AcceptEx() result
// would have been an error, handled before ProcessIO() was ever called).

// TODO: Handle error
__asm nop
}
} break;

case HPS_CONNECTION_STATE_READ:
{
if(numberOfBytes == SOCKET_ERROR)
{
// TODO: Log error
Close(true, true);
return;
}
else if(numberOfBytes == 0) // connection closing?
{
// TODO: Log error
Close(false, true);
return;
}
dwLastReadTime = GetTickCount();
// TODO: Process data here
PostRead();
} break;

case HPS_CONNECTION_STATE_WRITE: // This state needs improvement
{
dwLastWriteTime = GetTickCount();
if(numberOfBytes == SOCKET_ERROR)
{
// TODO: Log error
Close(true, true);
return;
}
else if(numberOfBytes == 0) // connection closing?
{
// TODO: Log error
Close(false, true);
return;
}
PostRead();
} break;
}
}

void PostRead()
{
connectionState = HPS_CONNECTION_STATE_READ;
WSABUF recvBufferDescriptor = {HPS_OVERLAPPED_BUFFER_RECV_SIZE, (char*)recvBufferData};
DWORD numberOfBytes = 0;
DWORD recvFlags = 0;
BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, 0);
if(result != SOCKET_ERROR) // everything is OK
{
return;
}
else
{
if(GetLastError() != ERROR_IO_PENDING)
{
// TODO: Handle error
Close(true, true);
}
else
{
// (else branch intentionally empty)
// if we get here, gle == ERROR_IO_PENDING, which is fine by me
}
}
}

void PostWrite() // TODO: Rework this logic
{
BOOL result;
DWORD numberOfBytes = 0;
WSABUF sendBufferDescriptor;
connectionState = HPS_CONNECTION_STATE_WRITE;
sendBufferDescriptor.len = sendBufferSize;
sendBufferDescriptor.buf = (char*)sendBufferData;
result = WSASend(socket_, &sendBufferDescriptor, 1, &numberOfBytes, 0, &overlapped, 0);
result = (result != SOCKET_ERROR); // WSASend() uses inverted logic wrt/WriteFile()
if(!result)
{
DWORD err = WSAGetLastError();
if(err != ERROR_IO_PENDING)
{
// TODO: Handle error
Close(true, true);

// the fall-through does nothing because the caller
// shouldn't post another read -- the reinitialized
// socket has an AcceptEx() pending
}
else
{
// (else branch intentionally empty)

// if we get here, gle == ERROR_IO_PENDING; nothing
// left to do but return. Caller loops back to GQCS().
}
}
else // WSASend() returned without error
{
// the write completed immediately
// this doesn't bother us -- we will still
// get the completion packet
}
}
};

struct tWorkerThreadData
{
public:
HANDLE hThread;
DWORD dwThreadId;

public:
tWorkerThreadData() :
hThread(INVALID_HANDLE_VALUE),
dwThreadId(0)
{
}

~tWorkerThreadData()
{
}
};

struct tHighPerformanceServerData
{
public:
WORD wPort;

int backLog;

HANDLE hCompletionPort;

DWORD dwNumberOfConcurrentThreads;
DWORD dwNumberOfWorkerThreads;

LONG lRunningWorkerThreadCount;

SOCKET sListenSocket;

SOCKADDR_IN saInternetAddr;

GUID GuidAcceptEx;
LPFN_ACCEPTEX lpfnAcceptEx;

GUID GuidGetAcceptExSockaddrs;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

CRITICAL_SECTION workerThreadCS;
std::list<tWorkerThreadData *> workerThreads;

DWORD dwInitialConnectionPoolCount;
std::list<tConnectionData *> connectionPool;

HANDLE hScavengerThread;
DWORD dwScavengerThreadId;
DWORD dwScavengerDelay; // milliseconds between runs of the idle socket scavenger
HANDLE hScavengerExitEvent; // tells scavenger thread when to die

DWORD dwWorkerThreadScaleValue;

LONG lUidBase;

tConnectionGlobalData globalData;

public:
tHighPerformanceServerData() :
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(1),
dwNumberOfWorkerThreads(1),
lRunningWorkerThreadCount(0),
sListenSocket(INVALID_SOCKET),
wPort(0),
lpfnAcceptEx(NULL),
lpfnGetAcceptExSockaddrs(NULL),
dwInitialConnectionPoolCount(200),
dwScavengerDelay(1000),
hScavengerExitEvent(NULL),
hScavengerThread(INVALID_HANDLE_VALUE),
dwScavengerThreadId(0),
dwWorkerThreadScaleValue(1),
lUidBase(0),
backLog(SOMAXCONN)
{
GUID guidAcceptEx = WSAID_ACCEPTEX;
memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx));

GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs));

InitializeCriticalSection(&workerThreadCS);
}

~tHighPerformanceServerData()
{
DeleteCriticalSection(&workerThreadCS);
}

int WorkerThread()
{
BOOL result = 0;
DWORD numberOfBytes = 0;
ULONG key = 0;
OVERLAPPED * lpOverlapped = 0;
InterlockedIncrement(&lRunningWorkerThreadCount);
while(true)
{
tConnectionData * connectionData = 0;
InterlockedDecrement(&lRunningWorkerThreadCount);
result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
if(key == -1)
{
break; // Time to exit the worker thread
}
connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped);
if(connectionData == 0)
{
// TODO: Handle error
continue;
}
InterlockedIncrement(&lRunningWorkerThreadCount);
if(result == TRUE)
{
// We have an I/O to process
connectionData->ProcessIO(numberOfBytes);
}
else
{
// Close this socket and make space for a new one if we are still listening
connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? false : true));
}
}
return 0;
}

int ScavengerThread()
{
while(true)
{
int count[4] = {0};
std::list<tConnectionData *>::iterator itr = connectionPool.begin();
while(itr != connectionPool.end())
{
tConnectionData * connection = (*itr);
count[connection->connectionState]++;

// AcceptEx() called, but no completion yet
if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT)
{
// determine if the socket is connected
int seconds = 0;
int length = sizeof(seconds);
if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length))
{
if(seconds != -1)
{
seconds *= 1000;
if(seconds > (int)globalData.dwAcceptTimeTimeout)
{
printf("[%i][Accept] idle timeout after %i ms.\n", connection->dwUid, seconds);

// closesocket() here causes an immediate IOCP notification with an error indication;
// that will cause our worker thread to call Close().
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}

// No connection made on this socket yet
else if(seconds == -1)
{
// Nothing to do
}
}
}

// At times, a connection can fail and not be able to be reconnected due to connection
// flooding. This check will reclaim our connections that are locked in the closed state
// and never get a chance to recover.
else if(connection->connectionState == HPS_CONNECTION_STATE_CLOSED)
{
connection->Close(true, true);
}

// The client is in a read or write state, doesn't matter which. We want to make sure
// activity still exists as desired.
else
{
bool doClose = false;
DWORD tick = GetTickCount();
DWORD dwLastTime;

dwLastTime = tick - connection->dwLastReadTime;
if(dwLastTime > globalData.dwReadTimeTimeout)
{
printf("[%i][Read] idle timeout after %i ms.\n", connection->dwUid, dwLastTime);
doClose = true;
}
else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5))
{
printf("[%i][Read] %i ms\n", connection->dwUid, dwLastTime);
}

dwLastTime = tick - connection->dwLastWriteTime;
if(dwLastTime > globalData.dwWriteTimeTimeout)
{
printf("[%X][Write] idle timeout after %i ms.\n", connection->dwUid, dwLastTime);
doClose = true;
}
else if(dwLastTime > ((float)globalData.dwWriteTimeTimeout * .5))
{
printf("[%i][Write] %i ms\n", connection->dwUid, dwLastTime);
}

if(doClose)
{
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}
itr++;
}
printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]);

// Pause until next run due
DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay);
if(result != WAIT_TIMEOUT)
{
break;
}
}
return 0;
}

DWORD AddConnectionsToPool(long count)
{
// We cannot add more connections once the server has started
if(hScavengerThread != INVALID_HANDLE_VALUE)
{
return 0;
}
DWORD total = 0;
for(long index = 0; index < count; ++index)
{
tConnectionData * connection = new tConnectionData;
connection->globalDataPtr = &globalData;
bool result = connection->Initialize();
if(result == true)
{
connectionPool.push_back(connection);
total++;
}
else
{
// TODO: Handle error
delete connection;
}
}
return total;
}
};

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam)
{
tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam;
DWORD dwResult = data->serverData->WorkerThread();

LPCRITICAL_SECTION pCS = &data->serverData->workerThreadCS;

EnterCriticalSection(pCS);
std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin();
while(itr != data->serverData->workerThreads.end())
{
tWorkerThreadData * td = (*itr);
if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread)
{
printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId);
data->serverData->workerThreads.erase(itr);
break;
}
itr++;
}

delete data->threadData;
delete data;

LeaveCriticalSection(pCS);

return dwResult;
}

DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam)
{
return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();
}

bool InitializeWinsock()
{
WSADATA wd = { 0 };
if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
{
// TODO: Handle error
return false;
}
if(LOBYTE( wd.wVersion ) < 2)
{
WSACleanup();
// TODO: Handle error
return false;
}
return true;
}

void DeinitializeWinsock()
{
WSACleanup();
}


// Our high performance server :)
class cHighPerformanceServer
{
private:
tHighPerformanceServerData * internalData;

public:
cHighPerformanceServer()
{
internalData = new tHighPerformanceServerData;
}

~cHighPerformanceServer()
{
delete internalData;
}

bool Create(unsigned short port)
{
// Get the system information
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);

// Try to create an I/O completion port
internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads);
if(internalData->hCompletionPort == NULL)
{
// TODO: Log error
Destroy();
return false;
}

// Calculate how many worker threads we should create to process IOCP events
DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads;
if(internalData->dwNumberOfWorkerThreads == 0)
{
if(internalData->dwNumberOfConcurrentThreads == 0)
{
dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue;
}
else
{
dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue;
}
}

// Create the worker threads!
DWORD dwWorkerTotal = AddWorkerThreads(dwNumberOfWorkerThreads);
if(dwWorkerTotal != dwNumberOfWorkerThreads)
{
// TODO: Log error
Destroy();
return false;
}

internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(internalData->sListenSocket == INVALID_SOCKET)
{
// TODO: Log error
Destroy();
return false;
}

// Bind the socket to the port
internalData->wPort = port;
internalData->saInternetAddr.sin_family = AF_INET;
internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
internalData->saInternetAddr.sin_port = htons(internalData->wPort);
int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr));
if(bindResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

int listenResult = listen(internalData->sListenSocket, internalData->backLog);
if(listenResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

DWORD dwBytes = 0;
int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx,
sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

dwBytes = 0;
ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs,
sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

// Assign the global data for our connections
internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx;
internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs;
internalData->globalData.listenSocket = internalData->sListenSocket;
internalData->globalData.hCompletionPort = internalData->hCompletionPort;
internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads;
internalData->globalData.plUidBase = &internalData->lUidBase;
internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable
internalData->globalData.dwWriteTimeTimeout = -1; // TODO: Variable
internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable
internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx
// If we wanted to accept data from AcceptEx
//internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2);

DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount);
if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount)
{
// TODO: Log error
Destroy();
return false;
}

// Connect the listener socket to IOCP
if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0)
{
// TODO: Log error
Destroy();
return false;
}

internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0);
if(internalData->hScavengerExitEvent == NULL)
{
// TODO: Log error
Destroy();
return false;
}
internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId);
if(internalData->hScavengerThread == NULL)
{
// TODO: Log error
Destroy();
return false;
}
DWORD dwResult = ResumeThread(internalData->hScavengerThread);
if(dwResult == (DWORD)-1)
{
// TODO: Log error
Destroy();
__asm nop
}

// Success!
return true;
}

void Destroy()
{
if(internalData->hScavengerExitEvent != NULL)
{
SetEvent(internalData->hScavengerExitEvent);
if(internalData->hScavengerThread != INVALID_HANDLE_VALUE)
{
int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2);
if(result != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
CloseHandle(internalData->hScavengerThread);
internalData->hScavengerThread = INVALID_HANDLE_VALUE;
}
CloseHandle(internalData->hScavengerExitEvent);
internalData->hScavengerExitEvent = NULL;
}

if(internalData->sListenSocket != INVALID_SOCKET)
{
closesocket(internalData->sListenSocket);
internalData->sListenSocket = INVALID_SOCKET;
}

std::vector<HANDLE> workerThreadHandles;
std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin();
while(itr != internalData->workerThreads.end())
{
workerThreadHandles.push_back((*itr)->hThread);
itr++;
}

// Tell each worker thread to exit by posting a completion packet with key == -1
if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
EnterCriticalSection(&internalData->workerThreadCS);
size_t count = internalData->workerThreads.size();
for(size_t x = 0; x < count; ++x)
{
PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
}
LeaveCriticalSection(&internalData->workerThreadCS);
}

// Wait for all worker threads to close
for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS)
{
DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x);
DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000);
if(dwResult != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
}

if(internalData->workerThreads.size())
{
// TODO: Log error
printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size());
}

if(internalData->connectionPool.size())
{
std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin();
while(itr != internalData->connectionPool.end())
{
closesocket((*itr)->socket_);
delete (*itr);
itr++;
}
internalData->connectionPool.clear();
}

if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
CloseHandle(internalData->hCompletionPort);
internalData->hCompletionPort = INVALID_HANDLE_VALUE;
}
}

DWORD AddWorkerThreads(DWORD count)
{
DWORD total = 0;
for(DWORD index = 0; index < count; ++index)
{
tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData;
tWorkerThreadData * threadData = new tWorkerThreadData;
threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId);
if(threadData->hThread != NULL)
{
total++;
EnterCriticalSection(&internalData->workerThreadCS);
internalData->workerThreads.push_back(threadData);
LeaveCriticalSection(&internalData->workerThreadCS);

workerThreadData->serverData = internalData;
workerThreadData->threadData = threadData;

DWORD dwResult = ResumeThread(threadData->hThread);
if(dwResult == (DWORD)-1)
{
// TODO: Handle error
__asm nop
}
}
else
{
delete workerThreadData;
delete threadData;
}
}
return total;
}

void RemoveWorkerThreads(DWORD count)
{
EnterCriticalSection(&internalData->workerThreadCS);
count = min(count, internalData->workerThreads.size());
for(DWORD index = 0; index < count; ++index)
{
// Signal one worker to exit, whomever is unlucky enough to process this request dies
PostQueuedCompletionStatus(internalData->hCompletionPort, 0, -1, 0);
}
LeaveCriticalSection(&internalData->workerThreadCS);
}
};

HANDLE exitEvent = 0;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
switch (ConsoleEvent)
{
case CTRL_LOGOFF_EVENT:
case CTRL_C_EVENT:
case CTRL_BREAK_EVENT:
case CTRL_CLOSE_EVENT:
case CTRL_SHUTDOWN_EVENT:
{
if(exitEvent != 0)
{
SetEvent(exitEvent);
return TRUE;
}
}
}
return FALSE;
}

int main(int argc, char * argv[])
{
printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData));

if(InitializeWinsock() == false)
return 0;

cHighPerformanceServer server;
if(server.Create(15779) == false)
{
return 0;
}

exitEvent = CreateEvent(0, TRUE, FALSE, 0);

SetConsoleCtrlHandler(ConsoleHandler, TRUE);

WaitForSingleObject(exitEvent, INFINITE);
SetConsoleCtrlHandler(ConsoleHandler, FALSE);

server.Destroy();

DeinitializeWinsock();

CloseHandle(exitEvent);

return 0;
}



#10 hplus0603   Moderators   -  Reputation: 6024

Posted 31 March 2009 - 04:30 AM

Quote:
posting a 0 byte read to all connections to prevent any non-paged memory buffers from being locked


When the data comes in, the kernel needs to put the data somewhere. That somewhere needs to, typically, be in locked pages anyway, because the data is shuffled at DPC level or higher. If you provide properly aligned and lockable buffers in the read, the kernel can read straight into your buffer, instead of going through a separate buffer, so a 0-byte read is by no means a guaranteed win IMO.
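
To put a rough number on that trade-off using the constants from the posted code (a back-of-the-envelope sketch, not a measurement):

// Rough figure for how much buffer memory stays locked while one normal-sized
// overlapped read is outstanding per pooled connection (assumed values from the listings):
const size_t connections  = 1000;                             // dwInitialConnectionPoolCount
const size_t bytesPerRead = HPS_OVERLAPPED_BUFFER_RECV_SIZE;  // 4096-8192 depending on the listing
const size_t lockedBytes  = connections * bytesPerRead;       // on the order of 4-8 MB total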

What kind of CPU and how much RAM are you using for this test server?

#11 Drew_Benton   Crossbones+   -  Reputation: 1729

Posted 31 March 2009 - 09:05 AM

Quote:
If you provide properly aligned and lockable buffers in the read, the kernel can read straight into your buffer, instead of going through a separate buffer, so a 0-byte read is by no means a guaranteed win IMO.


Ah, ok, that makes sense. Thanks for that explanation. I went ahead and just switched to the regular way of passing in the buffer and size to WSARecv. I also reread the topic on Scalable Server Architecture and see I was misinterpreting some of the information there; I should also not touch the SO_SNDBUF/SO_RCVBUF options on a socket since I will always have overlapped reads posted on a socket in this setup. I can see now in other setups, that might not be the case.
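
For anyone following along, this is the knob being referred to: setting either option to 0 removes the kernel's per-socket buffer for that direction, leaving all buffering to the overlapped buffers you post yourself. I'm leaving both at their defaults, but the calls would look like this:

// What "touching SO_SNDBUF/SO_RCVBUF" would look like -- shown only to illustrate the
// option being discussed; I'm deliberately not doing this any more.
int zero = 0;
setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, (char *)&zero, sizeof(zero));
setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, (char *)&zero, sizeof(zero));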

Quote:
What kind of CPU and how much RAM are you using for this test server?


Here is the information from CPU-Z (I'm running XP 32bit though, so 3.5gb is usable):

[CPU-Z screenshot]

In terms of the test:
* Program is run in release mode.
* The Initial memory is: 8.1mb (1000 preallocated connections)
* 500 connections from one laptop via LAN

Server:
* Memory usage to 9.5mb
* ~42kb/s traffic in to the server (reported by NetNeter)
* ~28kb/s traffic out of the server (not sure why though)

Clients
* ~42kb/s total traffic out
* ~28kb/s traffic in to the clients (not sure why either)
* Each client takes up 2.3mb on laptop, virtually no CPU time
* CPU utilization on laptop is ~2-4% from other system applications running
* Each client sends 32 bytes and Sleeps 1 second, 32 bytes/s output consistently

Results:
* Up to the ~500 connection mark, there are no service times of > 5s. As soon as 500 is hit, one or two long-service-time warnings fire occasionally. If I try adding 100 more or so, more connections begin to have longer service times.
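
The test clients themselves are trivial; a hypothetical stand-in for the 32-bytes-per-second sender looks roughly like this (blocking sockets, no error recovery -- not the exact client I ran, and 192.168.0.1 is just a placeholder for the server's LAN address):

#include <winsock2.h>
#include <windows.h>
#include <stdio.h>

#pragma comment(lib, "ws2_32.lib")

int main()
{
    WSADATA wd = { 0 };
    if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
        return 0;

    SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr = { 0 };
    addr.sin_family = AF_INET;
    addr.sin_port = htons(15779);                     // same port the server listens on
    addr.sin_addr.s_addr = inet_addr("192.168.0.1");  // placeholder server address
    if(connect(s, (sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR)
    {
        printf("connect failed: %i\n", WSAGetLastError());
        return 0;
    }

    char payload[32] = { 0 };
    for(;;)
    {
        if(send(s, payload, sizeof(payload), 0) == SOCKET_ERROR)
            break;       // server dropped us (e.g. the idle-timeout scavenger)
        Sleep(1000);     // 32 bytes per second per client, matching the numbers above
    }

    closesocket(s);
    WSACleanup();
    return 0;
}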

I also ran Wireshark during the test and I think I might see the real problem at hand. I ran a capture for just under 2 minutes to get the 500 clients connected (takes about 70s on one computer) and then added 100 more on another computer until they started timing out.

I applied a filter to the traffic, "tcp.analysis.ack_rtt > 1 && tcp.dstport == 15779", and starting around 50 seconds in (by which point a little over 450 clients would be connected) the RTT to ACK for the packets rises to 1.5s. Towards the 80s mark, where a little over 600 clients would be connected, there are a whole bunch of "retransmissions" (lines shown in black and red in Wireshark) and their RTT to ACK is 2s. Getting towards the 90s mark, there are a couple of entries that hit an RTT to ACK of 3s!

That would explain why the service time is gradually getting longer as more connections are being added and clients sporadically disconnect. I was running 500 clients per laptop, which it seems the network can't handle from one source.

If I split the test up into 2 x 250 connection parts and watch Wireshark, I see far fewer retransmits and never get any notifications of the delayed reads. If I try running 400 clients per computer, then right towards the 600 mark, I start getting more retransmits and longer service time delays in my program. As I hit almost 800 connections, the retransmits were filling up Wireshark and most of the connections were failing in my program due to longer service times.

I see now that my code seems more than suitable for handling a lot more connections and traffic, but my network and my current test setup are not. What would you suggest is the best way to go about testing code like this so I can avoid problems like these? I mean, when you're at an early stage and want to test the upper bounds of your code, but have nothing much to lure in random testers and you need lots of traffic, do you just have to wait?

Let's say I wanted to try to pull off a larger test: would the problem lie in my router? I.e., if I set up a dedicated server to run on at home and got, say, 10 or so older computers (you know, those P4 512mb Dell Optiplex ones) connected via LAN, do you think the router still couldn't handle it, or are the computers themselves the problem?

Thanks for your continued help [smile]

Code-wise, I've cleaned the code up a bit and removed a few things that were no longer necessary. This code is still far from usable for anything serious, but I'm just adding it for anyone who stumbles upon the thread:

/*
A lot of resources were consulted and used in this code. Major resources
used include:
MSDN
http://win32.mvps.org/network/sockhim.html
Network Programming for Microsoft Windows
CodeProject's IOCP articles
http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
http://www.codeproject.com/KB/IP/SimpleIOCPApp.aspx
http://www.codeproject.com/KB/IP/iocp.aspx
Larger blocks of comments are mostly from the second reference.
I used comments from that project to help understand the particulars
of IOCP.
*/


#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
#include <list>
#include <vector>
#include <algorithm>
#include <iostream>

#pragma comment(lib, "ws2_32.lib")

HANDLE hPacketProcessingThread = INVALID_HANDLE_VALUE;

// Logical states for the overlapped structure
const int HPS_CONNECTION_STATE_CLOSED = 0;
const int HPS_CONNECTION_STATE_ACCEPT = 1;
const int HPS_CONNECTION_STATE_READ = 2;

// Max bytes for the recv buffer
const int HPS_OVERLAPPED_BUFFER_RECV_SIZE = 8192;

// Max bytes for the send buffer
const int HPS_OVERLAPPED_BUFFER_SEND_SIZE = 8192;

// The size of the sockaddr_in parameter
const int HPS_SOCKADDR_SIZE = (sizeof(SOCKADDR_IN) + 16);

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam);
DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam);

struct tHighPerformanceServerData;
struct tWorkerThreadData;
struct tWorkerThreadWrapperData
{
tHighPerformanceServerData * serverData;
tWorkerThreadData * threadData;
};

struct tConnectionLocalData
{
DWORD dwUid;

tConnectionLocalData() :
dwUid(-1)
{
}

~tConnectionLocalData()
{
}
};

struct tConnectionGlobalData
{
LPFN_ACCEPTEX lpfnAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;
SOCKET listenSocket;
HANDLE hCompletionPort;
DWORD dwNumberOfConcurrentThreads;
DWORD dwReadTimeTimeout;
DWORD dwAcceptTimeTimeout;
int initialReceiveSize;

tConnectionGlobalData() :
lpfnAcceptEx(NULL),
listenSocket(INVALID_SOCKET),
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(0),
lpfnGetAcceptExSockaddrs(NULL),
dwReadTimeTimeout(-1),
dwAcceptTimeTimeout(5000),
initialReceiveSize(0)
{
}
};

struct tConnectionData
{
public:
OVERLAPPED overlapped;

SOCKET socket_;

sockaddr_in address;

WORD sendBufferSize;

BYTE recvBufferData[HPS_OVERLAPPED_BUFFER_RECV_SIZE];

INT connectionState;

DWORD dwLastReadTime;

tConnectionGlobalData * globalDataPtr;
tConnectionLocalData * localDataPtr;

public:
tConnectionData(tConnectionGlobalData * gblData) :
socket_(INVALID_SOCKET),
connectionState(HPS_CONNECTION_STATE_CLOSED),
sendBufferSize(0),
dwLastReadTime(0),
globalDataPtr(gblData),
localDataPtr(0)
{
memset(&overlapped, 0, sizeof(overlapped));
memset(&address, 0, sizeof(address));
localDataPtr = new tConnectionLocalData;
}

~tConnectionData()
{
delete localDataPtr;
}

bool Initialize()
{
connectionState = HPS_CONNECTION_STATE_CLOSED;
if(socket_ != INVALID_SOCKET) // Prevent resource leaks
{
return Close(true, true);
}

socket_ = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(socket_ == INVALID_SOCKET)
{
// TODO: Handle error
return false;
}

// We still need to associate the newly connected socket to our IOCP:
HANDLE hResult = CreateIoCompletionPort((HANDLE)socket_, globalDataPtr->hCompletionPort, 0, globalDataPtr->dwNumberOfConcurrentThreads);
if(hResult != globalDataPtr->hCompletionPort)
{
// TODO: Handle error
return false;
}

DWORD numberOfBytes = 0; // Not used in this mode
if(globalDataPtr->lpfnAcceptEx(globalDataPtr->listenSocket, socket_, recvBufferData, globalDataPtr->initialReceiveSize, HPS_SOCKADDR_SIZE, HPS_SOCKADDR_SIZE, &numberOfBytes, &overlapped) == FALSE)
{
DWORD dwError = GetLastError();
if(dwError != ERROR_IO_PENDING)
{
closesocket(socket_);
socket_ = INVALID_SOCKET;

// TODO: Handle error
return false;
}
}

// Update the state the connection is in
connectionState = HPS_CONNECTION_STATE_ACCEPT;

// Success
return true;
}

bool Close(bool force, bool reuse)
{
if(socket_ != INVALID_SOCKET)
{
struct linger li = {0, 0};
if(force == true) // Default: SO_DONTLINGER
{
li.l_onoff = 1; // SO_LINGER, timeout = 0
}
setsockopt(socket_, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));
closesocket(socket_);
socket_ = INVALID_SOCKET;
}
connectionState = HPS_CONNECTION_STATE_CLOSED;
if(reuse == true)
{
return Initialize();
}
return true;
}

void ProcessIO(DWORD numberOfBytes)
{
if(connectionState == HPS_CONNECTION_STATE_READ)
{
if(numberOfBytes == SOCKET_ERROR)
{
// TODO: Log error
Close(true, true);
return;
}
else if(numberOfBytes == 0) // connection closing?
{
// TODO: Log error
Close(false, true);
return;
}

dwLastReadTime = GetTickCount();

//
// TODO: Process data sent from the client here
//

PostRead();
}

else if(connectionState == HPS_CONNECTION_STATE_ACCEPT)
{
// On Windows XP and later, once the AcceptEx function completes and the SO_UPDATE_ACCEPT_CONTEXT option is set on the accepted socket,
// the local address associated with the accepted socket can also be retrieved using the getsockname function. Likewise, the remote
// address associated with the accepted socket can be retrieved using the getpeername function.
setsockopt(socket_, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&globalDataPtr->listenSocket, sizeof(globalDataPtr->listenSocket));

dwLastReadTime = GetTickCount();
if(globalDataPtr->initialReceiveSize != 0)
{
//
// TODO: Process any data received as part of the AcceptEx() completion (e.g. sent by the client's ConnectEx()) here
//
}

// We are ready to start receiving from the client
PostRead();
}
}

// This function will post a read operation on the socket. This means that an IOCP event
// notification will be fired when the socket has data available for reading to it.
void PostRead()
{
connectionState = HPS_CONNECTION_STATE_READ;
WSABUF recvBufferDescriptor = {HPS_OVERLAPPED_BUFFER_RECV_SIZE, (char *)recvBufferData};
DWORD numberOfBytes = 0;
DWORD recvFlags = 0;
BOOL result = WSARecv(socket_, &recvBufferDescriptor, 1, &numberOfBytes, &recvFlags, &overlapped, NULL);
if(result == SOCKET_ERROR)
{
if(GetLastError() != ERROR_IO_PENDING)
{
// TODO: Handle error
Close(true, true);
}
}
}
};

struct tWorkerThreadData
{
public:
HANDLE hThread;
DWORD dwThreadId;

public:
tWorkerThreadData() :
hThread(INVALID_HANDLE_VALUE),
dwThreadId(0)
{
}

~tWorkerThreadData()
{
}
};

struct tHighPerformanceServerData
{
public:
WORD wPort;

int backLog;

HANDLE hCompletionPort;

DWORD dwNumberOfConcurrentThreads;
DWORD dwNumberOfWorkerThreads;

LONG lRunningWorkerThreadCount;

SOCKET sListenSocket;

SOCKADDR_IN saInternetAddr;

GUID GuidAcceptEx;
LPFN_ACCEPTEX lpfnAcceptEx;

GUID GuidGetAcceptExSockaddrs;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs;

CRITICAL_SECTION workerThreadCS;
std::list<tWorkerThreadData *> workerThreads;

DWORD dwInitialConnectionPoolCount;
std::list<tConnectionData *> connectionPool;

HANDLE hScavengerThread;
DWORD dwScavengerThreadId;
DWORD dwScavengerDelay; // milliseconds between runs of the idle socket scavenger
HANDLE hScavengerExitEvent; // tells scavenger thread when to die

DWORD dwWorkerThreadScaleValue;

tConnectionGlobalData globalData;

public:
tHighPerformanceServerData() :
hCompletionPort(INVALID_HANDLE_VALUE),
dwNumberOfConcurrentThreads(0),
dwNumberOfWorkerThreads(0),
lRunningWorkerThreadCount(0),
sListenSocket(INVALID_SOCKET),
wPort(0),
lpfnAcceptEx(NULL),
lpfnGetAcceptExSockaddrs(NULL),
dwInitialConnectionPoolCount(1000),
dwScavengerDelay(1000),
hScavengerExitEvent(NULL),
hScavengerThread(INVALID_HANDLE_VALUE),
dwScavengerThreadId(0),
dwWorkerThreadScaleValue(1),
backLog(SOMAXCONN)
{
GUID guidAcceptEx = WSAID_ACCEPTEX;
memcpy(&GuidAcceptEx, &guidAcceptEx, sizeof(guidAcceptEx));

GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
memcpy(&GuidGetAcceptExSockaddrs, &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs));

InitializeCriticalSection(&workerThreadCS);
}

~tHighPerformanceServerData()
{
DeleteCriticalSection(&workerThreadCS);
}

int WorkerThread()
{
BOOL result = 0;
DWORD numberOfBytes = 0;
ULONG_PTR key = 0; // Completion key must be ULONG_PTR so this also builds correctly for x64
OVERLAPPED * lpOverlapped = 0;
InterlockedIncrement(&lRunningWorkerThreadCount);
while(true)
{
tConnectionData * connectionData = 0;
InterlockedDecrement(&lRunningWorkerThreadCount);
result = GetQueuedCompletionStatus(hCompletionPort, &numberOfBytes, &key, &lpOverlapped, INFINITE);
if(key == (ULONG_PTR)-1)
{
break; // Time to exit the worker thread
}
if(lpOverlapped == 0)
{
// GetQueuedCompletionStatus failed without dequeuing a completion packet
// TODO: Handle error
continue;
}
connectionData = CONTAINING_RECORD(lpOverlapped, tConnectionData, overlapped);
InterlockedIncrement(&lRunningWorkerThreadCount);
if(result == TRUE)
{
// We have an I/O to process
connectionData->ProcessIO(numberOfBytes);
}
else
{
// Close this socket and make space for a new one if we are still listening
connectionData->Close(true, ((sListenSocket == INVALID_SOCKET) ? false : true));
}
}
return 0;
}

int ScavengerThread()
{
while(true)
{
int count[4] = {0};
std::list<tConnectionData *>::iterator itr = connectionPool.begin();
while(itr != connectionPool.end())
{
tConnectionData * connection = (*itr);
count[connection->connectionState]++;

// AcceptEx() called, but no completion yet
if(connection->connectionState == HPS_CONNECTION_STATE_ACCEPT)
{
// determine if the socket is connected
int seconds = 0;
int length = sizeof(seconds);
if(0 == getsockopt(connection->socket_, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &length))
{
if(seconds != -1)
{
seconds *= 1000;
if(seconds > (int)globalData.dwAcceptTimeTimeout)
{
printf("[%i][Accept] idle timeout after %i ms.\n", connection->socket_, seconds);

// closesocket() here causes an immediate IOCP notification with an error indication;
// that will cause our worker thread to call Close().
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}

// No connection made on this socket yet
else if(seconds == -1)
{
// Nothing to do
}
}
}

// The client is in a read or write state, doesn't matter which. We want to make sure
// activity still exists as desired.
else
{
bool doClose = false;
DWORD tick = GetTickCount();

DWORD dwLastTime = tick - connection->dwLastReadTime;
if(dwLastTime > globalData.dwReadTimeTimeout)
{
printf("[%i][Read] idle timeout after %i ms.\n", connection->socket_, dwLastTime);
doClose = true;
}
else if(dwLastTime > ((float)globalData.dwReadTimeTimeout * .5))
{
printf("[%i][Read] %i ms\n", connection->socket_, dwLastTime);
}

if(doClose)
{
closesocket(connection->socket_);
connection->socket_ = INVALID_SOCKET;
connection->connectionState = HPS_CONNECTION_STATE_CLOSED;
}
}
itr++;
}
printf("[Closed]: %.4i [Accept]: %.4i [Read]: %.4i [Write]: %.4i\n", count[0], count[1], count[2], count[3]);

// Pause until next run due
DWORD result = WaitForSingleObject(hScavengerExitEvent, dwScavengerDelay);
if(result != WAIT_TIMEOUT)
{
break;
}
}
return 0;
}

DWORD AddConnectionsToPool(long count)
{
// We cannot add more connections once the server has started
if(hScavengerThread != INVALID_HANDLE_VALUE)
{
return 0;
}
DWORD total = 0;
for(long index = 0; index < count; ++index)
{
tConnectionData * connection = new tConnectionData(&globalData);
bool result = connection->Initialize();
if(result == true)
{
connectionPool.push_back(connection);
total++;
}
else
{
// TODO: Handle error
delete connection;
}
}
return total;
}

DWORD AddWorkerThreads(DWORD count)
{
DWORD total = 0;
for(DWORD index = 0; index < count; ++index)
{
tWorkerThreadWrapperData * workerThreadData = new tWorkerThreadWrapperData;
tWorkerThreadData * threadData = new tWorkerThreadData;
threadData->hThread = CreateThread(NULL, 0, WorkerThreadWrapper, workerThreadData, CREATE_SUSPENDED, &threadData->dwThreadId);
if(threadData->hThread != NULL)
{
total++;
EnterCriticalSection(&workerThreadCS);
workerThreads.push_back(threadData);
LeaveCriticalSection(&workerThreadCS);

workerThreadData->serverData = this;
workerThreadData->threadData = threadData;

DWORD dwResult = ResumeThread(threadData->hThread);
if(dwResult == (DWORD)-1)
{
// TODO: Handle error
__asm nop
}
}
else
{
delete workerThreadData;
delete threadData;
}
}
return total;
}
};

DWORD WINAPI WorkerThreadWrapper(LPVOID lpParam)
{
tWorkerThreadWrapperData * data = (tWorkerThreadWrapperData *)lpParam;

DWORD dwResult = data->serverData->WorkerThread();

LPCRITICAL_SECTION pCS = &data->serverData->workerThreadCS;

EnterCriticalSection(pCS);
std::list<tWorkerThreadData *>::iterator itr = data->serverData->workerThreads.begin();
while(itr != data->serverData->workerThreads.end())
{
tWorkerThreadData * td = (*itr);
if(td->dwThreadId == data->threadData->dwThreadId && td->hThread == data->threadData->hThread)
{
printf("Removing worker thread [%X][%X]\n", data->threadData->hThread, data->threadData->dwThreadId);
data->serverData->workerThreads.erase(itr);
break;
}
itr++;
}

delete data->threadData;
delete data;

LeaveCriticalSection(pCS);

return dwResult;
}

DWORD WINAPI ScavengerThreadWrapper(LPVOID lpParam)
{
return ((tHighPerformanceServerData *)lpParam)->ScavengerThread();
}

bool InitializeWinsock()
{
WSADATA wd = { 0 };
if(WSAStartup(MAKEWORD(2, 2), &wd) != 0)
{
// TODO: Handle error
return false;
}
if(LOBYTE( wd.wVersion ) < 2)
{
WSACleanup();
// TODO: Handle error
return false;
}
return true;
}

void DeinitializeWinsock()
{
WSACleanup();
}

// Our high performance server :)
class cHighPerformanceServer
{
private:
tHighPerformanceServerData * internalData;

public:
cHighPerformanceServer()
{
internalData = new tHighPerformanceServerData;
}

~cHighPerformanceServer()
{
delete internalData;
}

bool Create(unsigned short port)
{
// Get the system information
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);

// Try to create an I/O completion port
internalData->hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, internalData->dwNumberOfConcurrentThreads);
if(internalData->hCompletionPort == NULL)
{
// TODO: Log error
Destroy();
return false;
}

// Calculate how many worker threads we should create to process IOCP events
DWORD dwNumberOfWorkerThreads = internalData->dwNumberOfWorkerThreads;
if(internalData->dwNumberOfWorkerThreads == 0)
{
if(internalData->dwNumberOfConcurrentThreads == 0)
{
dwNumberOfWorkerThreads = SystemInfo.dwNumberOfProcessors * internalData->dwWorkerThreadScaleValue;
}
else
{
dwNumberOfWorkerThreads = internalData->dwNumberOfConcurrentThreads * internalData->dwWorkerThreadScaleValue;
}
}

// Create the worker threads!
DWORD dwWorkerTotal = internalData->AddWorkerThreads(dwNumberOfWorkerThreads);
if(dwWorkerTotal != dwNumberOfWorkerThreads)
{
// TODO: Log error
Destroy();
return false;
}

internalData->sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if(internalData->sListenSocket == INVALID_SOCKET)
{
// TODO: Log error
Destroy();
return false;
}

// Bind the socket to the port
internalData->wPort = port;
internalData->saInternetAddr.sin_family = AF_INET;
internalData->saInternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
internalData->saInternetAddr.sin_port = htons(internalData->wPort);
int bindResult = bind(internalData->sListenSocket, (PSOCKADDR) &internalData->saInternetAddr, sizeof(internalData->saInternetAddr));
if(bindResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

int listenResult = listen(internalData->sListenSocket, internalData->backLog);
if(listenResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

DWORD dwBytes = 0;
int ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidAcceptEx, sizeof(internalData->GuidAcceptEx), &internalData->lpfnAcceptEx,
sizeof(internalData->lpfnAcceptEx), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

dwBytes = 0;
ioctlResult = WSAIoctl(internalData->sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&internalData->GuidGetAcceptExSockaddrs, sizeof(internalData->GuidGetAcceptExSockaddrs), &internalData->lpfnGetAcceptExSockaddrs,
sizeof(internalData->lpfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL);
if(ioctlResult == SOCKET_ERROR)
{
// TODO: Log error
Destroy();
return false;
}

// Assign the global data for our connections
internalData->globalData.lpfnAcceptEx = internalData->lpfnAcceptEx;
internalData->globalData.lpfnGetAcceptExSockaddrs = internalData->lpfnGetAcceptExSockaddrs;
internalData->globalData.listenSocket = internalData->sListenSocket;
internalData->globalData.hCompletionPort = internalData->hCompletionPort;
internalData->globalData.dwNumberOfConcurrentThreads = internalData->dwNumberOfConcurrentThreads;
internalData->globalData.dwReadTimeTimeout = 10000; // TODO: Variable
internalData->globalData.dwAcceptTimeTimeout = 5000; // TODO: Variable
internalData->globalData.initialReceiveSize = 0; // Do not accept anything from AcceptEx

// If we wanted to accept data sent from ConnectEx via AcceptEx
//internalData->globalData.initialReceiveSize = HPS_OVERLAPPED_BUFFER_RECV_SIZE - ((sizeof(SOCKADDR_IN) + 16) * 2);

DWORD dwConnectionTotal = internalData->AddConnectionsToPool(internalData->dwInitialConnectionPoolCount);
if(dwConnectionTotal != internalData->dwInitialConnectionPoolCount)
{
// TODO: Log error
Destroy();
return false;
}

// Connect the listener socket to IOCP
if(CreateIoCompletionPort((HANDLE)internalData->sListenSocket, internalData->hCompletionPort, 0, internalData->dwNumberOfConcurrentThreads) == 0)
{
// TODO: Log error
Destroy();
return false;
}

internalData->hScavengerExitEvent = CreateEvent(0, TRUE, FALSE, 0);
if(internalData->hScavengerExitEvent == NULL)
{
// TODO: Log error
Destroy();
return false;
}
internalData->hScavengerThread = CreateThread(0, 0, ScavengerThreadWrapper, internalData, CREATE_SUSPENDED, &internalData->dwScavengerThreadId);
if(internalData->hScavengerThread == NULL)
{
// TODO: Log error
Destroy();
return false;
}
DWORD dwResult = ResumeThread(internalData->hScavengerThread);
if(dwResult == (DWORD)-1)
{
// TODO: Log error
Destroy();
return false;
}

// Success!
return true;
}

void Destroy()
{
if(internalData->hScavengerExitEvent != NULL)
{
SetEvent(internalData->hScavengerExitEvent);
if(internalData->hScavengerThread != INVALID_HANDLE_VALUE)
{
int result = WaitForSingleObject(internalData->hScavengerThread, internalData->dwScavengerDelay * 2);
if(result != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
CloseHandle(internalData->hScavengerThread);
internalData->hScavengerThread = INVALID_HANDLE_VALUE;
}
CloseHandle(internalData->hScavengerExitEvent);
internalData->hScavengerExitEvent = NULL;
}

if(internalData->sListenSocket != INVALID_SOCKET)
{
closesocket(internalData->sListenSocket);
internalData->sListenSocket = INVALID_SOCKET;
}

std::vector<HANDLE> workerThreadHandles;
std::list<tWorkerThreadData *>::iterator itr = internalData->workerThreads.begin();
while(itr != internalData->workerThreads.end())
{
workerThreadHandles.push_back((*itr)->hThread);
itr++;
}

// Clean up the worker threads waiting on the IOCP
if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
EnterCriticalSection(&internalData->workerThreadCS);
size_t count = internalData->workerThreads.size();
for(size_t x = 0; x < count; ++x)
{
PostQueuedCompletionStatus(internalData->hCompletionPort, 0, (ULONG_PTR)-1, 0);
}
LeaveCriticalSection(&internalData->workerThreadCS);
}

// Wait for all worker threads to close
for(size_t x = 0; x < workerThreadHandles.size(); x += MAXIMUM_WAIT_OBJECTS)
{
DWORD count = min(MAXIMUM_WAIT_OBJECTS, workerThreadHandles.size() - x);
DWORD dwResult = WaitForMultipleObjects(count, &workerThreadHandles[x], TRUE, count * 1000);
if(dwResult != WAIT_OBJECT_0)
{
// TODO: Log error
__asm nop
}
}

// Sanity check
if(internalData->workerThreads.size())
{
// TODO: Log error
printf("%i worker threads did not finish...resources will be leaked.\n", internalData->workerThreads.size());
}

if(internalData->connectionPool.size())
{
std::list<tConnectionData * >::iterator itr = internalData->connectionPool.begin();
while(itr != internalData->connectionPool.end())
{
closesocket((*itr)->socket_);
delete (*itr);
itr++;
}
internalData->connectionPool.clear();
}

if(internalData->hCompletionPort != INVALID_HANDLE_VALUE)
{
CloseHandle(internalData->hCompletionPort);
internalData->hCompletionPort = INVALID_HANDLE_VALUE;
}
}
};

HANDLE exitEvent = 0;

BOOL __stdcall ConsoleHandler(DWORD ConsoleEvent)
{
switch (ConsoleEvent)
{
case CTRL_LOGOFF_EVENT:
case CTRL_C_EVENT:
case CTRL_BREAK_EVENT:
case CTRL_CLOSE_EVENT:
case CTRL_SHUTDOWN_EVENT:
{
if(exitEvent != 0)
{
SetEvent(exitEvent);
return TRUE;
}
}
}
return FALSE;
}

int main(int argc, char * argv[])
{
printf("sizeof(tConnectionData) = %i\n", sizeof(tConnectionData));
if(InitializeWinsock() == false)
return 0;
cHighPerformanceServer server;
if(server.Create(15779) == false)
{
return 0;
}
exitEvent = CreateEvent(0, TRUE, FALSE, 0);
SetConsoleCtrlHandler(ConsoleHandler, TRUE);
WaitForSingleObject(exitEvent, INFINITE);
SetConsoleCtrlHandler(ConsoleHandler, FALSE);
server.Destroy();
DeinitializeWinsock();
CloseHandle(exitEvent);
return 0;
}




#12 Washu   Senior Moderators   -  Reputation: 6268


Posted 31 March 2009 - 10:02 AM

Quote:
Original post by Drew_Benton
That would explain why the service time is gradually getting longer as more connections are being added and clients sporadically disconnect. I was running 500 clients per laptop, which it seems the network can't handle from one source.

The network can handle it fine, the desktop cannot. 500 connections coming from one machine is quite a large workload, especially if it then has to queue up and send data on those connections. Spreading out your connections across many machines should improve your test case, and allow you to implement other tests more easily as well.

Also, and I can't find a reference for this so take it with a grain of salt, I recall reading somewhere that the Windows XP networking stack has certain limitations that the server OSes do not.
Quote:

If I split the test up into 2 x 250 connection parts and watch Wireshark, I see far fewer retransmits and never get any notifications of the delayed reads. If I try running 400 clients per computer, then right towards the 600 mark, I start getting more retransmits and longer service time delays in my program. As I hit almost 800 connections, the retransmits were filling up Wireshark and most of the connections were failing in my program due to longer service times.

I see now that my code seems to be more than suitable for handling a lot more connections and traffic, but my network and my current test setup are not. What would you suggest is the best way to test code like this in general so I can avoid problems like this? I mean, when you are at an early stage and want to test the upper bounds of your code, but have nothing much to lure random testers in and you need lots of traffic, do you just have to wait?

Let's say I wanted to try to pull off a larger test; would the problem lie in my router? I.e., if I set up a dedicated server at home and got, let's say, 10 or so older computers (you know, those P4 512 MB Dell Optiplex ones) connected via LAN, do you think the router still couldn't handle it, or are the computers themselves the problem?

I doubt your router is the problem. While most consumer routers are fairly limited in their internet bandwidth abilities, their internal switches can generally operate fairly well (and you should be hitting only the switch if you're on the local network). Spreading out the connections to be coming from multiple clients will create a more realistic test scenario. Configuring your host OS to be a server class one (even a 180 day trial of Server 2008 will do) would help as well.

#13 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 31 March 2009 - 10:28 AM

Thanks Washu, that's great information to know. I think I'll go that route of testing on a server OS with a few more computers across the LAN. I'll post some more results in a couple of days or so after I get everything set up and test again. I might also invest some time in making the code 64-bit compatible (or at least make sure it works in 64-bit mode) so I can get a 2-for-1 deal.

I might as well get this type of work done now, because I'm going to have to pick up some more testing hardware anyway to work through the distributed computing and to make a simple program that can work across multiple computers at once. I won't go crazy or anything, just get some cheap basics that should work out fine.

Thanks again everyone, I'll keep you updated. [smile]

#14 Antheus   Members   -  Reputation: 2397


Posted 01 April 2009 - 05:17 AM

Although you're measuring latency while sending small amounts of data, I don't see the TCP_NODELAY option being set anywhere.
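For reference, it's just a setsockopt call on each connected/accepted socket (and on the test client's socket), something like this minimal helper:

#include <winsock2.h>

// Disable the Nagle algorithm on a connected/accepted socket so small packets
// are sent immediately instead of being coalesced.
bool DisableNagle(SOCKET s)
{
    BOOL noDelay = TRUE;
    return setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
                      (const char *)&noDelay, sizeof(noDelay)) != SOCKET_ERROR;
}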

#15 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 01 April 2009 - 08:46 AM

Quote:
Original post by Antheus
Although you're measuring latency while sending small amounts of data, I don't see the TCP_NODELAY option being set anywhere.


Thanks, I forgot about that flag in the test client. It seemed to help a little, but setting it really emphasizes the problem with the testing hardware/OS combination on the server side.

I ran another large test trying to get 750 clients from one computer to make sure nothing had changed, and sure enough, the higher the client count got, the more TCP retransmissions flooded the screen, with longer RTTs to the ACKs. In addition, I was getting a flood of duplicate ACKs, which to me looks like there is too much activity going on for the hardware on the test clients, as they can't churn through the data fast enough. Similarly, another test with 400 clients on each computer resulted in the same behavior, so I'm certain now about the OS theory.

I don't expect to be able to get more testing hardware or a server setup until some time next week, so for now I'll keep working on the small-scale examples with the IOCP code using what I have. I still have to solve some more problems dealing with concurrency and stream processing. That's OK, since this is really all for fun and learning; I'm not in a rush and have no deadlines looming. I've also already started my initial research and development on a small distributed/cluster codebase to tackle the scalability issues I originally talked about. So I have plenty to do until I can run a larger, more effective test. When I do, I'll make sure to write up the results and the hardware used for reference.

Thanks everyone, this thread has been a great benefit to me!

#16 Ysaneya   Members   -  Reputation: 1249


Posted 02 April 2009 - 10:58 AM

That's an interesting topic, I'm interested to see if/how you'll solve your problem.

I don't know if it's too complex for you to test or not, but maybe you could try changing your network system to use a single UDP socket on the server through which the clients send and receive data. The main point here would be to test whether you're hitting a TCP socket-count overhead/limit somewhere. With UDP your packets won't be reliable anymore, but since it's just for a test, it shouldn't matter.

Y.


#17 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 02 April 2009 - 02:21 PM

Quote:
Original post by Ysaneya
That's an interesting topic, I'm interested to see if/how you'll solve your problem.


Me too! I'm hoping a trial run on a server will turn out positive, but I'm still deciding on how to go about that test. I need my own server setup, which I've had planned for a while, so I was just going to set one up this upcoming week. Before making that kind of plunge, though, I've had to spend a lot of time researching options and weighing future considerations: A) Buy a refurbished 1U server unit from geeks.com, load up a server OS, and test. B) Lease a dedicated server for a month and test on that, so it's more "real world" testing. C) Make a future investment and simply build my own via Newegg.

As much as I want to set up my own on my local network, I don't know enough about the process to feel like I'd be making an educated purchase, so I'm leaning towards a 30-day lease of a dedicated server just for testing this stuff. The other ideas I had about distributed computing and shard/MMO architecture I think I can get away with on my local network using my existing desktop and laptops, so no worries there, as those goals don't depend on acting as real servers. This IOCP stuff, though, I want to get right.

Quote:
I don't know if it's too complex or not for you to test, but maybe you could try to change your network system to use a single UDP socket on the server to which the clients can send/receive data. The main point here would be to test if you're hitting a TCP socket count overhead/limit somewhere. With UDP your packets won't be reliable anymore, but since it's just for a test, it shouldn't matter..


I've started the process of making my IOCP server UDP based, but I'm getting a little stuck on one specific issue. In TCP, each connection has a WSARecv posted on it that simply stays pending until there is something going on.

In my current UDP setup, however, when I post an overlapped WSARecvFrom on the main listen socket, the call returns immediately with WSA_IO_PENDING. This is bad because, since the call does not block, the server runs at 100% CPU usage as it's polling the recv buffers rather than waiting for a completion event.

I've found that if I instead just post a blocking WSARecvFrom and then use PostQueuedCompletionStatus to hand the result to the worker threads, I can utilize the system as it should be, but that's defeating the purpose, as everything is bottlenecked at the single non-overlapped WSARecvFrom call.

Going back to the overlapped WSARecvFrom call: each call does post a recv operation for the IOCP code to use, but I don't really know how to keep from flooding the socket with overlapped recv requests. I.e., running it in a loop posts an overlapped operation each iteration, so calling it 1000 times a second means 1000 queued overlapped operations waiting to be handled. Run that for 60 seconds and that's 60,000 overlapped operations waiting to complete, or in UDP terms, buffers posted for 60,000 packets a minute.

I'm not really sure at the moment how to limit that process efficiently in a way that would let me do the same caliber of testing with UDP that I did with TCP. My initial thought is to keep a count of how many overlapped operations are currently posted and, when that number hits some limit, Sleep the recv thread a little to see if some of them have completed. The worker threads would decrement the count on each operation. I was thinking I could build that with the Interlocked family of functions, but it doesn't really address what to do when too many are posted, and Sleeping seems a bit of a hack.

What I think I should be doing instead is having the worker threads decrement the outstanding-receive counter as mentioned before, and having the UDP posting thread wait on a global event whenever the counter is at its quota; the workers set that event once the counter has dropped back below some safe value. When the event is signaled, the posting thread loops through WSARecvFrom again, posting requests and updating the counter until it hits the max quota, and then waits on the event again. That seems better than Sleep and more effective and scalable overall.
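Something like this is what I have in mind (a rough sketch only; PostOneRecvFrom is a made-up helper name for allocating the per-operation buffer/OVERLAPPED/SOCKADDR and issuing the WSARecvFrom, and all error handling is stripped out):

#include <winsock2.h>

// Hypothetical helper: allocates a per-operation context (buffer + OVERLAPPED +
// SOCKADDR_IN) and calls WSARecvFrom on it. Returns false on a hard failure.
bool PostOneRecvFrom(SOCKET udpSocket);

const LONG MAX_PENDING_RECVS = 2500;
volatile LONG pendingRecvs = 0;                       // overlapped receives currently posted
HANDLE hRefillEvent = CreateEvent(0, FALSE, TRUE, 0); // auto-reset, starts signaled

// UDP posting thread: keep up to MAX_PENDING_RECVS receives outstanding,
// then sleep on the event until the workers say the pool has drained enough.
void PostingLoop(SOCKET udpSocket)
{
    while(WaitForSingleObject(hRefillEvent, INFINITE) == WAIT_OBJECT_0)
    {
        while(pendingRecvs < MAX_PENDING_RECVS)
        {
            if(PostOneRecvFrom(udpSocket) == false)
            {
                break; // TODO: Handle error
            }
            InterlockedIncrement(&pendingRecvs);
        }
    }
}

// Called from a worker thread after it dequeues a completed receive.
void OnRecvCompleted()
{
    LONG remaining = InterlockedDecrement(&pendingRecvs);
    if(remaining <= MAX_PENDING_RECVS / 2)
    {
        SetEvent(hRefillEvent); // wake the posting thread to top the pool back up
    }
}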

I should be able to finish this code and test the idea in the next couple of days. I actually have a simple UDP setup I'm working with for the distributed system component I'm making. More to come soon! [smile]

#18 aissp   Members   -  Reputation: 136


Posted 02 April 2009 - 05:52 PM

Hi Drew. Just FYI.

Server: a VM on a dual-core Centrino box with 4 GB of RAM, hosting 6 different servers.
Router: a Fortigate 200 (not a home router). Clients: 25 different computers belonging to different subnetworks, each running several clients. Each packet was about 600 bytes. The result was 800 CCU without any significant exhaustion of server resources (I guess it would have been easy to increase this number, but we were limited on the client side...).

#19 Drew_Benton   Crossbones+   -  Reputation: 1729


Posted 02 April 2009 - 09:54 PM

Thanks for those results aissp! I am still looking forward to testing the TCP version on a server myself and checking it out. I think I will run out of client resources before you did though [lol]

I've been working on the UDP version and after testing it, the results seem unreal. I mean, I know I am testing on localhost and all, but it's amazing how it's turning out.

Just some preliminary numbers on the current results. I still think something has to be wrong, but so far it looks like I'm doing it right; I will spend some more time thinking it through, though. I used TeamViewer to RDC into my laptops, so there was always a small amount of extra UDP network traffic (~4 kb/s) going on.

Server: UDP IOCP with 2500 overlapped receives kept pending at a time (see my Journal for why 2500!). As soon as this count hit 1/2, more requests were posted to bring it back to the max; the refill logic only triggers at 1/2 empty. The server also has a hard-coded limit of tracking 8k "connections" (arbitrary). Since this is early-stage work, I have to limit the server to one IOCP thread and one worker thread to avoid any synchronization issues with storing the timing data. Unlike the TCP server, this server has to make use of boost::singleton_pool for the massive number of allocations and deallocations of the connection objects posted with each overlapped event. That wouldn't make performance "better", though.

Client: creates a UDP socket and sends 32 bytes each second. Each client uses __rdtsc to generate its own UID and sends that to the server; the server stores client data in a stdext::hash_map keyed by the 64-bit number sent. A simple client, just like the TCP one was.
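For reference, the test client is little more than this (a simplified sketch of it; the server address here is a placeholder and error handling is stripped out):

#include <winsock2.h>
#include <intrin.h>
#include <string.h>

int main()
{
    WSADATA wd = { 0 };
    WSAStartup(MAKEWORD(2, 2), &wd);

    SOCKET s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);

    sockaddr_in server = { 0 };
    server.sin_family = AF_INET;
    server.sin_port = htons(15779);                      // same port the IOCP server listens on
    server.sin_addr.s_addr = inet_addr("192.168.1.100"); // placeholder server address

    unsigned __int64 uid = __rdtsc(); // "unique" id for this client instance

    char packet[32] = { 0 };
    memcpy(packet, &uid, sizeof(uid)); // first 8 bytes carry the id

    for(int i = 0; i < 300; ++i) // ~5 minutes at one packet per second
    {
        sendto(s, packet, sizeof(packet), 0, (sockaddr *)&server, sizeof(server));
        Sleep(1000);
    }

    closesocket(s);
    WSACleanup();
    return 0;
}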

I ran 1000 processes of the client on each of two laptops, 2000 total clients. Initial memory usage was 29 MB (high for many reasons; it's modified TCP code) and stayed in that range. CPU usage was most often 1%, sometimes 2% or 0%, throughout the first 5 minutes of the test. Each minute that passed added about 1 second of CPU time. About 150 kb/s of data was being processed by the application according to NetMeter.

The test ran for just a little over 5 minutes, informally timed. Out of the 2000 connections, 1033 had an average service time just over 1 second, within a millisecond or two of it (< 1002 ms); 967 had an average service time just under 1 second, again within a millisecond or two of it (> 998 ms). The fact that half of the clients appear to have their data handled faster than they are sending it makes me a little concerned. I think I need to add sequence numbers to the packets to figure out whether duplicates are throwing off the values or if it's just the lack of precision of GetTickCount. I should move to a higher-precision timer for a more accurate test.
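For the higher-precision timing, QueryPerformanceCounter should do the trick; roughly something like this for measuring a service time in milliseconds:

#include <windows.h>

// Returns the elapsed time in milliseconds since 'start' with sub-millisecond resolution.
double ElapsedMilliseconds(const LARGE_INTEGER & start)
{
    LARGE_INTEGER now = { 0 };
    LARGE_INTEGER frequency = { 0 };
    QueryPerformanceCounter(&now);
    QueryPerformanceFrequency(&frequency); // counts per second
    return (double)(now.QuadPart - start.QuadPart) * 1000.0 / (double)frequency.QuadPart;
}

// Usage:
// LARGE_INTEGER start;
// QueryPerformanceCounter(&start);
// ... handle the packet ...
// double serviceTimeMs = ElapsedMilliseconds(start);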

For a second test, I ran 1500 processes of the client on each laptop, 3000 total clients for the server. For this test, about 220 kb/s of data was being processed by the applications according to NetMeter. CPU usage was more often 1-2% rather than the 0% of the previous test, and as a result almost 2 seconds of CPU time were spent per minute of this test.

These test results were more reflective of the amount of traffic, but not by much: 325 clients still had an average service time of around 999 ms, 2204 came in somewhere around 1000 ms and change, and 471 had averages 1-5 ms above 1 second.

Download: Results 1 | Results 2. The format is: [Client Index] => Average Service Time

Of course, this testing is by no means scientific or "official", but the obvious difference between TCP and UDP is showing; I just didn't know it would be this big. I'm sure my tests are far from optimal, but I'll keep at it this weekend to get better, replicable tests for when I test on a server. I'll also try to make an all-in-one UDP/TCP client, as that would be more useful than maintaining two projects.

These posts are getting longer each time, so I'll stop here, but I did want to mention that I also made a single-process test client that spawned more UDP sockets, and that testing went well too. Each laptop tried to run 4,000 UDP sockets and traffic was ~675 kb/s. Occasional dips to about a third of that did take place, which should be reflected in the service-time latencies. I ran a quick test for a couple of minutes (7,950 total clients actually made it) and the results were as follows: Results 3. Yes, the first few at ~500 ms are puzzling, but everything else looks normal and scaled far better than the TCP testing did!

#20 hplus0603   Moderators   -  Reputation: 6024


Posted 03 April 2009 - 04:31 AM

Quote:
which it seems the network can't handle from one source.


Actually, I think the problem is the testing client. When there are 500 separate processes, each taking 2 megabytes, and each doing their own I/O, the client-side kernel will have trouble scheduling all that. If you wrote one uber-client that used IOCP to multiplex a large number of client connections, I believe that client could sustain many more connections. From the point of view of the server, there would be no difference, because each connection is still a separate client-side port.
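The skeleton of such an uber-client is essentially the same IOCP pattern the server already uses, just pointed outward. Very roughly (an illustrative sketch only, with error handling omitted and a blocking connect for brevity; ConnectEx would let the connects themselves be overlapped):

#include <winsock2.h>
#include <string.h>

// One per-connection context; the worker threads recover it from the
// OVERLAPPED pointer exactly like the server does.
struct ClientConnection
{
    OVERLAPPED overlapped;
    SOCKET s;
    char buffer[512];
};

// Create 'count' outgoing connections, all multiplexed onto one completion port.
void SpawnClients(HANDLE iocp, const sockaddr_in & server, int count)
{
    for(int i = 0; i < count; ++i)
    {
        ClientConnection * c = new ClientConnection();
        memset(&c->overlapped, 0, sizeof(c->overlapped));
        c->s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

        connect(c->s, (const sockaddr *)&server, sizeof(server));

        // Every socket shares the same port, so a handful of worker threads
        // service all of them.
        CreateIoCompletionPort((HANDLE)c->s, iocp, (ULONG_PTR)c, 0);

        // Keep one receive outstanding per connection.
        WSABUF buf = { sizeof(c->buffer), c->buffer };
        DWORD bytes = 0, flags = 0;
        WSARecv(c->s, &buf, 1, &bytes, &flags, &c->overlapped, NULL);
    }
}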

The reason you get half as much data back as you put in is that TCP overhead is at least 40 bytes per packet (20 for IP and 20 for TCP without options). Thus, your 32 bytes of payload are actually less than the overhead: each 32-byte message costs at least 72 bytes on the wire.

Also, when posting measurements, "kB" is usually 1000 bytes, "kb" is usually 1000 bits, "KB" is usually 1024 bytes, and "Kb" is usually 1024 bits. Thus, when you post "48 kb" I read that as bits, meaning you're doing I/O that would fit on a modem. My guess is you meant bytes. A lot of people don't use the same conventions, though (and there's "kibi" and "kby" and others, too) so it's best to write out the unit: "kilobytes/s."




