My first attempt to define a measurement schedule was:
    var schedule = Observable.Concat(
        Observable.Interval(TimeSpan.FromSeconds(1)).Take(3),
        Observable.Interval(TimeSpan.FromSeconds(3)).Take(3),
        Observable.Interval(TimeSpan.FromSeconds(5)));
Unfortunately, it restarts from the beginning whenever a subscriber unsubscribes and resubscribes, which is not the behavior I want. Therefore, I came up with something like this:
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Schedule : IObservable<DateTime>, IDisposable
    {
        readonly ISubject<DateTime> _subject;
        readonly IDisposable _subscription;

        public Schedule()
        {
            // The subject replays the latest tick to each new subscriber.
            _subject = new BehaviorSubject<DateTime>(DateTime.UtcNow);

            // Subscribe to the underlying timers once, at construction time,
            // so later subscribers do not restart the schedule.
            _subscription = Observable.Concat(
                    Observable.Interval(TimeSpan.FromSeconds(1)).Take(3),
                    Observable.Interval(TimeSpan.FromSeconds(3)).Take(3),
                    Observable.Interval(TimeSpan.FromSeconds(5)))
                .Select(i => DateTime.UtcNow)
                .Subscribe(_subject);
        }

        public IDisposable Subscribe(IObserver<DateTime> observer)
        {
            return _subject.Subscribe(observer);
        }

        public void Dispose()
        {
            // Stops the underlying timers.
            _subscription.Dispose();
        }
    }
It works, but it requires disposing after use. Is there a simple way to define Schedule without exposing IDisposable?
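For reference, the restart behavior of the first attempt can be reproduced in isolation. The following is only a minimal sketch: it assumes the System.Reactive package, shortens the intervals to milliseconds so it finishes quickly, and uses a hypothetical helper class named `ColdRestartDemo` that is not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
static class ColdRestartDemo
{
    // Subscribes twice, one after the other, to the same sequence and
    // returns the first four values seen by each subscription.
    public static (IList<long> First, IList<long> Second) Run()
    {
        // Same shape as the first attempt, with shortened intervals (assumption).
        var schedule = Observable.Concat(
            Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(3),
            Observable.Interval(TimeSpan.FromMilliseconds(30)));

        // Wait() blocks until the sequence completes and returns its last element,
        // which here is the list collected by ToList().
        var first = schedule.Take(4).ToList().Wait();

        // The second subscription starts new timers, so it begins again
        // in the fast phase instead of continuing in the slow one.
        var second = schedule.Take(4).ToList().Wait();

        return (first, second);
    }
}
```

Both lists come back as 0, 1, 2, 0: the second subscription goes through the fast phase again, because each call to Subscribe on this sequence creates its own timers.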
Here is the solution I came up with, which does not require disposing. It is fairly complex, though, so I would highly appreciate any suggestions for simplifying it with what Rx already provides.
The usage:
Where Schedule is: