Completely distributed game logic design

While trying to find an efficient and scalable distributed application design, I got somewhat stuck on an MPI-style approach. The idea here is to try to avoid any kind of explicit synchronization.

[message queue] --> Object 1 // thread
[message queue] --> Object 2 // thread
[message queue] --> Object 3 // thread
              ...
[message queue] --> Object n // thread

with the usual message loop:
while (this->isAlive()) {
  Message *msg = next_message();
  handle( msg );
}

The threading doesn't really imply a thread per object (given hundreds of thousands of objects, that's not viable), nor does it imply true threads (objects can be distributed over multiple machines and/or processes). With this design, each object represents the equivalent of a thread's own stack space. Queues would be implemented using one of the asynchronous dispatching strategies (Reactor/Proactor), with a central demuxer assigning objects with non-empty message queues to available worker threads. The demuxer ensures that a single object is never handled by more than one thread at the same time (in a way similar to fibers or co-routines).

But this would have perhaps the greatest impact on the programming model. While setting values becomes trivial (SetValueMessage( id, value )), querying a value becomes problematic for objects that are not currently handled by the same thread (one thread per object). This devolves into rather verbose message-passing semantics (typical combat scenario):

// In player object
on_attack( target, spell ) {
  if ( this->has_spell( spell ) ) {
    if ( this->get_energy() > spell.energy_cost() ) {
      send( AttackMessage, this->current_weapon(), this, target, spell );
    }
  }
}
// In weapon object
on_attack( attacker, target, spell ) {
  int base_damage = rand( this->minimum_damage(), this->maximum_damage() );
  send( DealDamageMessage, target, attacker, base_damage, this->extra_damage() );
}
// In target object
on_deal_damage( attacker, base_damage, extra_damage )
{
  send( AbsorbDamageMessage, this->current_armor(), attacker, this, base_damage, extra_damage );
}
// In armor object
on_absorb_damage( attacker, target, base_damage, extra_damage )
{
  int adjusted_damage = base_damage * this->armor_rating() + extra_damage;
  send( DealAdjustedDamageMessage, target, attacker, adjusted_damage );
}
// Back in target object
on_deal_adjusted_damage( attacker, adjusted_damage )
{
  // this also triggers the change in health value to be propagated
  // to remote nodes
  this->health.set( this->health.get() - adjusted_damage );
}


send() has the signature send( MessageType, destination, ... ). The explicit use of this-> indicates that it's only possible to query values belonging to the object currently handled by the thread. While other objects may exist in the same address space, their values cannot be obtained, since they might be modified by some other thread (or be running in parallel on another machine, causing data inconsistency). This is equivalent to all get_ functions being protected or private, and message handlers being member functions.

While this approach appears to solve concurrency issues as well as data consistency problems (events will occur in proper sequence, though several events from different clients would not have a guaranteed order of execution, possibly just bucket synchronization), the programming model is somewhat cumbersome. The number of different messages is rather large, and the current syntax isn't really well suited to it (but that's solvable). However, I fail to see an alternative. Allowing values on other objects to be queried directly requires synchronization on a per-object basis. While usually not very drastic, it does represent overhead, and it cannot be done across multiple processes at all.

If implemented in C++, individual actions would likely take the form of functions (or functors) that would receive the object reference via IoC, guaranteeing thread safety. When passing messages between cluster nodes, the order of events would be preserved. Regardless of where the objects physically reside, there is always only one message queue handler responsible, so this eliminates authority issues.

I realize that this is merely a re-invention of similar MPI solutions or lambda calculus, but it has turned out somewhat hard to find practical alternatives to the classic single-threaded design, or at least any attempt at reducing the required locking. There are obvious problems even with this design. One example would be buying an item from a vendor:

A ------ HasItemMessage ---> Vendor
                               |
A <-- ItemAvailableMessage ----+
|                       // Here another player buys item
+---- BuyItemMessage ------> Vendor
                               |
                               + 
// Although the vendor said they have the item, it's no longer available,
// making the whole query process somewhat redundant, and we could go with
// BuyItemMessage from the start


But I consider these higher-level problems. Either that, or allow for some form of soft locks, similar to the way lock files are sometimes used to limit access to a directory or file, where the presence of a lock file means the resource is currently in use. Anyway, I'm just brainstorming; perhaps someone will find a fatal flaw in this whole concept that makes it useless in the first place.
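To make the soft-lock idea a bit more concrete, here's a minimal sketch of how a vendor might grant short-lived reservations instead of a plain availability answer. Everything here (ReserveItemMessage, the 5-second expiry, the map) is invented for illustration; the point is only that the reservation expires on its own, like a stale lock file, so a silent client can never wedge the vendor:

#include <chrono>
#include <map>
#include <utility>

using Clock = std::chrono::steady_clock;

// Hypothetical vendor-side state: item id -> (holding player, expiry).
// Because all of a vendor's handlers run from its single message queue,
// no locking is needed inside the object itself.
struct Vendor {
  std::map<int, std::pair<int, Clock::time_point>> reservations;

  // Handler for a hypothetical ReserveItemMessage: grant the soft lock if
  // the item is unclaimed or the previous claim has already expired.
  bool on_reserve_item( int item_id, int player_id ) {
    Clock::time_point now = Clock::now();
    auto it = reservations.find( item_id );
    if ( it == reservations.end() || it->second.second < now ) {
      reservations[item_id] = { player_id, now + std::chrono::seconds( 5 ) };
      return true;   // would send ItemReservedMessage back to the player
    }
    return false;    // would send ItemUnavailableMessage back
  }

  // Handler for BuyItemMessage: only the current reservation holder may
  // complete the purchase, so two buyers can no longer race.
  bool on_buy_item( int item_id, int player_id ) {
    auto it = reservations.find( item_id );
    if ( it != reservations.end() && it->second.first == player_id
         && it->second.second >= Clock::now() ) {
      reservations.erase( it );
      return true;   // transfer the item, send BuyConfirmedMessage
    }
    return false;
  }
};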
I'd stick with messages given the requirements. Maybe a different syntax. Dunno if you could use RPC calls instead to reduce the bloat, but that's an option.

Everything is better with Metal.

RPC calls turn into deadlock hell if they are synchronous. Asynchronous RPC calls are no different from messages.
enum Bool { True, False, FileNotFound };
"When passing messages between cluster nodes"

Replication of the world state (updates) on the separate machines in a cluster can become significant overhead (multicores could share the same memory space).
Spatial partitioning, residence of objects, and probably 'local' execution queues would likely be more efficient (though this adds boundary-transition problems).


The project I'm working on likely has much heavier AI processing, so the communications overhead ratio would be much lower. With the large number of cluster nodes I expect, it would be costly for each to maintain the full world state, so each high-level AI object has a window onto the 'chunks' of the world surrounding it (I'm still investigating the tradeoffs of sharing the chunks between objects that overlap in the same space). The central 'bookkeeping' master copy of the world space itself is spread over many machines. I'm still investigating splitting the simulation mechanism across multiple cores as well as multiple machines.
Ratings are Opinion, not Fact
Multi-cores help when you are compute bound, but beware that the multi-cores share a single memory bus to main RAM. That means that for the cases where you are memory access bound, multiple cores are no faster than a single core (and may be slower because of contention).

To take advantage of heavily multi-cored CPUs (Sun Niagara2 -- 64 cores!) I believe we need a totally different programming paradigm (message passing / state machines), and we don't really have the languages and tools (like debuggers!) to make good use of that paradigm yet. Also, I think you'll want to use L2 cache as a message queue, so you can avoid going through main RAM for the messaging; currently there's not quite enough control to make sure that that happens. Lockable/assignable L2 cache is probably in our future.
enum Bool { True, False, FileNotFound };
Quote:To take advantage of heavily multi-cored CPUs (Sun Niagara2 -- 64 cores!) I believe we need a totally different programming paradigm (message passing / state machines), and we don't really have the languages and tools (like debuggers!) to make good use of that paradigm yet. Also, I think you'll want to use L2 cache as a message queue, so you can avoid going through main RAM for the messaging; currently there's not quite enough control to make sure that that happens. Lockable/assignable L2 cache is probably in our future.


I spent some time verifying and implementing the proposed system; while I don't have such huge hardware, that doesn't affect the solution itself.

The problem is formalized with a very simple primitive:
class ObjectPtr {
  void send( Message msg ) {
    if ( is_local ) {
      m_object->messagequeue.push( msg );
      MARK_PENDING( m_object ); // notify demuxer that m_object needs attention
    } else {
      socket.send( msg );
    }
  }
  Object *m_object;
};

class Object {
  void process_pending() {
    int n = 10;    // process at most n messages, then yield to others
    while ( !messagequeue.empty() && n-- ) {
      handlemessage( messagequeue.pop() );
    }
  }
  State object_local_state;
};
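For illustration, a hypothetical call site (registry and make_message are assumed plumbing, not part of the primitive above) would look like:

  // The caller never blocks and never learns where the object lives;
  // locality is purely an optimization inside send().
  ObjectPtr target = registry.lookup( target_id );
  target.send( make_message( DealDamageMessage, attacker_id, base_damage ) );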

The processing model results in this:
---Socket-->[Demuxer]-------->{Objects}
                ^                 |  // Send
                +-----------------+------>[Serializer]--->{Net}

The serializer runs in its own thread, and the demuxer hands out worker threads to objects with non-empty message queues - most likely only one or two workers per processor. (As a special case, with a single worker the whole thing runs as a completely single-threaded implementation with no change to the code.)
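For what it's worth, a minimal sketch of such a demuxer follows, using a plain mutex and condition variable for clarity where the real thing would use the lock-free structures discussed below. The 'queued' flag and has_pending() on Object are assumptions of mine; the one invariant the sketch must preserve is that an object is never handed to two workers at once:

#include <condition_variable>
#include <deque>
#include <mutex>

// Object is the class from the primitive above, assumed here to be
// extended with a bool 'queued' flag and a has_pending() check.
class Demuxer {
  std::deque<Object*>     ready;   // objects with non-empty message queues
  std::mutex              m;
  std::condition_variable cv;

public:
  // Called from ObjectPtr::send (i.e. MARK_PENDING). An object enters the
  // ready queue at most once; 'queued' stays true while a worker holds it,
  // so it can never be dispatched to two workers at the same time.
  void mark_pending( Object *o ) {
    std::lock_guard<std::mutex> lock( m );
    if ( !o->queued ) {
      o->queued = true;
      ready.push_back( o );
      cv.notify_one();
    }
  }

  // Body of each worker thread (one or two per core, as described above).
  void worker_loop() {
    for (;;) {
      Object *o;
      {
        std::unique_lock<std::mutex> lock( m );
        cv.wait( lock, [&]{ return !ready.empty(); } );
        o = ready.front();
        ready.pop_front();
        // o->queued deliberately stays true here.
      }
      o->process_pending();   // exclusive: no other worker can hold o
      {
        std::lock_guard<std::mutex> lock( m );
        if ( o->has_pending() )
          ready.push_back( o );  // more messages arrived while processing
        else
          o->queued = false;     // senders may now re-enqueue it
      }
    }
  }
};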

While this works fine in theory, the problem becomes message storage. If used for true IPC, messages need to be persisted (either on the heap, or via the network). With lock-free queues, allocating and de-allocating becomes a challenge in itself. Using a lock-free allocator solves this problem, but then causes a livelock if the allocator is exhausted (e.g. an object produces messages at a 1:2 ratio; when generating a message it will spin on the allocator waiting to receive an allocation, while the demuxer is waiting for its processing to complete). There might be a solution for this at some higher level.
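One way to at least surface the exhaustion instead of spinning is to make allocation failable and push the decision up to the caller. A sketch only - I've substituted a mutex-guarded pool for the lock-free allocator to keep it short, and Message is the type from the code above:

#include <cstddef>
#include <mutex>
#include <vector>

// Fixed-capacity message pool. Returning nullptr on exhaustion pushes the
// problem up a level: the caller can drop, defer, or apply backpressure --
// anything but spin inside a worker thread, which is what produces the
// livelock described above.
class MessagePool {
  std::vector<Message*> free_list;
  std::mutex            m;

public:
  explicit MessagePool( std::size_t capacity ) {
    for ( std::size_t i = 0; i < capacity; ++i )
      free_list.push_back( new Message );
  }

  Message *try_allocate() {
    std::lock_guard<std::mutex> lock( m );
    if ( free_list.empty() )
      return nullptr;            // exhausted: caller must decide, not spin
    Message *msg = free_list.back();
    free_list.pop_back();
    return msg;
  }

  void release( Message *msg ) {
    std::lock_guard<std::mutex> lock( m );
    free_list.push_back( msg );
  }
};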

One side effect of the design I hadn't noticed at first is that there is no more need for object shadowing. Since all calculations are performed either on the object that owns the queue, or on data passed through a message to another object, the execution context is always local to the thread to which the object has been assigned.

In many ways I feel this is close to the state machine model mentioned. In this example, each state machine has its own (logical) thread, and messages form both the input and the output of another SM.

The problem I suspect will be most troublesome is acknowledgment, or feedback: there are no guarantees as to when a certain action will be acknowledged (one node is very busy and takes seconds to respond when the expected time is milliseconds). Then again, since this is about real-time gameplay, if such a disconnect occurs the solution would likely lie elsewhere, such as re-distributing the objects across nodes.

This design also reduces inter-node traffic (no shadowed copies), albeit at the expense of larger inter-object messages. So far, I've found that the most complex action I need to express takes 4 messages (per user command), with most commands executable in direct response, with no additional messages.

I won't know about real-world behaviour until the implementation is in workable form and split across a few machines.

Another consequence of such a model is that it becomes trivial to use a DHT as storage. Real-world analysis will need to show the needs for traffic balancing and explicit object grouping on a single machine - I suspect that with the moderately complex logic of a typical MMO, this overhead would be less than originally expected.
That sounds very encouraging.

Some questions:

1) Can you guarantee latencies between objects? It seems like, with a guaranteed maximum workload, you might be able to.
2) How do you do collision detection between objects in different processes? I think this one is harder.
3) If you put objects onto servers in a random or round-robin fashion (often one of the better load balancing strategies for disconnected loads), then all of the processors may need to load all of the static data about the world (terrain db, etc).
enum Bool { True, False, FileNotFound };
Quote:1) Can you guarantee latencies between objects? It seems like, with a guaranteed maximum workload, you might be able to.
2) How do you do collision detection between objects in different processes? I think this one is harder.
3) If you put objects onto servers in a random or round-robin fashion (often one of the better load balancing strategies for disconnected loads), then all of the processors may need to load all of the static data about the world (terrain db, etc).


1) The guarantee is somewhat tricky. If a particular action takes a long time (hundreds of ms, or seconds), then the system breaks. This is of course no different than with other approaches (even with multiple worker threads, a monolithic task that takes a long time will bog down any system if executed in all worker threads).

That aside, if each action by itself is well behaved, or offloads its work to external workers (that do not need to lock the object), then it's possible to establish per-object quotas and use those to balance objects across nodes.
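As a sketch of what such a per-object quota might track (all of the accounting here is invented), the demuxer could time each object's processing and let a balancer move chronic over-consumers to emptier nodes:

#include <chrono>

// Invented accounting attached to each object by the demuxer: measure how
// much worker time the object consumes per slice, so the balancer can
// relocate objects that are chronically over budget.
struct ObjectQuota {
  std::chrono::microseconds used_this_slice{ 0 };
  std::chrono::microseconds budget{ 500 };   // e.g. 0.5 ms per slice

  bool over_budget() const { return used_this_slice > budget; }
  void reset()             { used_this_slice = std::chrono::microseconds{ 0 }; }
};

// The worker would wrap its call to process_pending() with a timer:
//   auto t0 = std::chrono::steady_clock::now();
//   o->process_pending();
//   quota.used_this_slice += std::chrono::duration_cast<std::chrono::microseconds>(
//       std::chrono::steady_clock::now() - t0 );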

2) This one is trickier. I won't pretend I can offer a reasonable solution to the general problem, but I believe a solution with bucket synchronization could work.

- For the duration of a time slice, collect (in an independent, out-of-thread object) intended movements. For n-dimensional movement, that would be an (n+1)-dimensional object: for objects represented by a circle, an extruded cylinder represents the intended movement, and this can be further simplified to a point extended into a line. Desired movements can be broadcast within the cluster (barely manageable with a reasonably small number of objects). A caching/grouping/delta-compression strategy could also be applied here for cases where a large number of collisions need to be resolved (1000+).

At the end of the time slice, calculate all possible intersections and check for collisions. Send messages back to each object, signaling either allowed or denied movement. These calculations would be done locally in each process, and collision information would be known for the entire space, so the final allow/deny notifications can be sent locally only, adding no extra traffic.

Since this calculation is out-of-thread (it doesn't need the object's thread safety), the messages can be sent with higher priority. Once again, this only works under certain load guarantees - or perhaps it's not even that dependent on the load; I'd have to look into this in more detail.

In the end, the problem will be the n(n-1)/2 complexity of collision detection itself, not so much the cluster scalability. Given realistic loads, it's unlikely that more than ~1000 players would be close enough to interact (the collision detector has knowledge of spatial relations and can cull objects that cannot possibly collide, even if individual objects can't). Additionally, static geometry can be safely checked on a per-object basis, and non-player entities can probably use simplified tests.
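To make the time-slice pass concrete, here's a rough sketch of the point-extended-into-a-line variant in 2D, with brute-force pairs and no culling; the closest-approach test and all the names are my own simplifications:

#include <algorithm>
#include <cstddef>
#include <vector>

// Each object's intended movement for the slice is reduced to a moving
// point plus a radius (the "point extended into a line" simplification).
struct Vec2 { float x, y; };

static Vec2  sub( Vec2 a, Vec2 b ) { return { a.x - b.x, a.y - b.y }; }
static float dot( Vec2 a, Vec2 b ) { return a.x * b.x + a.y * b.y; }

struct MoveIntent {
  int   object_id;
  Vec2  from, to;     // position at start and end of the time slice
  float radius;       // collision radius of the object
  bool  denied = false;
};

// True if the two swept circles come closer than the sum of their radii
// at any time t in [0,1] during the slice (closest-approach test).
static bool swept_collide( const MoveIntent &a, const MoveIntent &b ) {
  Vec2  p  = sub( a.from, b.from );                           // relative position
  Vec2  v  = sub( sub( a.to, a.from ), sub( b.to, b.from ) ); // relative velocity
  float vv = dot( v, v );
  float t  = ( vv > 0.0f )
             ? std::min( 1.0f, std::max( 0.0f, -dot( p, v ) / vv ) )
             : 0.0f;
  Vec2  closest = { p.x + t * v.x, p.y + t * v.y };
  float r = a.radius + b.radius;
  return dot( closest, closest ) < r * r;
}

// Brute-force O(n^2) pass; a real version would cull with the spatial
// knowledge mentioned above, then send deny-movement messages to the
// flagged objects and allow messages to everyone else.
void resolve_slice( std::vector<MoveIntent> &intents ) {
  for ( std::size_t i = 0; i < intents.size(); ++i )
    for ( std::size_t j = i + 1; j < intents.size(); ++j )
      if ( swept_collide( intents[i], intents[j] ) )
        intents[i].denied = intents[j].denied = true;
}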

3) I believe there was an article about DHTs addressing this. One solution is to assign each sub-part of the world to m different nodes, where m << n and n is the total number of nodes. This way, different parts of the world are permuted across multiple nodes, requiring a single node to know only a small subset of the world. This still implies geographical grouping. In the case of m = n, every node contains the entire world data. I'll have to look it up; IIRC the Colossus or Mercury implementations use precalculated connections between nodes to statically balance regions.

Given a Zipf or Gaussian distribution of activity across the world, simply splitting one geographic location across 3-6 nodes would likely be sufficient to take care of the worst overload (within limits, obviously).
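For illustration, the m-of-n placement can be computed without coordination by scoring every (region, node) pair with a hash and taking the top m - rendezvous-style hashing, which is my own stand-in here, not necessarily what Colossus or Mercury do:

#include <algorithm>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Pick the m nodes (out of node_count) responsible for a world region.
// Any node can compute this independently and arrive at the same answer,
// with no coordination -- the property that makes DHT-style placement
// attractive.
std::vector<int> nodes_for_region( int region_id, int node_count, int m ) {
  std::vector<std::pair<std::uint64_t, int>> scored;
  for ( int node = 0; node < node_count; ++node ) {
    // Invented hash-combine; any stable mixing function works.
    std::uint64_t score =
        std::hash<std::uint64_t>{}( ( std::uint64_t( region_id ) << 32 )
                                    ^ ( std::uint64_t( node ) * 0x9E3779B97F4A7C15ull ) );
    scored.push_back( { score, node } );
  }
  // Keep the m highest-scoring nodes for this region.
  std::partial_sort( scored.begin(), scored.begin() + m, scored.end(),
                     []( auto &a, auto &b ) { return a.first > b.first; } );
  std::vector<int> result;
  for ( int i = 0; i < m; ++i )
    result.push_back( scored[i].second );
  return result;
}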

Quote:Original post by hplus0603
Multi-cores help when you are compute bound, but beware that the multi-cores share a single memory bus to main RAM. That means that for the cases where you are memory access bound, multiple cores are no faster than a single core (and may be slower because of contention).

To take advantage of heavily multi-cored CPUs (Sun Niagara2 -- 64 cores!) I believe we need a totally different programming paradigm (message passing / state machines), and we don't really have the languages and tools (like debuggers!) to make good use of that paradigm yet. Also, I think you'll want to use L2 cache as a message queue, so you can avoid going through main RAM for the messaging; currently there's not quite enough control to make sure that that happens. Lockable/assignable L2 cache is probably in our future.



I will have to do some testing on the quad-core system I am planning to buy soon. The Q6600 has 8 MB of cache (4x2), and hopefully the data flow to number-crunching ratio won't be too high. Additional cores can only increase the need for data. I haven't yet seen how, architecturally, they might increase the flow from shared main memory to actually allow efficient use of so many cores for more than limited applications (i.e., number-crunching pipelines).

Ratings are Opinion, not Fact
Quote:Desired movements can be broadcast within cluster


I was going to object that this scales as n-squared in the total number of objects on the cluster, which isn't good for scalability, but you already noted that. This is the main reason why I think geographic division is still a necessity for anything physically based and real-time.

Note that you can work around the problem with game design instead. An RPG really doesn't need physics (or: why EverQuest mobs will suddenly warp through trees and rocks in their path).
enum Bool { True, False, FileNotFound };

