• entries
3
10
• views
2314

1137 views

*See the EDIT below*

The impetus for this research topic is the necessity for threads to exchange data in an efficient manner, namely between the network and main threads. Within the context of networking, we want data to be quickly available to, and dispensable from, the main thread, and that means calling the corresponding system calls (e.g. sendto/recvfrom) as often as possible. In addition to this, there is necessarily some translation done between native and network formats, as well lookups and other operations that could be off-loaded to the network thread if possible.

However, this introduces a problem; as data arrives and is picked up by the network thread, or as logic in the main thread requires data be sent out to the network, how do the separate threads share this data? If we create a mutex for it, we introduce an expensive system call that causes delays and is generally unacceptable in a tight loop. Whatever solution we implement should not involve systems calls or any significant pause in execution.

If our shared data is simple enough to be just one variable - just one *instance* of something, if you will - then nothing fancy is needed. As an example, if the network thread needs to notify the main thread that a player has sent a QUIT message, and that this player be removed from all operations and deleted, a "player * quitting" member can be added, which is updated as necessary. The main thread will watch this member, and take appropriate action when it sees that it is non-null. So, the course of action for both threads is:

1. QUIT message received by player
2. update member "quitting" to refer to this player

1. check member "quitting"
2. if non-null, delete all associated resourced and updating member "quitting" to null

And this introduces yet another problem, as you can probably guess: what should the Network thread do with second QUIT message that it has received before the Main thread has finished processing the first one? It can simply queue this second message, and any others it receives, in a buffer. Each loop iteration, it will check to see if the member "quitting" is null, and if so, assign it to one of the items in the buffer (also removing this item from the buffer).

This solution works and is great in that it allows each thread to produce or consume data without waiting for the other. If data arrives in the producer, and the consumer is busy with previously shared data, it is simply queued, and the producer continues execution. The only cost incurred is one conditional in each thread, plus a buffer in the producer thread for queueing purposes.

The drawback of this approach is that only one item can be shared at a time. While this may be ok for QUIT messages - it would be uncommon for many players to quit in the few milliseconds it takes per each loop iteration - it would not be efficient for messages that are transferred more often. For this approach, two buffers can be used that swap their contents. Instead of explaining all the details, I will simply post the code I've written to support this://// Thread synchronization w/out semaphores//// the data producer can always insert data, and should call Commit() once during it's event loop// (the Commit() call will only commit produced data if the consumer has processed existing data)//// the data consumer will process data if there is any has been committed by the producer//template class Queue{private: std::vector m_clsIn; std::vector m_clsOut; bool m_blnData; bool m_blnProcessed;public: Queue() : m_blnData(false), m_blnProcessed(true) {} void Add(T clsData) { //ASSERT(thread == producer) m_clsIn.push_back(clsData); } void Commit() { //ASSERT(thread == producer) if ( m_blnProcessed && m_clsIn.size() > 0 ) { m_clsIn.swap(m_clsOut); m_blnProcessed = false; m_blnData = true; } } template void Process(Functor & refFunctor) { //ASSERT(thread == consumer) if ( m_blnData ) { for (size_t i=0; i); m_clsOut.clear(); m_blnData = false; m_blnProcessed = true; } }};
As incoming messages are received by the Network thread, it will call the Add() method. The Network thread will also call Commit() each loop iteration. The Main thread will call Process() each loop iteration to retrieve the message data. For outgoing messages, there will be a separate Queue object in which the above is reversed - the Main thread will call Add() and Commit(), and the Network thread will call Process().

Note: I've not tested this class yet. Specialized methods may be added to support certain data types or operations.

EDIT: Per [color=rgb(51,51,51)][font=helvetica]

### Josh's comment and the article he has linked, I've made some changes. Instead of swapping two vectors, one linked list is used with two pointers that refer to active and inactive portions of the list. Nodes that contain produced and un-consumed data constitute the active portion, while nodes containing consumed but un-produced data constitute the inactive portion.

[/font][/color]

[color=rgb(51,51,51)][font=helvetica]

### Even though it is implemented as a list, Queue was kept as a name as it is processed in a first-in, first-out basis. I would have simply used the queue written by the author of the article Josh linked, but it has more functionality than I need; for example I do not need "try to put" or "try to get" semantics.

[/font][/color]template class Queue{private: struct Node : public T { Node * m_ptrNext; Node(Node * ptrNext = NULL) : m_ptrNext(ptrNext) {} ~Node() {delete m_ptrNext;} } * m_ptrActive, * m_ptrInactive; public: Queue() { // // Start the queue with two elements // m_ptrActive = m_ptrInactive = new Node(); m_ptrInactive->m_ptrNext = new Node(m_ptrInactive); // // Make sure all threads have updated memory // std::atomic_signal_fence(std::memory_order_seq_cst); } ~Queue() { // // Make sure all threads have updated memory // std::atomic_signal_fence(std::memory_order_seq_cst); // // Set the 'next' pointer to NULL so the recursive nature of the Node d'tor will not // cause the destruction process to wrap around // Node * ptrNode = m_ptrInactive->m_ptrNext; m_ptrInactive->m_ptrNext = NULL; delete ptrNode; } template void Put(Functor functor) { //ASSERT(thread == producer) // // Check if the queue is full, in which case we must add a node // if ( m_ptrInactive->m_ptrNext == m_ptrActive ) { // // Between the above conditional and the following statement, the consumer can process // data and move the 'active' pointer, making the following node addition unnecessary. // However, there is no way to avoid it except with a mutex lock or other expensive // operation, and the extra node will get used anyway in the next call to Put() // // In the unlikely case that the 'active' pointer is advanced between the preceding // conditional and the Node c'tor below, we use 'inactive->next' as a parameter instead // of 'active' // m_ptrInactive->m_ptrNext = new Node(m_ptrInactive->m_ptrNext); } functor(static_cast(*m_ptrInactive)); std::atomic_signal_fence(std::memory_order_release); m_ptrInactive = m_ptrInactive->m_ptrNext; } template void Get(Functor refFunctor) { //ASSERT(thread == consumer) // // These is no [good] reason to fence here, as it is optimal to let the CPU handle memory as it // naturally does. The producer thread may not see our memory updates right away and subsequently // add extra nodes when it doesn't have to, but any extra nodes will be used anyway in subsequent // calls to Put(). // while ( m_ptrActive != m_ptrInactive ) { refFunctor(static_cast(*m_ptrActive)); m_ptrActive = m_ptrActive->m_ptrNext; } } // // For accurate results, call this from the producer thread // size_t SizeAll() { // Count from inactive -> inactive size_t count = 1; for (Node * ptrNode = m_ptrActive->m_ptrNext; ptrNode != m_ptrActive; ptrNode = ptrNode->m_ptrNext) ++count; return count; } // // Results may be inaccurate regardless of which thread calls this // size_t SizeActive() { // Count from active -> inactive size_t count = 0; for (Node * ptrNode = m_ptrActive; ptrNode != m_ptrInactive; ptrNode = ptrNode->m_ptrNext) ++count; return count; }};
The calls to [font='courier new']std::atomic_signal_fence(...)[/font] will have to be rewritten for compilers that do not support C++11. In my code I do not directly call these, but instead call a library that mimics the code in the article linked by Josh. I changed them here as I do not want to include my entire library.

An assumption made is that a [font='courier new']Queue[/font] object will not be destroyed without coordination between threads (e.g. one waits for the other to finish). In other words, the destructor will clobber concurrent calls to [font='courier new']Put()[/font]/[font='courier new']Get()[/font].

A caveat is that T objects are not destroyed until the [font='courier new']Queue[/font] object is destroyed. This is by design, since my particular implementation benefits from it. However, it can be trivially changed by:

• In [font='courier new']Node[/font]: adding [font='courier new']char object[sizeof(T)][/font] and removing the T inheritance
• in [font='courier new']Put()[/font]: calling [font='courier new']new (node->object) T()[/font] and changing the functor parameter to [font='courier new']static_cast(node->object)[/font]
• in [font='courier new']Get()[/font]: changing the functor parameter to [font='courier new']static_cast(node->object)[/font] and calling [font='courier new']static_cast(node->object)->T::~T()[/font]

(syntax may not be correct, but you get the idea)

-Josh

-Josh

Thanks for the info!  I knew there must be something I'm missing.  I've read the article you linked, as well as other related articles, and will adjust my code to account for it.  However, my "real" job has taken priority for now and I will update this when I am able to.

## Create an account

Register a new account