Why do I get a MissingBackpressureException while using onBackpressureBuffer?

I want to implement a processing queue in RxJava which downloads some files. The number of files I want to download may be up to around 100.

Everything is developed on Android using RxJava 1.1.1.

My current implementation looks something like this:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

_subject.onBackpressureBuffer(10000, new Action0() {
    @Override
    public void call() {
        Log.d(TAG, "onBackpressureBuffer called");
        return;
    }
});

// download a file
_subject.onNext(aValidURL);

Here, _getObserver() returns a new Observer object that downloads the file in its onNext method.

However, my problem is that I quickly get a MissingBackpressureException, which I do not understand. I tried adding a backpressure buffer with onBackpressureBuffer, but it doesn't seem to have any effect.

What am I doing wrong?

Best answer

In RxJava, applying an operator returns a new Observable instance with the modified behavior; the original stays the same. Here, you called onBackpressureBuffer on _subject but never used the Observable it returned, so the call had no effect on the chain you actually subscribed to. You need to apply the operator as part of that chain:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .onBackpressureBuffer(10000, new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "onBackpressureBuffer called");
                        return;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

// download a file
_subject.onNext(aValidURL);
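
To make the "operators return a new instance" rule concrete without pulling in RxJava, here is a minimal plain-Java sketch. Pipeline and OperatorChainingDemo are hypothetical names invented for this illustration; they are not RxJava classes and only mimic how a discarded operator result leaves the original chain unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an Observable: an immutable list of applied steps.
// This is NOT RxJava; it only mimics the "operator returns a new instance" rule.
final class Pipeline {
    private final List<String> steps;

    private Pipeline(List<String> steps) {
        this.steps = Collections.unmodifiableList(new ArrayList<>(steps));
    }

    static Pipeline source() {
        return new Pipeline(Arrays.asList("source"));
    }

    // Like an Rx operator: leaves `this` untouched and returns a new Pipeline.
    Pipeline with(String operator) {
        List<String> extended = new ArrayList<>(steps);
        extended.add(operator);
        return new Pipeline(extended);
    }

    List<String> steps() {
        return steps;
    }
}

class OperatorChainingDemo {
    // Mirrors the question: the operator is applied, but its result is discarded.
    static List<String> discarded() {
        Pipeline subject = Pipeline.source();
        subject.with("onBackpressureBuffer"); // return value thrown away
        return subject.with("observeOn").steps();
    }

    // Mirrors the answer: the operator is part of the chain that gets used.
    static List<String> chained() {
        Pipeline subject = Pipeline.source();
        return subject.with("onBackpressureBuffer").with("observeOn").steps();
    }

    public static void main(String[] args) {
        System.out.println(discarded()); // [source, observeOn]
        System.out.println(chained());   // [source, onBackpressureBuffer, observeOn]
    }
}
```

In the first method the buffer step never appears in the chain that is finally used, which is the same situation as the code in the question.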