RxJava subscribeOn conditionally

I would like to apply subscribeOn to an Observable only when a condition is true.

For example, in the code below:

Observable.just("One", "Two", "Three")
    .flatMap(v ->
            performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()) //I want to apply this if a condition is true. Otherwise run on the main thread
    )
    .subscribe(item -> System.out.println(item));

Best answer

You can assign the common part of the chain to a local variable, check the condition, and then return either that chain unchanged or the same chain with subscribeOn applied.

.flatMap(v -> {

    var o = performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " 
                 + Thread.currentThread().getName()));

    if (condition) {
        return o.subscribeOn(Schedulers.newThread());
    }
    return o;
})

If you want something more compact, use compose() with a ternary expression:

.flatMap(v ->
            performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " 
                  + Thread.currentThread().getName()))
            .compose(o -> condition ? o.subscribeOn(Schedulers.newThread()) : o)
    )
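
For completeness, here is a self-contained sketch of the compose() variant that you can run to see which thread handles each item. It assumes RxJava 3 (the io.reactivex.rxjava3 packages) is on the classpath. The helper subscribeOnIf and the body of performLongOperation are hypothetical stand-ins written for illustration; they are not part of RxJava or of the question's code.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class ConditionalSubscribeOn {

    // Hypothetical helper: applies subscribeOn only when the condition holds,
    // otherwise passes the upstream through unchanged.
    static <T> ObservableTransformer<T, T> subscribeOnIf(boolean condition, Scheduler scheduler) {
        return upstream -> condition ? upstream.subscribeOn(scheduler) : upstream;
    }

    // Hypothetical stand-in for the question's performLongOperation(v).
    static Observable<String> performLongOperation(String v) {
        return Observable.fromCallable(() -> v + " done");
    }

    // Collects the name of the thread that processed each inner item.
    static List<String> processingThreads(boolean condition) {
        return Observable.just("One", "Two", "Three")
                .flatMap(v -> performLongOperation(v)
                        // map sits upstream of subscribeOn, so it runs on the subscription thread
                        .map(s -> Thread.currentThread().getName())
                        .compose(subscribeOnIf(condition, Schedulers.newThread())))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("condition=false: " + processingThreads(false));
        System.out.println("condition=true:  " + processingThreads(true));
    }
}
```

With the condition false, every item should report the caller's thread, because nothing in the chain moves the subscription elsewhere. With the condition true, each inner Observable is subscribed on its own new thread, so the order in which the results arrive is not guaranteed.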