How to unsubscribe from Flowable in RxKotlin/RxJava?


I am using Room with RxJava/RxKotlin Flowable, following this article. I got it running, but I ran into an issue when using it with a ViewPager that has three fragments.

Let me walk you through my setup:

I have a ViewPager with a tab layout and three fragments (A, B, and favorites). The first two fragments contain lists of data whose items can be added to favorites.

In the favorites fragment, I use a Flowable to listen for changes made by A and B and update the list accordingly. However, when an item is marked as a favorite in A or B, the app crashes, because the Flowable subscription in the favorites fragment keeps running even when that fragment is not in the foreground.

What I want is to stop the subscription when the fragment leaves the foreground and start it again when the fragment returns to the foreground.

I tried to stop it in the onPause method of the favorites fragment, but Flowable has no unsubscribe or dispose method.

My code is:

dbRepository?.getAllImportant()?.subscribeOn(Schedulers.io())
            ?.observeOn(AndroidSchedulers.mainThread())
            ?.subscribe(getFlowableSubscriber())

There are 4 best solutions below

Best answer

Thank you for the answers. I found a workaround: save the Subscription passed to onSubscribe(subscription: Subscription) in the FlowableSubscriber, and later call subscription.cancel() in onStop().

        override fun onSubscribe(subscription: Subscription) {
            log("onSubscribe")
            // FavoriteFragment is a placeholder name for the enclosing fragment class
            this@FavoriteFragment.subscription = subscription
        }

And

    override fun onStop() {
        log("onStop() ")
        this.subscription.cancel()
        super.onStop()
    }
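
To see the effect of cancel() without an Android project, here is a minimal sketch on the plain JVM. It assumes RxJava 2 (the io.reactivex packages); RecordingSubscriber is a hypothetical name, and a PublishProcessor stands in for the Room Flowable. The nullable field is a design choice of the sketch so that cancelling before onSubscribe() has run does nothing instead of throwing.

```kotlin
import io.reactivex.FlowableSubscriber
import io.reactivex.processors.PublishProcessor
import org.reactivestreams.Subscription

// Hypothetical subscriber that keeps its Subscription so it can be cancelled later.
class RecordingSubscriber : FlowableSubscriber<String> {
    val received = mutableListOf<String>()
    private var subscription: Subscription? = null

    override fun onSubscribe(s: Subscription) {
        subscription = s
        s.request(Long.MAX_VALUE) // a hand-written subscriber has to request items itself
    }

    override fun onNext(item: String) { received.add(item) }
    override fun onError(t: Throwable) { t.printStackTrace() }
    override fun onComplete() {}

    // Would be called from onStop(); a no-op if onSubscribe() never ran.
    fun cancel() { subscription?.cancel() }
}

fun main() {
    val source = PublishProcessor.create<String>() // stand-in for the Room Flowable
    val subscriber = RecordingSubscriber()
    source.subscribe(subscriber)

    source.onNext("first")   // delivered
    subscriber.cancel()      // what onStop() would do
    source.onNext("second")  // not delivered: the subscription is cancelled

    println(subscriber.received) // [first]
}
```

To listen again when the fragment comes back, subscribe a fresh subscriber instance in onStart(); a cancelled Subscription cannot be resumed.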
Answer

When you subscribe to a Flowable with one of the lambda (Consumer) overloads of subscribe(), it returns a Disposable. Calling dispose() on it unsubscribes you from the Flowable. The subscribe() overloads that take a Subscriber return nothing, so there is no Disposable to hold in that case.

I can't see your code, but if that object is not accessible to you, you can also add takeUntil or takeWhile to your Flowable to stop it when you are out of view. This is a bit of a hack, though: with a predicate, the source is not unsubscribed until the next item is emitted and the predicate is evaluated. Calling dispose() is the best option.
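
Both options can be sketched as follows, assuming RxJava 2 (io.reactivex). The PublishProcessor named source stands in for the Room Flowable, and stop is a hypothetical signal that the fragment has left the screen. Note that this sketch uses the takeUntil(Publisher) overload, which cancels as soon as the other publisher emits, so it does not wait for the next source item the way the predicate overloads do.

```kotlin
import io.reactivex.disposables.Disposable
import io.reactivex.processors.PublishProcessor

fun main() {
    val source = PublishProcessor.create<Int>() // stand-in for the Room Flowable

    // Option 1: the lambda overload of subscribe() returns a Disposable.
    val viaDispose = mutableListOf<Int>()
    val disposable: Disposable = source.subscribe { viaDispose.add(it) }

    // Option 2: takeUntil(Publisher) cancels the source once `stop` emits.
    val stop = PublishProcessor.create<Unit>() // hypothetical "left the screen" signal
    val viaTakeUntil = mutableListOf<Int>()
    source.takeUntil(stop).subscribe { viaTakeUntil.add(it) }

    source.onNext(1)      // both subscribers receive 1
    disposable.dispose()  // unsubscribes the first subscriber
    stop.onNext(Unit)     // unsubscribes the second subscriber
    source.onNext(2)      // nobody is listening any more

    println(viaDispose)   // [1]
    println(viaTakeUntil) // [1]
}
```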

Answer

For the current situation, let your subscriber extend DisposableSubscriber and subscribe like this:


yourFlowable
  .subscribeWith(yourSubscriber)
  .addTo(compositeDisposable)
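
A fuller sketch of this approach, assuming RxJava 2 (io.reactivex): FavoritesSubscriber is a hypothetical name and a PublishProcessor stands in for the Room Flowable. It calls compositeDisposable.add() directly so that it does not need the RxKotlin dependency; RxKotlin's addTo extension does the same thing.

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.processors.PublishProcessor
import io.reactivex.subscribers.DisposableSubscriber

// Hypothetical subscriber. DisposableSubscriber is both a Subscriber and a Disposable,
// and by default it requests an unbounded number of items when it starts.
class FavoritesSubscriber : DisposableSubscriber<String>() {
    val received = mutableListOf<String>()
    override fun onNext(item: String) { received.add(item) }
    override fun onError(t: Throwable) { t.printStackTrace() }
    override fun onComplete() {}
}

fun main() {
    val compositeDisposable = CompositeDisposable()
    val source = PublishProcessor.create<String>() // stand-in for the Room Flowable

    // subscribeWith() returns the subscriber it was given, so it can be stored.
    val subscriber = source.subscribeWith(FavoritesSubscriber())
    compositeDisposable.add(subscriber)

    source.onNext("first")       // delivered
    compositeDisposable.clear()  // disposes everything added so far
    source.onNext("second")      // not delivered

    println(subscriber.received)   // [first]
    println(subscriber.isDisposed) // true
}
```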
Answer

You should add the subscription to a CompositeDisposable and call composite.dispose() when you want to unsubscribe.

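val composite = CompositeDisposable()

// subscribe(Subscriber) returns nothing, so use subscribeWith(), which returns
// its argument. This assumes getFlowableSubscriber() returns a DisposableSubscriber.
dbRepository?.getAllImportant()
        ?.subscribeOn(Schedulers.io())
        ?.observeOn(AndroidSchedulers.mainThread())
        ?.subscribeWith(getFlowableSubscriber())
        ?.let { composite.add(it) }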

Then, when you want to unsubscribe:

composite.dispose()

Normally you should unsubscribe from the Flowable in the onPause or onDestroy method.
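
One detail matters if you unsubscribe in onPause and subscribe again in onResume: a CompositeDisposable that has been disposed cannot be reused, whereas clear() disposes its contents and leaves the container usable. A minimal sketch of the difference, assuming RxJava 2 (io.reactivex), with Disposables.empty() standing in for a real subscription:

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposables

fun main() {
    // clear(): disposes what was added, the container stays usable.
    val cleared = CompositeDisposable()
    cleared.add(Disposables.empty())
    cleared.clear()
    val afterClear = Disposables.empty()
    println(cleared.add(afterClear))    // true
    println(afterClear.isDisposed)      // false

    // dispose(): the container itself is disposed for good.
    val disposed = CompositeDisposable()
    disposed.dispose()
    val afterDispose = Disposables.empty()
    println(disposed.add(afterDispose)) // false
    println(afterDispose.isDisposed)    // true, disposed immediately on add
}
```

So clear() fits a pause/resume cycle, and dispose() fits onDestroy, where the container is no longer needed.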