RX operator to scan with prior value, with signal deriving from value itself

Say you have a source observable of type IObservable<uint>. I'm looking for a transformation function

IObservable<uint> Countdown(IObservable<uint> source)
{
  // how? Something like ...
  return source.Select(value => ProduceInnerObservable(value)).Switch();
}

that transforms source as follows:

  • When source produces a value, the resulting observable should immediately publish the same value
  • Say the last element published was N. In the absence of any new elements from source, the result observable should publish the value N-1 after N ticks; then N-2 after N-1 ticks; and so on, until 0 is published
  • When source publishes a new value, this timer-like behaviour is reset based on the new value
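
To make the timing rule concrete, here is a minimal sketch that computes the expected (tick, value) schedule for a single source value, assuming no further source values arrive in the meantime. It involves no Rx at all; the names CountdownSchedule and For are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;

static class CountdownSchedule
{
    // Yields the (tick, value) pairs the result observable should publish
    // when the source emits n at startTick and then stays silent.
    public static IEnumerable<(long Tick, uint Value)> For(long startTick, uint n)
    {
        long tick = startTick;
        yield return (tick, n);          // the source value is echoed immediately

        for (uint v = n; v > 0; v--)
        {
            tick += v;                   // wait v ticks after publishing v ...
            yield return (tick, v - 1);  // ... then publish v - 1
        }
    }
}
```

For instance, CountdownSchedule.For(21, 2) yields (21, 2), (23, 1), (24, 0), which is the behaviour described for a source value of 2 arriving at tick 21.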

Example inputs/outputs:

Tick time   Source observable   Result observable
0           10                  10
10                              9
19                              8
21          2                   2
23                              1
24                              0
100         1                   1
101                             0
...         ...                 ...

What's the most elegant way to implement this?

Best answer, by Oleg Dok

Assuming your "ticks" are tenths of a second, this should suit your needs. Note that it uses int rather than uint, so that the i >= 0 termination condition can actually become false:

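using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program
{
    static void Main()
    {
        var subject = new Subject<int>();

        // TimeInterval() pairs each value with the time elapsed since the previous one.
        Countdown(subject).TimeInterval().Subscribe(x => Console.WriteLine(x));

        subject.OnNext(10);
        Thread.Sleep(4000);

        subject.OnNext(2);   // restarts the countdown from 2
        Thread.Sleep(4000);

        subject.OnNext(1);
        Console.ReadLine();  // keep the process alive while the timers fire
    }

    static IObservable<int> Countdown(IObservable<int> source)
    {
        // Each source value starts a fresh inner countdown; Switch() drops the previous one.
        return source.Select(value => Observable.Generate(
            initialState: value - 1,
            condition: i => i >= 0,
            iterate: i => i - 1,
            resultSelector: i => i,
            // State i is published i + 1 ticks after its predecessor (1 tick = 100 ms),
            // so the delay shrinks at each step, as the question specifies.
            timeSelector: i => TimeSpan.FromMilliseconds((i + 1) * 100)
            ).StartWith(value)).Switch();
    }
}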