Observer pattern + Visitor pattern for message system

2.5k Views Asked by At

Recently I got into implementing a message dispatching system that uses the "Observer pattern": nothing special here. As I developed it I thought it would be nice to send "Message" objects from the "subject" that could be fundamentally different from each other and could be read from the many "observers".

These different messages took the form of different message classes (for example, think about the "User Logout message", "Screen mode toogle" and "Volume level changed", all of these need different information) and soon I found that the "observers" needed not to know about every different message I would want to create (that would be... unsustainable, to say the least). Instead, I would like each observer to be able to react to specific kinds of messages.

So in order to make something I thought that double dispatching could be my option here. After a litte I got this piece (c++11 only because of the for loop):

#include <iostream>
#include <vector>
#include <string>

/**
* A few forward declarations.
*/

class Message_base;
class Message_type_a;
class Message_type_b;

/**
* Base observer...
*/

class Observer_base
{
    public:

    /**
    * All these implementations are empty so we don't have to specify them
    * in every single derived class.
    */

    virtual void        get_message(const Message_base&) {}
    virtual void        get_message(const Message_type_a&) {}
    virtual void        get_message(const Message_type_b&) {}
};

/**
* Specification of all message types.
*/

class Message_base
{
    public:

    /**
    * This is the method that will implement the double dispatching in all
    * derived classes.
    */

    virtual void        be_recieved(Observer_base &r) const=0;  //Now that's a nasty method name.
};

class Message_type_a:public Message_base
{
    private:
    int integer_value;

    public:
                Message_type_a(int v):integer_value(v) {}
    int             get_integer_value() const {return integer_value;}
    void            be_recieved(Observer_base &r) const {r.get_message(*this);}

};

class Message_type_b:public Message_base
{
    private:
    std::string string_value;

    public:
                Message_type_b(const std::string v):string_value(v) {}
    std::string         get_string_value() const {return string_value;}
    void            be_recieved(Observer_base &r) const {r.get_message(*this);}
};

/**
* This is the base clase for the Subject... Notice that there are no virtual
* methods so we could as well instantiate this class instead of derive it.
*/

class Subject_base
{
    private:

    std::vector<Observer_base *>    observers;

    public:

    void            emit_message(const Message_base& m) {for(auto o : observers) m.be_recieved(*o);}    //Again, nasty to read since it's... backwards.
    void            register_observer(Observer_base * o) {observers.push_back(o);} 
};

/**
* Now we will create a subject class for the sake of it. We could just call the
* public "emit_message" from main passing Message objects.
*/

class Subject_derived:public Subject_base
{
    public:

    void            emit_message_a(int v) {emit_message(Message_type_a(v));}
    void            emit_message_b(const std::string v) {emit_message(Message_type_b(v));}
};

/**
* This gets fun... We make two observers. One will only get type_a messages
* and the other will get type_b.
*/

class Observer_type_a:public Observer_base
{
    private:

    int             index;  //We will use it to identify the observer.

    public:

                Observer_type_a(int i):index(i) {}
    void            get_message(const Message_type_a& m) {std::cout<<"Observer_type_a ["<<index<<"] : got type_a message : "<<m.get_integer_value()<<std::endl;}
};

class Observer_type_b:public Observer_base
{
    private:

    std::string         name; //Merely to identify the observer.

    public:

                Observer_type_b(const std::string& n):name(n) {}
    void            get_message(const Message_type_b& m) {std::cout<<"Observer_type_b ["<<name<<"] : got type_b message : "<<m.get_string_value()<<std::endl;}
};

/**
* Stitch all pieces together.
*/

int main(int argc, char ** argv)
{
    Observer_type_a o_a1(1);
    Observer_type_a o_a2(2);
    Observer_type_b o_b1("Sauron");
    Observer_type_b o_b2("Roverandom");

    Subject_derived s_a;

    s_a.register_observer(&o_a1);
    s_a.register_observer(&o_b1);

    s_a.emit_message_a(23);
    s_a.emit_message_b("this is my content");

    s_a.register_observer(&o_a2);
    s_a.register_observer(&o_b2);

    s_a.emit_message_a(99);
    s_a.emit_message_b("this is my second content");

    //gloriously exit.  
    return 0;
}

For the sake of clarity I will speak my goals here:

  • Be able to send many different messages from the Subject.
  • Specialize the observers to they ignore every message not meant for them (it would be even nicer if they weren't sent at all, but I know I can make that registering different groups of observers).
  • Avoid RTTI and derived class casting.

Here comes my question: Have I missed a simpler implementation to achieve my goals?.

It is important to mention that the system this will be used on will not have that many observers and maybe less than ten subjects present at the same time.

2

There are 2 best solutions below

1
On

I think you should stick to what will be simpler. If all your observers handles all messages, then you must have one observer type. If the messages are unrelated, each observer mush watch only for the messages it handles.

A solution using Boost::Signal2 would be:

#include <string>
#include <cstdio>
#include <iostream>
#include <functional>
#include <boost/signals2/signal.hpp>

class Subject
{
public:
    void emit_message_a(int v) {
        sig_a(v);
    }

    void emit_message_b(const std::string v) {
        sig_b(v);
    }

    template<typename F>
    void register_listener_a(const F &listener)
    {
        sig_a.connect(listener);
    }

    template<typename F>
    void register_listener_b(const F &listener)
    {
        sig_b.connect(listener);
    }

private:
    boost::signals2::signal<void (int)> sig_a;
    boost::signals2::signal<void (std::string)> sig_b;
};

class Observer
{
public:
    Observer():
        name("John")
    {}

    void observe(int v) {
        std::cout << name << " has observed phoenomenon int: " << v << std::endl;
    }

    void observe(std::string v) {
        std::cout << name << " has observed phoenomenon string: " << v << std::endl;
    }

private:
    std::string name;
};

int main()
{
    Subject s;
    Observer o;

    s.register_listener_a([&](int v){o.observe(v);});
    s.register_listener_b([&](std::string v){o.observe(v);});


    s.register_listener_a([](int val) {
        std::cout << "Received message a : " << val << std::endl;
    });
    s.register_listener_a([](int message_a) {
        printf("I have received message a, too! It is %d.\n", message_a);
    });

    s.register_listener_b([](std::string msg) {
        std::cout << "A B type message was received! Help!\n";
    });

    s.emit_message_a(42);

    s.emit_message_b("a string");

    s.emit_message_a(-1);

    s.emit_message_b("another string");
}

Running it, I get:

John has observed phoenomenon int: 42
Received message a : 42
I have received message a, too! It is 42.
John has observed phoenomenon string: a string
A B type message was received! Help!
John has observed phoenomenon int: -1
Received message a : -1
I have received message a, too! It is -1.
John has observed phoenomenon string: another string
A B type message was received! Help!

If you are going to use it, be sure to read the manual.

4
On

Some metaprogramming boilerplate:

// a bundle of types:
template<class...>struct types{using type=types;};

// a type that does nothing but carry a type around
// without being that type:
template<class T>struct tag{using type=T;};

// a template that undoes the `tag` operation above:
template<class Tag>using type_t=typename Tag::type;

// a shorter way to say `std::integral_constant<size_t, x>`:
template<std::size_t i>struct index:std::integral_constant<std::size_t, i>{};

Get the index of a type in a types<...>:

// this code takes a type T, and a types<...> and returns
// the index of the type in there.
// index_of
namespace details {
  template<class T, class Types>
  struct index_of{};
}
template<class T, class Types>
using index_of_t=type_t<details::index_of<T,Types>>;
namespace details {
  // if the first entry in the list of types is T,
  // our value is 0
  template<class T, class...Ts>struct index_of<T, types<T,Ts...>>:
    tag< index<0> >
  {};
  // otherwise, it is 1 plus our value on the tail of the list:
  template<class T, class T0, class...Ts>
  struct index_of<T, types<T0, Ts...>>:
    tag< index< index_of_t<T,types<Ts...>{}+1 > >
  {};
}

Here is a single "channel" broadcaster (it sends one kind of message):

// a token is a shared pointer to anything
// below, it tends to be a shared pointer to a std::function
// all we care about is the lifetime, however:
using token = std::shared_ptr<void>;
template<class M>
struct broadcaster {
  // f is the type of something that can eat our message:
  using f = std::function< void(M) >;
  // we keep a vector of weak pointers to people who can eat
  // our message.  This lets them manage lifetime independently:
  std::vector<std::weak_ptr<f>> listeners;

  // reg is register.  You pass in a function to eat the message
  // it returns a token.  So long as the token, or a copy of it,
  // survives, broadcaster will continue to send stuff at the
  // function you pass in:
  token reg( f target ) {
    // if thread safe, (write)lock here
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
    // unlock here
  }
  // removes dead listeners:
  void trim() {
    // if thread safe, (try write)lock here
    // and/or have trim take a lock as an argument
    listeners.erase(
      std::remove_if( begin(listeners), end(listeners), [](auto&& p){
        return p.expired();
      } ),
      listeners.end()
    );
    // unlock here
  }
  // Sends a message M m to every listener who is not dead:
  void send( M m ) {
    trim(); // remove dead listeners
    // (read) lock here
    auto tmp_copy = listeners; // copy the listeners, just in case
    // unlock here

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};

Here is a multi-channel subject that can support any number of different message types (determined at compile-time). If you fail to match a message type, send and/or reg will fail to compile. You are responsible for deciding if a message is a const& or a value or whatever. Trying to reg an rvalue message won't work. It is intended that M is passed to reg and send explicitly.

// fancy wrapper around a tuple of broadcasters:
template<class...Ts>
struct subject {
  std::tuple<broadcaster<Ts>...> stations;
  // helper function that gets a broadcaster compatible
  // with a message type M:
  template<class M>
  broadcaster<M>& station() {
    return std::get< index_of_t<M, types<Ts...>>{} >( stations );
  }
  // register a message of type M.  You should call with M explicit usually:
  template<class M>
  token reg( std::function<void(M)> listener ) {
    return station<M>().reg(std::move(listener));
  }
  // send a message of type M.  You should explicitly pass M usually:
  template<class M>
  void send( M m ) {
    station<M>().send(std::forward<M>(m));
  }
};

live example.

When you reg, it returns a token, aka std::shared_ptr<void>. For as long as this token (or a copy) survives, messages will flow. If it goes away, messages to the reged callback will end. Typically that means listeners should maintain a std::vector<token>, and reg lambdas that use this willy-nilly.

In C++14/1z, the above gets a bit nicer (we can do away with types<...> and index_of for one).

If you add a listener during a broadcast cycle, it will not be sent to. If you remove a listener during a broadcast cycle, it will not be sent to after the point you removed it.

The thread safe comments are set up for reader/writer locks on broadcaster.

Memory allocated for dead listeners for a given broadcaster is reclaimed when trim or send is called. However, the std::function will have been destroyed long ago, so only a limited amount of memory is wasted until the next send. I do it then, because we are going to iterate over the list of messages anyhow, might as well clean up any mess first.

This solution has no RTTI or dynamic casting, and messages are only sent to listeners who understand them.


In things gets simpler. Drop all of the metaprogramming boilerplate, remove subject (keep broadcaster) and just do this to handle more than one channel:

template<class...Ms>
struct broadcasters : broadcaster<Ms>... {
  using broadcaster<Ms>::reg...;
  using broadcaster<Ms>::send...;

  template<class M>
  broadcaster<M>& station() { return *this; }
};

this broadcasters is now nearly drop-in improvement on subject above.

Due to improvements in std::function since , the reg function usually does the right thing unless the signal options are overly similar. If you do run into problems with reg or send, you are forced to call .station<type>().reg(blah).

But 99/100 times you can just do a .reg( lambda ) and .send( msg ) and overload resolution does the right thing.

Live example.

And here is the entire system augmented with a modular drop-in thread safety system:

struct not_thread_safe {
    struct not_lock {~not_lock(){}};
    auto lock() const { return not_lock{}; }
};
struct mutex_thread_safe {
    auto lock() const { return std::unique_lock<std::mutex>(m); }
private:
    mutable std::mutex m;
};
struct rw_thread_safe {
    auto lock() { return std::unique_lock<std::shared_timed_mutex>(m); }
    auto lock() const { return std::shared_lock<std::shared_timed_mutex>(m); }
private:
    mutable std::shared_timed_mutex m;
};
template<class D, class>
struct derived_ts {
    auto lock() { return static_cast<D*>(this)->lock(); }
    auto lock() const { return static_cast<D const*>(this)->lock(); }
};
using token = std::shared_ptr<void>;
template<class M, class TS=not_thread_safe>

struct broadcaster:
  TS
{
  using f = std::function< void(M) >;
  mutable std::vector<std::weak_ptr<f>> listeners;
  token reg( f target )
  {
    auto l = this->lock();
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
  }
  // logically const, but not really:
  void trim() const {
    auto l = const_cast<broadcaster&>(*this).lock();
    auto it = std::remove_if( listeners.begin(), listeners.end(), [](auto&& p){
      return p.expired();
    } );
    listeners.erase( it, listeners.end() );
  }
  // logically const, but not really:
  void send( M m ) const
  {
    trim(); // remove dead listeners
    auto tmp_copy = [this]{
      auto l = this->lock();
      return listeners; // copy the listeners, just in case
    }();

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};
template<class TS, class...Ms>
struct basic_broadcasters :
    TS,
    broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >... 
{
  using TS::lock;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::reg...;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::send...;

  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>>& station() { return *this; }
  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>> const& station() const { return *this; }
};
template<class...Ms>
using broadcasters = basic_broadcasters<rw_thread_safe, Ms...>;

Live example.

broadcasters<Messages...> is now a read-write locked broadcasting class that uses 1 common shared lock to synchronize every broadcast queue.

basic_broadcasters<not_thread_safe, Messages...> instead creates one with no locking (ie, isn't thread safe).