I am trying to learn RxJava and going through some content. I came through this example where Subscriber unsubscribes soon after subscribing to observable.
public static void main(String[] args) {
Observable<Integer> ints = Observable.create(subscriber -> {
Runnable r = () -> {
sleep(10, SECONDS);
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(5);
subscriber.onCompleted();
}
};
Thread thread = new Thread(r);
thread.start();
Subscription subscription = Subscriptions.create(thread::interrupt);
System.out.println("inner subscription : "+subscription);
subscriber.add(subscription);
});
Subscription subscription = ints.subscribe(x->System.out.println(x));
System.out.println("outer subscription : "+subscription);
subscription.unsubscribe();
}
static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException ignored) {
//intentionally ignored
}
}
As we are creatig a subscription in main method and adding it into subscriber subscription callback will interupt the thread immediately without waiting for 10 seconds.
My understanding was that subscrption which we have added should be same as the one on which unsubscribe is called but when I am printing both on console they are different.
- if subscription are different how calling unsubscribe on one is triggering interrupt on thread.
inner subscription : rx.subscriptions.BooleanSubscription@215be6bb outer subscription : rx.observers.SafeSubscriber@5d5eef3d
- Does we have 1 subscriber with two different subscription here ?
Thanks
SafeSubscriberwraps the originalSubscriber, in documentation you can read:So your
unsubscribe()call will eventueally call yoursubscriptionwhich will interrupt your thread. You can investigate it on your own in debugger, looking at theSafeSubscriberinstance, in actual->subscriptions list.No, there is only one
Subscriberhere, theSafeSubscriberis just a wrapper around the originalSubscriberto ensure the RxJava contract is correctly followed.You also should be aware that your code indicates you are following some rxjava1 guides/books,
Subscriptionclass was replaced in rxjava2 withDisposableclass, over 4 (?) years ago. Currently rxjava is at version 3.