Is there a way to cancel and replace an observable if it is producing too many values too fast?


I have an Observable that produces values at a variable speed. To avoid being overwhelmed with values, I added a Throttle of three seconds, so I only get a value if no further values were published for three seconds. What I want, though, is to end the stream if I get a certain number of updates within a time period and replace it with another observable.

For example, if I get 50 updates within three seconds, end the stream and replace it with a different stream, similar to how Catch can replace an observable that was terminated by an exception with another one.

Something like the below but no exception is being thrown so can't use Catch:

myObservable
   .Throttle(TimeSpan.FromSeconds(3)) //Not sure if we need to remove Throttle
   .Catch<long, Exception>(e => Observable.Return(0L))  // Instead of catching an exception, some way to monitor how many updates are coming in before throttling
   .Subscribe

EDIT: I added a marble diagram to try to show what I am looking for.

The initial observable produces values at a variable rate. Values 1-6 come in, but never as a burst of 50 within 3 seconds, so they pass through to Throttle, and the final values 1, 5, and 6 are produced.

Then the initial observable produces values 7-60 within 3 seconds. This is where I am trying to do what "???" shows. The idea is to recognize that 50 or more items were produced within the set timeframe, complete the original observable, and replace it with one I provide. This is similar to how you can give Catch a sequence to replace one that errored (for example, if I detected the huge burst in the original sequence and threw an exception).

After the initial observable is replaced, the sequence continues with the new one, and the items it produces go through the existing Throttle.

If only 49 items come within the timespan checked in "???", those values will all pass through to Throttle and only the last one would be produced. If no updates come in at all then nothing happens and no output is produced.

Hopefully what I am asking is a bit more clear now.

Progman On

You can use Scan() to build a sliding window of the last 50 items (I couldn't get it to work with Buffer() or Window(), but I guess it's possible). Each item is enriched with a timestamp. Then, for each sliding window, you compare the first and last timestamps to see whether they are too close together. If they are, you switch to the other observable.
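
To make the check itself concrete, here is a minimal sketch of the sliding-window test on its own, without Rx. The BurstDetector class and its Record method are hypothetical names introduced only for this illustration, and the timestamps are made up (one item every 10 ms) so the run is deterministic.

```csharp
using System;
using System.Collections.Generic;

// Feed timestamps spaced 10 ms apart; the 50th one closes a 490 ms window.
var detector = new BurstDetector(countLimit: 50, timeLimit: TimeSpan.FromSeconds(3));
var start = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
int trippedAt = -1;
for (int i = 0; i < 60 && trippedAt < 0; i++)
{
    if (detector.Record(start.AddMilliseconds(10 * i)))
    {
        trippedAt = i + 1; // 1-based position of the item that tripped the check
    }
}
Console.WriteLine(trippedAt); // prints 50

// Hypothetical helper: remembers the timestamps of the last countLimit items.
sealed class BurstDetector
{
    private readonly Queue<DateTime> _window = new Queue<DateTime>();
    private readonly int _countLimit;
    private readonly TimeSpan _timeLimit;

    public BurstDetector(int countLimit, TimeSpan timeLimit)
    {
        _countLimit = countLimit;
        _timeLimit = timeLimit;
    }

    // Returns true when the newest countLimit timestamps span less than timeLimit.
    public bool Record(DateTime timestamp)
    {
        if (_window.Count >= _countLimit)
        {
            _window.Dequeue(); // drop the oldest entry to keep the window size fixed
        }
        _window.Enqueue(timestamp);
        return _window.Count == _countLimit
            && timestamp - _window.Peek() < _timeLimit;
    }
}
```

The Rx version in this answer does the same thing, with Scan() holding the window and TakeWhile() acting on the result of the comparison.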

To "enrich" the value with a timestamp, you simply use Select():

IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
{
    return Tuple.Create(it, DateTime.UtcNow);
});

Then use Scan() to build the sliding window:

IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
(acc, it) =>
{
    while (acc.Count >= countLimit)
    {
        acc.RemoveAt(0);
    }
    acc.Add(it);
    return acc;
});

Now your "values" are lists of tuples, where each entry holds the value and the timestamp at which it was emitted. We use TakeWhile() to stop emitting values when the first and last timestamps are too close:

IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it => 
{
    if (it.Count < countLimit)
    {
        return true;
    }
    DateTime firstTime = it.First().Item2;
    DateTime lastTime = it.Last().Item2;
    TimeSpan timeDiff = lastTime - firstTime;
    if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds)) 
    {
        return false;
    }
    return true;
});

For debugging purposes, we will print the values of the first and last entry:

IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
{
    Console.WriteLine("Count: "+it.Count);
    if (it.Count > 0) 
    {
        DateTime firstTime = it.First().Item2;
        DateTime lastTime = it.Last().Item2;
        TimeSpan timeDiff = lastTime - firstTime;
        Console.WriteLine("Timediff is: "+timeDiff);
    } 
});

Then we "extract" the original value again:

IObservable<int> onlyValueOfLastItem = withDebugging.Select(it => it.Last().Item1);

Now we have a stream which will "die" when the timestamps are too close. We can switch to the other observable with a simple Concat() (or Switch()):

IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));

Here is the full source code:

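// Requires the System.Reactive NuGet package and .NET 6 or later (for Random.Shared), plus:
// using System; using System.Collections.Generic; using System.Linq; using System.Threading;
// using System.Reactive.Linq; using System.Reactive.Subjects;
static void Main(string[] args)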
{
    ISubject<int> source = new Subject<int>();

    IObservable<Tuple<int, DateTime>> sourceWithTimeInfo = source.Select(it =>
    {
        return Tuple.Create(it, DateTime.UtcNow);
    });

    int countLimit = 50;
    int timeLimitInSeconds = 3;

    IObservable<IList<Tuple<int, DateTime>>> bufferedSource = sourceWithTimeInfo.Scan((IList<Tuple<int, DateTime>>)new List<Tuple<int, DateTime>>(),
    (acc, it) =>
    {
        while (acc.Count >= countLimit)
        {
            acc.RemoveAt(0);
        }
        acc.Add(it);
        return acc;
    });

    IObservable<IList<Tuple<int, DateTime>>> stopWhenTooMuchSource = bufferedSource.TakeWhile(it => 
    {
        if (it.Count < countLimit)
        {
            return true;
        }
        DateTime firstTime = it.First().Item2;
        DateTime lastTime = it.Last().Item2;
        TimeSpan timeDiff = lastTime - firstTime;
        if (timeDiff < TimeSpan.FromSeconds(timeLimitInSeconds)) 
        {
            return false;
        }
        return true;
    });

    IObservable<IList<Tuple<int, DateTime>>> withDebugging = stopWhenTooMuchSource.Do(it =>
    {
        Console.WriteLine("Count: "+it.Count);
        if (it.Count > 0) 
        {
            DateTime firstTime = it.First().Item2;
            DateTime lastTime = it.Last().Item2;
            TimeSpan timeDiff = lastTime - firstTime;
            Console.WriteLine("Timediff is: "+timeDiff);
        } 
    });

    IObservable<int> onlyValueOfLastItem = withDebugging
        .Select(it => it.Last().Item1);

    IObservable<int> concatSource = onlyValueOfLastItem.Concat(Observable.Return(-1));

    Console.WriteLine("Subscribe start");
    concatSource.Subscribe(it =>
    {
        Console.WriteLine(it);
    });
    
    Thread t = new Thread(() =>
    {
        int maxDelay = 300;
        int counter = 1;
        while (maxDelay > 0)
        {
            Console.WriteLine("maxDelay is: "+maxDelay);
            source.OnNext(counter++);
            int sleepAmount = Random.Shared.Next(1, maxDelay);
            maxDelay--;
            Thread.Sleep(sleepAmount);
        }
    });
    t.Start();

    t.Join();

    Console.WriteLine("Program ends");
}

This can generate an output like this:

              [...]
maxDelay is: 111
Count: 50
Timediff is: 00:00:03.0516947
190
maxDelay is: 110
Count: 50
Timediff is: 00:00:03.0656792
191
maxDelay is: 109
Count: 50
Timediff is: 00:00:03.0892908
192
maxDelay is: 108
Count: 50
Timediff is: 00:00:03.1163132
193
maxDelay is: 107
Count: 50
Timediff is: 00:00:03.1003305
194
maxDelay is: 106
Count: 50
Timediff is: 00:00:03.0078090
195
maxDelay is: 105
-1
              [...]

You still have to re-add the Throttle() call at the right position.

Shlomo On

@Progman's logic is sound, but it doesn't use the built-in operators, which can cut the code down considerably.

Solution:

public static IObservable<T> SwitchAfter50In3Seconds<T>(this IObservable<T> original, IObservable<T> alt)
{
    return original.Publish(_original => _original
            .Timestamp()
            .Zip(_original.Timestamp().Skip(50))
            .TakeWhile(t => t.Second.Timestamp - t.First.Timestamp > TimeSpan.FromSeconds(3)) // Second is the later item
            .Select(t => t.Second.Value) // emit the newer item; the Merge below supplies the first 50
            .Merge(_original.Take(50))
        )
        .Concat(alt);
}

Explanation:

  • The Publish call is just there to ensure we only subscribe to original once.
  • The Timestamp function adds a timestamp to each message.
  • Zip with Skip(50) essentially pairs each item of index n with item index n-50.
  • Once they're paired, it's easy to compare them in a TakeWhile which will terminate if we're too frequent.
  • The Merge is there to get the first 50 items, since they will otherwise be skipped (since they have nothing to pair to).
  • The final Concat will switch to the alternate observable.
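
To see what the Zip/Skip(50) pairing looks like, here is a small sketch that uses plain LINQ-to-objects as a stand-in for the observable operators. This is an analogy only, not the Rx pipeline itself, and the 60 items arriving 10 ms apart are made-up data. It assumes .NET Core 3.0 or later for the tuple-returning Zip overload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical data: 60 items, one every 10 ms.
var start = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
List<(int Value, DateTime Timestamp)> items = Enumerable.Range(0, 60)
    .Select(i => (Value: i, Timestamp: start.AddMilliseconds(10 * i)))
    .ToList();

// Pair each item with the one 50 positions later, as Zip + Skip(50) does.
var pairs = items.Zip(items.Skip(50)).ToList();

// The first pair is (item 0, item 50); the gap between them is the
// quantity the TakeWhile condition compares against three seconds.
var firstPair = pairs[0];
TimeSpan span = firstPair.Second.Timestamp - firstPair.First.Timestamp;
Console.WriteLine($"{firstPair.First.Value} -> {firstPair.Second.Value}: {span.TotalMilliseconds} ms");
// prints "0 -> 50: 500 ms"
```

Only 10 pairs come out of the 60 items, because the first 50 have nothing earlier to pair with. That is why the Rx version needs the extra Merge with Take(50).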
Enigmativity On

Whenever you're trying to replace one observable with another without changing subscriptions you should use the Switch operator to, well, make the switch.

The pattern looks like this:

IObservable<T> final =
    original
        .Publish(published =>
            published
                .TransformInSomeWayThatProducesAValueWhenYouWantToSwitch()
                .Take(1) //only switch once
                .Select(xs => replacement)
                .StartWith(published)
                .Switch());

Here's a concrete example that, I think, achieves what you're after.

var rng = new Random();
IObservable<int> myObservable = Observable.Generate(0, x => true, x => x + 1, x => x, x => TimeSpan.FromSeconds(rng.NextDouble() / 9.75));
IObservable<int> myReplacement = Observable.Return(-1);

IObservable<int> myFinal =
    myObservable
        .Publish(published =>
            published
                .Timestamp()
                .Select(x => x.Timestamp)
                .Buffer(50, 1)
                .Where(xs => xs.Count == 50)
                .Where(xs => xs.Last().Subtract(xs.First()).TotalSeconds < 3.0)
                .Take(1)
                .Select(xs => myReplacement)
                .StartWith(published)
                .Switch());

A typical run for me will produce values like this:

1
2
3
...
109
110
111
-1