publishOn vs subscribeOn when making a blocking call in Reactor


In the blog post Flight of the Flux 3, the authors suggest wrapping a synchronous blocking call in a Mono with a subscribeOn call, as shown in this snippet from the article:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in a boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

But earlier in the same post, they show that you can use publishOn to ensure a blocking call is done on a separate thread:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Given that, why not just implement the betterFetchUrls method using publishOn directly?

final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url));
}

Isn't that simpler? Appendix C of the Reactor Reference Manual also wraps the blocking call in a Mono with subscribeOn, so I presume there must be a reason that approach is preferred, but I can't figure out what that reason might be.

Thanks for any insight.


There are 2 best solutions below

0
On

Your code is actually fine, except that you have to use flatMap instead of map:

final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .publishOn(Schedulers.boundedElastic())
        .flatMap(url -> blockingWebClient.get(url));
}

Both subscribeOn and publishOn switch execution to another thread, so both are fairly expensive operations (I don't know which one costs more, though, and would love to see some documentation on this). The commonly accepted way to make blocking calls is to use subscribeOn, as mentioned in the docs.
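
The comment in the blog snippet ("each individual URL fetch runs in its own thread!") points at one observable difference between the two variants. As a minimal sketch, assuming reactor-core is on the classpath and using a hypothetical blockingGet helper in place of blockingWebClient.get, the following records which threads each variant uses. publishOn hands every element to a single Worker, so map runs the blocking calls one after another, whereas flatMap with an inner subscribeOn subscribes each Mono on its own Worker:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingFetchDemo {

    // Hypothetical stand-in for blockingWebClient.get(url):
    // records the calling thread, blocks for a while, then returns a fake body.
    static String blockingGet(String url, Set<String> threads) {
        threads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "body of " + url;
    }

    // publishOn + map: a single Worker processes every element sequentially.
    static Set<String> threadsUsedByPublishOn(List<String> urls) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> blockingGet(url, threads))
            .blockLast();
        return threads;
    }

    // flatMap + subscribeOn: each inner Mono is subscribed on its own Worker,
    // so the blocking calls can overlap.
    static Set<String> threadsUsedBySubscribeOn(List<String> urls) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.fromIterable(urls)
            .flatMap(url -> Mono.fromCallable(() -> blockingGet(url, threads))
                                .subscribeOn(Schedulers.boundedElastic()))
            .blockLast();
        return threads;
    }

    public static void main(String[] args) {
        List<String> urls = List.of("A", "B", "C");
        System.out.println("publishOn + map:       " + threadsUsedByPublishOn(urls));
        System.out.println("flatMap + subscribeOn: " + threadsUsedBySubscribeOn(urls));
    }
}
```

With three URLs, the first set should contain a single boundedElastic thread name, while the second should typically contain one name per URL. In other words, the publishOn version moves the blocking work off the caller's thread but still fetches the URLs one at a time.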

0
On

The blog post aims to explain both operators. The reason why subscribeOn can be preferable in that second snippet is also given in the text:

But what if that url-fetching method was written by somebody else, and they regrettably forgot to add the publishOn? Is there a way to influence the Thread upstream?

Yes, the publishOn solution might be easier to read and follow, when that's available. That said, if you have no control over the outer sequence and you just want to return a Mono that ensures its work is done on a thread of your choosing, .subscribeOn is going to provide more guarantees.
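
To illustrate that guarantee, here is a minimal sketch, assuming reactor-core is on the classpath. The class name and the methods blockingCall, unprotected and guarded are hypothetical names chosen for this example. Each Mono reports the thread its work ran on, so the effect of baking subscribeOn into the returned Mono is visible regardless of what the caller does:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnGuarantee {

    // Hypothetical blocking call; returns the name of the thread it ran on.
    static String blockingCall() {
        return Thread.currentThread().getName();
    }

    // No scheduler specified: the work runs on whichever thread subscribes.
    static Mono<String> unprotected() {
        return Mono.fromCallable(SubscribeOnGuarantee::blockingCall);
    }

    // subscribeOn is part of the returned Mono: the work runs on a
    // boundedElastic thread even if the caller never adds publishOn.
    static Mono<String> guarded() {
        return Mono.fromCallable(SubscribeOnGuarantee::blockingCall)
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println("unprotected ran on: " + unprotected().block());
        System.out.println("guarded ran on:     " + guarded().block());
    }
}
```

When run from main, the first line should report the main thread and the second a thread whose name starts with boundedElastic. A caller of guarded() does not need to remember anything about threading, which is the situation the quoted passage describes.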