Throwing data through a narrow gate with Rx (Reactive Extensions)


I have a device with a send buffer, and I have to feed it data without overflowing that buffer.

I have two streams:

  • one is the flow of data (IObservable<T>)
  • the other is the free buffer size (the number of data pieces which I can send to the device) (IObservable<int>).

I need to combine them into one stream:

  1. when data arrives on the first stream:
    1. if the buffer is full (latest value in the second stream = 0), the data should be buffered
    2. if the buffer has free space (latest value in the second stream = n > 0), the first n (or fewer) pieces of data should be sent to the exit stream as IList<T>, and the rest of the data should be buffered
  2. when a new free buffer size arrives (if it is > 0), the first n (or fewer) pieces of data from the buffer should be sent to the exit stream
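
To make these rules concrete, here is a minimal sketch of the bookkeeping they imply, written in plain C# without Rx. The class name Gate<T> and its methods OnData and OnFreeSize are hypothetical. It also assumes that the known free size drops by one for every piece sent until the device reports a new value, which the rules do not state explicitly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: keeps pending items and the last known free space.
public sealed class Gate<T>
{
    private readonly Queue<T> _pending = new Queue<T>();

    // Free slots last reported by the device, minus what was sent since
    // (assumption: each sent piece occupies exactly one slot).
    private int _free;

    // Rule 1: a new piece of data arrived on the first stream.
    public IList<T> OnData(T item)
    {
        _pending.Enqueue(item);
        return Drain();
    }

    // Rule 2: the device reported a new free buffer size.
    public IList<T> OnFreeSize(int free)
    {
        _free = Math.Max(0, free);
        return Drain();
    }

    // Moves up to _free items from the queue into a batch.
    // Returns an empty list when nothing can be sent right now.
    private IList<T> Drain()
    {
        var batch = new List<T>();
        while (_free > 0 && _pending.Count > 0)
        {
            batch.Add(_pending.Dequeue());
            _free--;
        }
        return batch;
    }
}
```

With this sketch, three OnData calls followed by OnFreeSize(2) return the first two items as one batch, and the third item stays queued until more space is reported.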

There is 1 answer below

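You should be able to do that with Observable.Buffer by specifying a count of n. Each time the buffer has collected n items, they are delivered to the subscriber as a single IList<T>, and the subscriber's code sends them to the exit stream. When the source completes, any remaining items are delivered as a final, shorter batch.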

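using System;
using System.Reactive.Linq;

// Subscribe returns an IDisposable subscription, not an observable.
var subscription = Observable.Range(0, 10)
    .Buffer(4) // emits IList<int> batches of up to 4 items
    .Subscribe(batch =>
    {
        Console.WriteLine("Sending data...");

        // Simulate sending every item in the batch
        foreach (var element in batch)
        {
            Console.WriteLine(element);
        }
    });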
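
Buffer(4) uses a fixed count, whereas in the question n changes with every value of the free-size stream. One possible way to handle that, sketched here under assumptions rather than offered as a definitive implementation, is to merge both streams into one serialized stream and keep a queue plus a free-slot counter per subscription. ThroughGate is a hypothetical operator name, not part of System.Reactive, and the sketch assumes the free size drops by one for every item sent until a new value arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class GateExtensions
{
    // Hypothetical operator: releases queued items in batches no larger
    // than the last reported free size.
    public static IObservable<IList<T>> ThroughGate<T>(
        this IObservable<T> data, IObservable<int> freeSize)
    {
        // Defer gives every subscriber its own queue and counter.
        return Observable.Defer(() =>
        {
            var pending = new Queue<T>();
            var free = 0;

            // Tag both inputs so they can travel through one merged stream.
            var items = data.Select(x => (IsData: true, Item: x, Free: 0));
            var sizes = freeSize.Select(n => (IsData: false, Item: default(T), Free: n));

            return items.Merge(sizes)
                .Select(e =>
                {
                    if (e.IsData) pending.Enqueue(e.Item);
                    else free = Math.Max(0, e.Free);

                    // Release as many queued items as the free size allows.
                    IList<T> batch = new List<T>();
                    while (free > 0 && pending.Count > 0)
                    {
                        batch.Add(pending.Dequeue());
                        free--;
                    }
                    return batch;
                })
                .Where(batch => batch.Count > 0); // skip empty batches
        });
    }
}
```

It could be used as dataStream.ThroughGate(freeSizeStream).Subscribe(batch => ...), where both stream names are placeholders. Note that items still queued when both sources complete are dropped in this sketch; flushing them would need extra handling.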