My first attempt to define a measurement schedule was:
var schedule = Observable.Concat(
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3),
Observable.Interval(TimeSpan.FromSeconds(3)).Take(3),
Observable.Interval(TimeSpan.FromSeconds(5)));
Unfortunately, it restarts when unsubscribed and resubscribed, which is not a desired behavior in my case. Therefore, I came with something like this:
class Schedule : IObservable<DateTime>, IDisposable
{
readonly ISubject<DateTime> _subject;
readonly IDisposable _subscrption;
public Schedule()
{
_subject = new BehaviorSubject<DateTime>(DateTime.UtcNow);
_subscrption = Observable.Concat(
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3),
Observable.Interval(TimeSpan.FromSeconds(3)).Take(3),
Observable.Interval(TimeSpan.FromSeconds(5)))
.Select(i => DateTime.UtcNow)
.Subscribe(_subject);
}
public IDisposable Subscribe(IObserver<DateTime> observer)
{
return _subject.Subscribe(observer);
}
public void Dispose()
{
_subscrption.Dispose();
}
}
It works but requires disposing after use. Are there any simple way to define Schedule without exposing IDisposable?
If you will be unsubscribing all of your observers and then resubscribing, there really is no way around keeping a
Disposablearound for tracking the connection. Since your Observable would not have anyway of directly knowing if the latest unsubscribe is really the last one.I would recommend the use of
Generateinstead of Concatenating ObservablesPublishwill turn your Observable into a ConnectableObservable, and gives you two options for managing when the source actually produces events.In the first version the source will become "hot" once you connect, and when you want to disconnect you should dispose of connection.
In the second the source will automatically disconnect when it has no more observers.
see also hot vs cold observables