here I have a flowable that emits elements every millisecond.
Flowable<Long> source = Flowable.interval(1,TimeUnit.MILLISECONDS).take(14000);
source.map(e->{
Log.d("TAGBefore","before " + e);
return e;
})
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(
e-> {
Log.d("TAGNext","onNext: " + e);
Thread.sleep(100);
},
e-> Log.d("TAGError","error: " + e),
()-> Log.d("TAGComplete","onComplete")
);
I use the before to know the moment in which the observable emits elements, my doubt is that here from 127 (when the observer is full) it goes to 9688
TAGNext: onNext: 125
TAGNext: onNext: 126
TAGNext: onNext: 127
TAGNext: onNext: 9668
TAGNext: onNext: 9669
TAGNext: onNext: 9670
However, when I check the console more (with other search filters), I realize that when 127 was issued it already goes to 12794, so instead of 9688 it shouldn't be 12794 or a close number? , thanks.
TAGBefore: before: 12793
TAGBefore: before: 12794
TAGNext: onNext: 127
TAGBefore: before: 12795
TAGBefore: before: 12796
However, when I check the console more (with other search filters), I realize that when 127 was emitted it already goes for 12794, so instead of 9688 it should not be 12794 or a close number that is when the observable is already free ?, I clarify that I am new in RxJava in case I said something wrong, thanks.
observeOn
has a default 128 element buffer that fills up pretty fast.It is drained every 100 milliseconds until it has only 32 element remaining, at which point it requests 96 more. So it takes about 9600 milliseconds to have more items pass the
onBackpressureDrop
, hence you seeTAGNext: onNext: 9668
.When the 127th element is drained, you are about 12700 milliseconds into the run thus you see
TAGBefore: before: 12795
from the producer side.