In RxJava 1, subscribing with an Observer returned a Subscription, which could be unsubscribed.
In RxJava 2, subscribing with an Observer returns void and no Disposable. How is it possible to stop that "Subscription"?
// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = new TestSubscriber<>();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();
// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = new TestObserver<>();
v2hot.subscribe(v2Observer); // void
EDIT: how to handle the case where we use an observer which doesn't itself implement Disposable, like BehaviorSubject? Like in this example:
// v1
rx.Observable<Long> v1hot = rx.Observable.interval(1, TimeUnit.SECONDS);
rx.Observer<Long> v1observer = rx.subjects.BehaviorSubject.create();
Subscription subscription = v1hot.subscribe(v1observer);
subscription.unsubscribe();
// v2
Observable<Long> v2hot = Observable.interval(1, TimeUnit.SECONDS);
Observer<Long> v2Observer = BehaviorSubject.createDefault(-1L);
v2hot.subscribe(v2Observer); // void
All other subscribe methods return a Disposable. In your example, the TestObserver itself implements Disposable, so you can call dispose() on the observer itself to dispose of the subscription.
Otherwise you can use DisposableObserver as a base class for your own custom observers to have the Disposable behavior provided to you by the abstract base class.
EDIT to answer the updated question:
In case you need to use the subscribe(Observer) method (the one returning void), but you need to use an Observer which doesn't implement Disposable, you still have the option to wrap your Observer in a SafeObserver, which will provide you with Disposable behavior (among other contract conformance guarantees).
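As a rough sketch of the two options, assuming RxJava 2.x (the io.reactivex packages) is on the classpath: the class name DisposeExample and its two helper methods are made up for illustration, and a PublishSubject stands in for Observable.interval so that emissions can be pushed by hand and the result does not depend on timing.

```java
import io.reactivex.observers.DisposableObserver;
import io.reactivex.observers.SafeObserver;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;

public class DisposeExample {

    // Option 1: extend DisposableObserver and keep the instance that
    // subscribeWith() hands back, then call dispose() on it.
    static long viaDisposableObserver() {
        PublishSubject<Long> source = PublishSubject.create(); // stand-in for interval()
        final long[] last = { -1L };

        DisposableObserver<Long> observer = source.subscribeWith(new DisposableObserver<Long>() {
            @Override public void onNext(Long value) { last[0] = value; }
            @Override public void onError(Throwable e) { }
            @Override public void onComplete() { }
        });

        source.onNext(1L);   // delivered
        observer.dispose();  // stops the "subscription"
        source.onNext(2L);   // no longer delivered
        return last[0];
    }

    // Option 2: wrap an Observer that is not Disposable (here a BehaviorSubject)
    // in a SafeObserver and dispose the wrapper instead.
    static long viaSafeObserver() {
        PublishSubject<Long> source = PublishSubject.create(); // stand-in for interval()
        BehaviorSubject<Long> subject = BehaviorSubject.createDefault(-1L);

        SafeObserver<Long> safe = new SafeObserver<>(subject);
        source.subscribe(safe); // the void-returning subscribe(Observer)

        source.onNext(1L);   // forwarded to the BehaviorSubject
        safe.dispose();      // disposes the upstream connection
        source.onNext(2L);   // the BehaviorSubject never sees this value
        return subject.getValue();
    }

    public static void main(String[] args) {
        System.out.println(viaDisposableObserver());
        System.out.println(viaSafeObserver());
    }
}
```

In both cases only the value emitted before dispose() should reach the observer. Note that SafeObserver.dispose() relies on the Disposable it receives in onSubscribe, so call it only after subscribing.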