I am trying to learn RxJava and am going through some material. I came across this example, where the Subscriber unsubscribes right after subscribing to the Observable.
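    import static java.util.concurrent.TimeUnit.SECONDS;

    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Subscription;
    import rx.subscriptions.Subscriptions;

    // Class name is arbitrary; the original snippet showed only the two methods.
    public class UnsubscribeExample {

        public static void main(String[] args) {
            Observable<Integer> ints = Observable.create(subscriber -> {
                Runnable r = () -> {
                    sleep(10, SECONDS);
                    // Emit only if nobody unsubscribed while we were sleeping
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(5);
                        subscriber.onCompleted();
                    }
                };
                Thread thread = new Thread(r);
                thread.start();
                // Runs thread.interrupt() when the subscriber is unsubscribed
                Subscription subscription = Subscriptions.create(thread::interrupt);
                System.out.println("inner subscription : " + subscription);
                subscriber.add(subscription);
            });
            Subscription subscription = ints.subscribe(x -> System.out.println(x));
            System.out.println("outer subscription : " + subscription);
            subscription.unsubscribe();
        }

        static void sleep(int timeout, TimeUnit unit) {
            try {
                unit.sleep(timeout);
            } catch (InterruptedException ignored) {
                // intentionally ignored
            }
        }
    }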
Because we create a subscription in the main method and add it to the subscriber, unsubscribing runs that subscription's callback, which interrupts the thread immediately instead of letting it wait for 10 seconds.
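To look at the interruption part on its own, without RxJava, here is a minimal sketch that uses only the JDK. The names InterruptDemo and runAndInterrupt are made up for illustration. It only shows that interrupting a sleeping thread wakes it up early, which is presumably what the thread::interrupt callback does in the example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
class InterruptDemo {

    // Starts a worker that would sleep for 10 seconds, interrupts it right away,
    // and returns how long the worker actually took, in milliseconds.
    static long runAndInterrupt() {
        long start = System.nanoTime();
        Thread worker = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException ignored) {
                // interrupt() makes sleep() throw, so the worker wakes up early
            }
        });
        worker.start();
        worker.interrupt(); // the same call that thread::interrupt refers to
        try {
            worker.join();  // wait until the worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("worker finished after about " + runAndInterrupt() + " ms");
    }
}
```

The elapsed time should be far below 10 seconds, because the interrupt cuts the sleep short.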
My understanding was that the subscription we added should be the same one that unsubscribe is called on, but when I print both to the console they are different:
inner subscription : rx.subscriptions.BooleanSubscription@215be6bb
outer subscription : rx.observers.SafeSubscriber@5d5eef3d
- If the subscriptions are different, how does calling unsubscribe on one trigger the interrupt on the thread?
- Do we have one subscriber with two different subscriptions here?
Thanks
SafeSubscriber wraps the original Subscriber, as its documentation describes. So your unsubscribe() call will eventually call your subscription, which will interrupt your thread. You can investigate this on your own in a debugger by looking at the SafeSubscriber instance, in the actual->subscriptions list.

No, there is only one Subscriber here; the SafeSubscriber is just a wrapper around the original Subscriber to ensure the RxJava contract is correctly followed.

You should also be aware that your code indicates you are following some RxJava 1 guides or books. The Subscription type was replaced in RxJava 2 by Disposable, over 4 (?) years ago. RxJava is currently at version 3.