I want to implement a processing queue in RxJava which downloads some files. The number of files I want to download may be up to around 100.
Everything is developed on Android using RxJava 1.1.1
My current implementation looks something like this:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
_subject.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
});
// download a file
_subject.onNext(aValidURL);
Where _getObserver()
returns a new observer object which downloads in the file in the "onNext" method.
However, my problem is that I quickly gets a MissingBackpreasureException
, which I do not understand. I tried implementing a backpreasurebuffer
, but it just doesn't seem to be called.
What am I doing wrong?
In RxJava, when you apply an operator, you get a new Observable instance with the modified behavior but the original stays the same. Here, you called
onBackpressureBuffer
on the_subject
but then don't use its result and otherwise the call does nothing. You need to apply that in sequence: