Is it possible to send a message to only some of the observers using .NET Rx?

I have an observable with, say, 10 observers attached to it. I would like to offer each message to the observers one at a time, only until one of them tells the observable that it recognizes the message and will process it. At that point the message should no longer be sent to the remaining observers.

In other words, each observer knows how to process a particular type of message, and each one takes and processes the messages it recognizes. The others don't need to receive a message once an observer that recognizes it has started processing it. How could this be implemented with the Reactive Extensions? I assume it requires some sort of notification back to the observable, but I don't see how that can be done.

There are 1 best solutions below

This is what I came up with. Any comments, ideas, suggestions, and criticism are very welcome. The most interesting part is that a public IObserver<TValue, TResult> interface already exists in the Rx library, but it is used only by the Notification object. I created its counterpart, IObservable<TValue, TResult>; a SelectiveSubject that calls the observers in subscription order until one of them returns true; and a ToSelective extension method. I'm actually surprised the library doesn't already provide this, or at least the IObservable<TValue, TResult> part, given that IObserver<TValue, TResult> exists.

public interface IObservable<out TValue, in TResult> {
   IDisposable Subscribe(IObserver<TValue, TResult> objObserver);
}



internal class SelectiveSubject<T> : IObserver<T>, IObservable<T, bool> {
   private readonly LinkedList<IObserver<T, bool>> _ObserverList;

   public SelectiveSubject() {
      _ObserverList = new LinkedList<IObserver<T, bool>>();
   }

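   public void OnNext(T value) {
      lock(_ObserverList) {
         // Offer the value to observers in subscription order and stop
         // at the first one that reports it has handled the value.
         foreach(IObserver<T, bool> objObserver in _ObserverList) {
            if(objObserver.OnNext(value)) {
               break;
            }
         }
      }
   }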

   public void OnError(Exception exception) {
      lock(_ObserverList) {
         foreach(IObserver<T, bool> objObserver in _ObserverList) {
            if(objObserver.OnError(exception)) {
               break;
            }
         }
      }
   }

   public void OnCompleted() {
      lock(_ObserverList) {
         foreach(IObserver<T, bool> objObserver in _ObserverList) {
            if(objObserver.OnCompleted()) {
               break;
            }
         }
      }
   }

   public IDisposable Subscribe(IObserver<T, bool> objObserver) {
      LinkedListNode<IObserver<T, bool>> objNode;
      lock(_ObserverList) {
         objNode = _ObserverList.AddLast(objObserver);
      }
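      return Disposable.Create(() => {
         // Lock the same list object used everywhere else;
         // objNode.List becomes null once the node has been removed.
         lock(_ObserverList) {
            _ObserverList.Remove(objNode);
         }
      });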
   }
}



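// Extension methods must live in a static class; the class name here is arbitrary.
public static class SelectiveExtensions {
   public static IObservable<T, bool> ToSelective<T>(this IObservable<T> objThis) {
      var objSelective = new SelectiveSubject<T>();
      // The subscription to the source is not kept, so it is never disposed explicitly.
      objThis.Subscribe(objSelective);
      return objSelective;
   }
}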
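
The usage example passes lambdas to Subscribe, which needs an overload taking a Func<T, bool>; the code in this answer doesn't show one. Below is a minimal sketch of what it might look like, assuming the System.Reactive package is referenced. AnonymousSelectiveObserver and SelectiveSubscribeExtensions are hypothetical names made up for illustration, and returning false from OnError and OnCompleted is an assumption about the intended behavior, not something the answer specifies.

```csharp
using System;
using System.Reactive;   // IObserver<TValue, TResult> is declared here

// Same shape as the interface declared in the answer; repeated only so
// the sketch compiles on its own. Omit it if it is already declared.
public interface IObservable<out TValue, in TResult> {
   IDisposable Subscribe(IObserver<TValue, TResult> objObserver);
}

// Hypothetical adapter: wraps a delegate as an IObserver<T, bool>.
internal sealed class AnonymousSelectiveObserver<T> : IObserver<T, bool> {
   private readonly Func<T, bool> _OnNext;

   public AnonymousSelectiveObserver(Func<T, bool> onNext) {
      _OnNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
   }

   // true means "handled, stop offering this value to other observers".
   public bool OnNext(T value) => _OnNext(value);

   // Assumption: terminal notifications are never claimed, so a subject
   // that stops at the first true keeps forwarding them to everyone.
   public bool OnError(Exception exception) => false;
   public bool OnCompleted() => false;
}

// Hypothetical extension class providing the lambda-friendly overload.
public static class SelectiveSubscribeExtensions {
   public static IDisposable Subscribe<T>(this IObservable<T, bool> objThis, Func<T, bool> onNext) {
      // Binds to the interface's own Subscribe(IObserver<T, bool>),
      // because instance methods win over extension methods.
      return objThis.Subscribe(new AnonymousSelectiveObserver<T>(onNext));
   }
}
```

With this overload in scope, a call such as objSelective.Subscribe(i => i % 2 == 0) compiles. Because OnError and OnCompleted return false here, every subscriber created this way still receives the terminal notification from SelectiveSubject.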

Now the usage is as simple as this:

 IConnectableObservable<int> objGenerator = Observable.Generate(0, i => i < 100, i => i + 1, i => i).Publish();
 IObservable<int, bool> objSelective = objGenerator.ToSelective();

 var objDisposableList = new CompositeDisposable(2) {
    objSelective.Subscribe(i => {
       Console.Write("1");
       if(i % 2 == 0) {
          Console.Write("!");
          return true;
       }
       else {
          Console.Write(".");
          return false;
       }
    }),
    objSelective.Subscribe(i => {
       Console.Write("2");
       Console.Write("!");
       return true;
    })
 };

 objGenerator.Connect();
 objDisposableList.Dispose();

In this example the first subscriber handles the even values in the sequence, and the second subscriber handles the odd values that the first one declines.