Why does this happen with onBackpressureDrop() in RxJava?


Here I have a Flowable that emits an element every millisecond.

    Flowable<Long> source = Flowable.interval(1, TimeUnit.MILLISECONDS).take(14000);
    source.map(e -> {
                // Logged on the producer side, before any item can be dropped
                Log.d("TAGBefore", "before " + e);
                return e;
            })
            .onBackpressureDrop()
            .observeOn(Schedulers.computation())
            .subscribe(
                    e -> {
                        Log.d("TAGNext", "onNext: " + e);
                        Thread.sleep(100); // simulate a slow consumer
                    },
                    e -> Log.d("TAGError", "error: " + e),
                    () -> Log.d("TAGComplete", "onComplete")
            );

I use the "before" log to see the moment at which the source emits each element. My question is why the output jumps from 127 (when the observer is full) straight to 9668:

   TAGNext: onNext: 125
   TAGNext: onNext: 126
   TAGNext: onNext: 127
   TAGNext: onNext: 9668
   TAGNext: onNext: 9669
   TAGNext: onNext: 9670

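However, when I check the console further (with other search filters), I see that by the time 127 was delivered the source had already reached 12794. So shouldn't the next value be 12794 or something close to it, rather than 9668, since that is when the observer becomes free again? I am new to RxJava, so I may have said something wrong. Thanks.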

   TAGBefore: before: 12793
   TAGBefore: before: 12794
   TAGNext:   onNext: 127
   TAGBefore: before: 12795
   TAGBefore: before: 12796
 



There is 1 best solution below


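observeOn has a default buffer of 128 elements, and with a source emitting every millisecond it fills up in roughly 128 milliseconds. Once it is full, observeOn requests nothing more, so onBackpressureDrop discards everything the source emits.

The consumer drains that buffer at one element per 100 milliseconds. After 96 elements have been consumed, only 32 remain, and at that point observeOn requests 96 more. That takes about 96 × 100 = 9600 milliseconds, so no new items pass onBackpressureDrop until then; hence you see TAGNext: onNext: 9668 right after 127.

When element 127 is drained, you are about 127 × 100 = 12700 milliseconds into the run, thus you see TAGBefore: before: 12795 from the producer side. The items from 9668 onward were already sitting in the buffer by then, which is why the next value delivered is not close to 12794.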
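The timeline above can be checked with a small model in plain Java. This is only a sketch under simplifying assumptions, not RxJava code: the class name BackpressureDropModel and the method simulate are hypothetical, value t is assumed to be emitted at exactly t milliseconds, the consumer is assumed to take exactly 100 milliseconds per item, and the replenish threshold is taken as prefetch minus a quarter of prefetch (96 for a prefetch of 128), matching the 128/32/96 figures in the answer. Completion signals and thread scheduling are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of onBackpressureDrop() followed by observeOn().
// It does not use RxJava; it only reproduces the request/drop arithmetic.
class BackpressureDropModel {

    /**
     * @param prefetch  size of the observeOn buffer (128 by default in RxJava)
     * @param consumeMs time the consumer spends on each item
     * @param total     number of values the source emits, one per millisecond
     * @return the values that reach the consumer, in delivery order
     */
    static List<Long> simulate(int prefetch, long consumeMs, long total) {
        int limit = prefetch - (prefetch >> 2);   // 96 when prefetch is 128
        long demand = prefetch;                   // outstanding request seen by the drop stage
        int sinceRequest = 0;                     // items consumed since the last request
        boolean inFlight = false;                 // is the consumer busy with an item?
        long busyUntil = 0;
        ArrayDeque<Long> queue = new ArrayDeque<>();
        List<Long> delivered = new ArrayList<>();

        for (long t = 0; ; t++) {                 // one loop iteration = one millisecond
            // 1. The consumer finishes its current item and may replenish the demand.
            if (inFlight && t >= busyUntil) {
                inFlight = false;
                if (++sinceRequest == limit) {
                    demand += limit;
                    sinceRequest = 0;
                }
            }
            // 2. The source emits value t; it is queued if there is demand, dropped otherwise.
            if (t < total && demand > 0) {
                queue.add(t);
                demand--;
            }
            // 3. An idle consumer picks up the next queued item.
            if (!inFlight && !queue.isEmpty()) {
                delivered.add(queue.poll());
                inFlight = true;
                busyUntil = t + consumeMs;
            }
            if (t >= total && queue.isEmpty() && !inFlight) {
                return delivered;
            }
        }
    }

    public static void main(String[] args) {
        List<Long> seen = simulate(128, 100, 14000);
        // Under this idealized model the value delivered after 127 is 9600.
        System.out.println("delivered: " + seen.size());
        System.out.println("after " + seen.get(127) + " comes " + seen.get(128));
    }
}
```

In this idealized model the value following 127 is 9600 rather than the 9668 seen in the log; the real run presumably drifts because logging and Thread.sleep take slightly longer than their nominal durations. The shape is the same, though: the second batch is admitted at around 9.6 seconds, long before 127 is finally delivered at around 12.7 seconds.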