In learning about Rx I've come across an often repeated rule about Observables that is spelled out in The Observable Contract.
Upon issuing an OnCompleted or OnError notification, it may not thereafter issue any further notifications.
This makes good sense to me, since it would be confusing to have an Observable continue to produce values after it has completed, but when I tested the Observable.Range method in .NET I noticed that it does not exhibit that behavior, and in fact many Observables violate this rule.
var rangeObservable = Observable.Range(0, 5);
rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done first!"));
Console.ReadLine();
rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done second!"));
Console.ReadLine();
//Output:
//0
//1
//2
//3
//4
//Done first!
//0
//1
//2
//3
//4
//Done second!
Clearly rangeObservable
has called OnComplete
two times and produced values after the first OnComplete
. This leads me to believe that this is not a rule about Observables but instead a rule about Subscriptions. That is, an Observable may produce as many terminating messages as it wants, and even produce values after it has, so long as each Subscription only receives one terminating message and receives no further messages after that.
Do they actually mean Subscription when it says Observable? Are they truly different things? Do I have a fundamental misunderstanding of the model?
The observable contract must be valid for any Observable being observed. Whether or not anything happens while the Observable is unobserved is left to the implementation of the observable.
It helps to think of analog in Enumerable - Observable being the dual of Enumerable. In enumerables, you would have
range = Enumerable.Range(0, 5)
, and you would use the range similar to the above:and find that this is perfectly acceptable behavior, because the the actual number generator gets created only when
GetEnumerator
is called. Similarly, in Observable, the equivalent method isSubscribe
.The implementation of range is something like:
Here, the
observer => {...}
function is called every time there is a subscription. The work gets done in the subscribe method. You can easily see that it (1) pushes the same sequence for every observer, (2) it only completes once per observer.These observables where something happens only when you observe them are called cold observables. Here's an article describing the concept.
Note
The
Range
is a very naive implementation, just for illustration purposes. The method won't return the disposable until it completes - soDisposable.Empty
is acceptable. A proper implementation would run the work on a scheduler and use a checked disposable to see if the subscription has been disposed before continuing the loop.The takeaway is that implementing the observable contract by hand is hard, and this is why the Rx library exists - to build functionality by composition.