Does Observable.Range break The Observable Contract?


In learning about Rx I've come across an often-repeated rule about Observables that is spelled out in The Observable Contract:

Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications.

This makes good sense to me, since it would be confusing for an Observable to keep producing values after it has completed. But when I tested the Observable.Range method in .NET, I noticed that it does not exhibit that behavior, and in fact many Observables appear to violate this rule.

var rangeObservable = Observable.Range(0, 5);

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done first!"));
Console.ReadLine();

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done second!"));
Console.ReadLine();

//Output:
//0
//1
//2
//3
//4
//Done first!

//0
//1
//2
//3
//4
//Done second!

Clearly rangeObservable has called OnCompleted twice and produced values after the first OnCompleted. This leads me to believe that this is not a rule about Observables but instead a rule about Subscriptions. That is, an Observable may produce as many terminating messages as it wants, and even produce values after doing so, so long as each Subscription receives only one terminating message and no further messages after that.

Does the contract actually mean Subscription when it says Observable? Are they truly different things? Do I have a fundamental misunderstanding of the model?

There are 2 answers below.

Best answer

The observable contract must be valid for any Observable being observed. Whether or not anything happens while the Observable is unobserved is left to the implementation of the observable.

It helps to think of the analog in Enumerable, since Observable is the dual of Enumerable. With enumerables, you would have var range = Enumerable.Range(0, 5), and you would use the range much like the code above:

foreach (var i in range) Console.WriteLine(i); //prints 0 - 4

foreach (var i in range) Console.WriteLine(i); //prints 0 - 4 again

and find that this is perfectly acceptable behavior, because the actual number generator gets created only when GetEnumerator is called. Similarly, in Observable, the equivalent method is Subscribe.
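To make the analogy concrete, here is a minimal sketch that uses only the base class library. CountingRange and generatorsStarted are hypothetical names introduced for illustration; the counter is there to show that the generator body runs once per enumeration rather than once per sequence object.

```csharp
using System;
using System.Collections.Generic;

static class EnumerableAnalogy
{
    // Hypothetical counter: how many times the generator body has started.
    static int generatorsStarted = 0;

    // Hypothetical stand-in for Enumerable.Range. As an iterator method, its
    // body does not run until an enumerator exists and MoveNext is called.
    static IEnumerable<int> CountingRange(int start, int count)
    {
        generatorsStarted++;
        for (int i = 0; i < count; i++)
            yield return start + i;
    }

    static void Main()
    {
        IEnumerable<int> range = CountingRange(0, 5);
        Console.WriteLine(generatorsStarted); // prints 0: nothing has run yet

        foreach (var i in range) Console.WriteLine(i); // prints 0 - 4
        foreach (var i in range) Console.WriteLine(i); // prints 0 - 4 again

        Console.WriteLine(generatorsStarted); // prints 2: one generator per enumeration
    }
}
```

Each foreach calls GetEnumerator, which plays the same role that Subscribe plays for an Observable.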

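The implementation of Range is something like:

        static IObservable<int> Range(int start, int count)
        {
            // The delegate passed to Create runs once per Subscribe call.
            return Observable.Create<int>(observer =>
            {
                for (int i = 0; i < count; i++)
                    observer.OnNext(start + i);

                // Each observer receives exactly one terminal notification.
                observer.OnCompleted();

                // Nothing is left to cancel: the loop has already finished.
                return Disposable.Empty;
            });
        }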

Here, the observer => {...} function is called every time there is a subscription. The work gets done in the Subscribe method. You can easily see that it (1) pushes the same sequence to every observer and (2) completes only once per observer.

These observables where something happens only when you observe them are called cold observables. Here's an article describing the concept.
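The per-subscription nature of the contract can also be checked without the Rx library at all. The sketch below implements the BCL interfaces IObservable<int> and IObserver<int> by hand; ColdRange and Recorder are hypothetical names for illustration and are not how Rx implements Range.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled cold observable, for illustration only.
sealed class ColdRange : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public ColdRange(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    // All the work happens here, once per subscriber.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < count; i++)
            observer.OnNext(start + i);
        observer.OnCompleted();
        return NoOp.Instance;
    }

    private sealed class NoOp : IDisposable
    {
        public static readonly NoOp Instance = new NoOp();
        public void Dispose() { }
    }
}

// Records every notification so each subscription can be inspected separately.
sealed class Recorder : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError");
    public void OnCompleted() => Log.Add("OnCompleted");
}

static class Program
{
    static void Main()
    {
        var source = new ColdRange(0, 3);
        var first = new Recorder();
        var second = new Recorder();

        source.Subscribe(first);
        source.Subscribe(second);

        // Both lines print: OnNext(0), OnNext(1), OnNext(2), OnCompleted
        Console.WriteLine(string.Join(", ", first.Log));
        Console.WriteLine(string.Join(", ", second.Log));
    }
}
```

The source object signals completion twice in total, but each observer's log contains exactly one OnCompleted and nothing after it.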

Note

The Range above is a very naive implementation, just for illustration purposes. The subscribe function won't return the disposable until the loop completes, so Disposable.Empty is acceptable. A proper implementation would run the work on a scheduler and check a disposable (for example a BooleanDisposable) to see whether the subscription has been disposed before continuing the loop.
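As a rough sketch of what that could look like, assuming the System.Reactive package is referenced: ScheduledRange is a hypothetical name, and this is not the library's actual implementation of Observable.Range.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class RangeSketch
{
    // Hypothetical helper for illustration; not the Rx source code.
    public static IObservable<int> ScheduledRange(int start, int count, IScheduler scheduler)
    {
        return Observable.Create<int>(observer =>
        {
            // Flipped to disposed when the subscriber unsubscribes.
            var cancel = new BooleanDisposable();

            // Run the loop on the scheduler so Subscribe can return first
            // (when the scheduler is asynchronous, such as Scheduler.Default).
            var work = scheduler.Schedule(() =>
            {
                for (int i = 0; i < count; i++)
                {
                    if (cancel.IsDisposed) return; // stop once unsubscribed
                    observer.OnNext(start + i);
                }

                if (!cancel.IsDisposed) observer.OnCompleted();
            });

            // Disposing the subscription cancels pending work and sets the flag.
            return new CompositeDisposable(work, cancel);
        });
    }
}
```

A caller might use it as RangeSketch.ScheduledRange(0, 5, Scheduler.Default).Subscribe(Console.WriteLine) and dispose the returned subscription to stop the loop early. With a scheduler that runs work synchronously, the loop still finishes before Subscribe returns, so the check only helps on asynchronous schedulers.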

The takeaway is that implementing the observable contract by hand is hard, and this is why the Rx library exists - to build functionality by composition.

Second answer

Observable.Range returns a cold observable, which means it "replays" its behavior for each subscriber. Since the "OnNext* (OnCompleted|OnError)?" contract only applies to a subscription, this is totally fine.

For more information on hot/cold observables, see my answer on "IConnectableObservables in Rx"