Sign in to follow this  
Dragon_Strike

threadsafe event library

Recommended Posts

im looking for a thread safe event library similar to boost::library... ive been googling for a while without success... i spent an hour writing my own... it almost works but is kinda slow... any suggestions? EDIT: see code in post below [Edited by - Dragon_Strike on June 3, 2009 7:43:29 AM]

Share this post


Link to post
Share on other sites
Thread-safe event systems are incredibly hard to get right, debug and use. Lock-less designs are even harder to get right.

Your implementation also doesn't synchronize callback invocations. Imagine that you have two threads invoking same handler, which removes itself from the callback list. And if you do lock your handlers, then you immediately run into obscure dead/live-lock situations.

Boost's asio has io_service which can be used for asynchronous invocation, and works reasonably well for coarse grained designs. At very least, it can be used as foundation for handler invocation, since it helps solve previously mentioned problems.

Share this post


Link to post
Share on other sites
Quote:
Original post by Antheus
Your implementation also doesn't synchronize callback invocations. Imagine that you have two threads invoking same handler, which removes itself from the callback list. And if you do lock your handlers, then you immediately run into obscure dead/live-lock situations.
.


doesnt tbb::concurrent_hash_map solve that problem?
my real problem is that it crashes in the disconnecter destructor when reading the atomic variable... EDIT: actually i dont think that has to be an atomic variable

i didnt notice that boost signals had a thread-safe version... however boost signals seems very slow.. id prefer to use that as a last choice

EDIT: i just noticed that there is a problem if the Event object is destroyed before all connections...

[Edited by - Dragon_Strike on June 3, 2009 2:21:44 AM]

Share this post


Link to post
Share on other sites
ive rewritten the code

// Thread Safe Event Library
// Copyright (c) 2009 by Robert Nagy
//
//Permission is hereby granted, free of charge, to any person or organization
//obtaining a copy of the software and accompanying documentation covered by
//this license (the "Software") to use, reproduce, display, distribute,
//execute, and transmit the Software, and to prepare derivative works of the
//Software, and to permit third-parties to whom the Software is furnished to
//do so, all subject to the following:
//
//The copyright notices in the Software and this entire statement, including
//the above license grant, this restriction and the following disclaimer,
//must be included in all copies of the Software, in whole or in part, and
//all derivative works of the Software, unless such copies or derivative
//works are solely in the form of machine-executable object code generated by
//a source language processor.
//
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
//SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
//FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
//ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
//DEALINGS IN THE SOFTWARE.

#pragma once

#include <tbb/atomic.h>
#include <tbb/concurrent_vector.h>
#include <tbb/concurrent_hash_map.h>
#include <boost/lambda/lambda.hpp>
#include <boost/shared_ptr.hpp>

#include <algorithm>

#include "FastDelegate.hpp"

namespace drone
{

namespace devent
{
class Connection;

namespace detail
{
struct IEvent
{
virtual void DisconnectSlot(size_t slot) = 0;
};
}

class Connection
{
public:

Connection(size_t slot, boost::shared_ptr<tbb::atomic<detail::IEvent*>> parentPtr) : slot_(slot)
{
disconnecter_.reset(new Disconnecter(slot, parentPtr));
}

void Disconnect()
{
disconnecter_.reset();
}

bool Connected()
{
return (disconnecter_ && disconnecter_->parentPtr_ != NULL);
}

size_t Slot()
{
return slot_;
}

bool operator==(const Connection& other)
{
return slot_ == other.slot_;
}

private:
struct Disconnecter
{
Disconnecter(size_t slot, boost::shared_ptr<tbb::atomic<detail::IEvent*>> parentPtr) : slot_(slot), parentPtr_(parentPtr){}

~Disconnecter()
{
if(parentPtr_ && (*parentPtr_) != NULL)
(*parentPtr_)->DisconnectSlot(slot_);
}

boost::shared_ptr<tbb::atomic<detail::IEvent*>> parentPtr_;
size_t slot_;
};

size_t slot_;
boost::shared_ptr<Disconnecter> disconnecter_;
};

class Trackable
{
public:
virtual ~Trackable(){}

void Add(Connection& connection)
{
connections_.push_back(connection);
}

void Remove(Connection& connection)
{
tbb::concurrent_vector<Connection>::iterator it = connections_.begin();
while(it != connections_.end())
{
if(*it == connection)
*it = Connection(-1, boost::shared_ptr<tbb::atomic<detail::IEvent*>>());
++it;
}
}

public:
tbb::concurrent_vector<Connection> connections_; // thread-safe refcount
};

template <typename P>
class Event : public detail::IEvent
{
public:
Event() : self_(new tbb::atomic<detail::IEvent*>())
{
curSlot_ = 1;
*self_ = this;
}

~Event()
{
*self_ = NULL;
}

template<typename ClassType, typename CallbackType>
Connection Connect(ClassType& object, CallbackType callback)
{
size_t slot = ++curSlot_;
slots_.insert(std::make_pair(curSlot_, fastdelegate::MakeDelegate(&object, callback)));

Connection connection(curSlot_, self_);
object.Add(connection);
return connection;
}

void operator()(P p)
{
HashMap::iterator it = slots_.begin();
while(it != slots_.end())
(it++)->second(p);
}

void DisconnectSlot(size_t slot)
{
slots_.erase(slot);
}

private:

struct HashCompare
{
static size_t hash(const size_t x) { return x; }
static bool equal(const size_t lhs, const size_t rhs) { return lhs == rhs; }
};

tbb::atomic<size_t> curSlot_;
boost::shared_ptr<tbb::atomic<detail::IEvent*>> self_;
typedef tbb::concurrent_hash_map<size_t, fastdelegate::FastDelegate1<P,void>, HashCompare> HashMap;
HashMap slots_;
};





}

}





there are two problems i need help to solve... first of all the "slots_" vector can contain alot of "NULL's" which the invoke operator has to iterate... not very efficient....

the second problem is that if the event object is destroyed before the connections then the "parent_" pointer in all the connection becomes invalid

EDIT: fast fix

EDIT2: fixed the second problem... are there any thread issues?

EDIT3: more fixes

[Edited by - Dragon_Strike on June 3, 2009 7:11:19 AM]

Share this post


Link to post
Share on other sites
Quote:
Original post by loufoque
Quote:
i didnt notice that boost signals had a thread-safe version... however boost signals seems very slow..

How so?


everything ive read on these and other forums seem to point to indicate it being slow

[Edited by - Dragon_Strike on June 3, 2009 7:13:11 AM]

Share this post


Link to post
Share on other sites
It's slower than it has to be, but it's faster than it used to be. Also, being thread-safe finally gives it an excuse to lock mutexes, which it had been doing already.

From what I've seen of signals2 I'm really a fan. It addresses a lot of the problems in the previous library. I haven't done very much benchmarking on it compared to original signals, though, so I couldn't say for sure what the total costs are.

Share this post


Link to post
Share on other sites
Quote:
Original post by Sneftel
It's slower than it has to be, but it's faster than it used to be. Also, being thread-safe finally gives it an excuse to lock mutexes, which it had been doing already.

From what I've seen of signals2 I'm really a fan. It addresses a lot of the problems in the previous library. I haven't done very much benchmarking on it compared to original signals, though, so I couldn't say for sure what the total costs are.


i will reconsider using signals2... however if my code is alright im thinking about adding parallel event invocation using OpenMP or TBB...


Share this post


Link to post
Share on other sites
ive tried signals2... but the syntax seems horrible...

pSceneGeometry->PositionChanged.connect(scene::Geometry::signal_type::slot_type(&Geometry::PositionChanged, pGeometry.get(), _1).track(pGeometry));

this isnt very readable....

Share this post


Link to post
Share on other sites

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now

Sign in to follow this