In the blog post Flight of the Flux 3, the authors suggest wrapping a synchronous blocking call in a Mono with a subscribeOn call, as shown in this snippet from the article:
    final Flux<String> betterFetchUrls(List<String> urls) {
        return Flux.fromIterable(urls)
            .flatMap(url ->
                //wrap the blocking call in a Mono
                Mono.fromCallable(() -> blockingWebClient.get(url))
                    //ensure that Mono is subscribed in a boundedElastic Worker
                    .subscribeOn(Schedulers.boundedElastic())
            ); //each individual URL fetch runs in its own thread!
    }
But earlier in the same post, they show that you can use publishOn to ensure a blocking call is done on a separate thread:
    Flux.fromIterable(firstListOfUrls) //contains A, B and C
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url))
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));
Given that, why not just implement the betterFetchUrls method using publishOn directly?
    final Flux<String> betterFetchUrls(List<String> urls) {
        return Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> blockingWebClient.get(url));
    }
Isn't that simpler? Appendix C of the Reactor Reference Manual also wraps the blocking call in a Mono with a subscribeOn, so I presume there must be a reason that approach is preferred, but I can't figure out what that reason might be.
Thanks for any insight.
Your code is actually fine, except that you have to use a flatMap instead of a map.
Both subscribeOn and publishOn do a context switch, so both are fairly expensive operations (I don't know which one is more expensive, though, and would love some documentation on this). But the usually accepted way to make blocking calls is to use a subscribeOn, as mentioned in the docs.
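To see what changes between the two variants, here is a minimal sketch that records which threads run the blocking call. It assumes reactor-core is on the classpath. The class name BlockingCallDemo and the helper blockingGet are hypothetical; blockingGet is a stand-in for blockingWebClient.get(url) that just sleeps, so the numbers are only illustrative.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingCallDemo {

    // Hypothetical stand-in for blockingWebClient.get(url):
    // records the calling thread, blocks for a while, returns a fake body.
    static String blockingGet(String url, Set<String> threadsSeen) {
        threadsSeen.add(Thread.currentThread().getName());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "body of " + url;
    }

    // publishOn + map: a single boundedElastic worker handles every element in turn.
    static Set<String> threadsUsedByPublishOn(List<String> urls) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> blockingGet(url, threads))
            .blockLast(); // wait for completion (acceptable in a demo main thread)
        return threads;
    }

    // flatMap + Mono.fromCallable + subscribeOn: each inner Mono is subscribed on its own worker.
    static Set<String> threadsUsedBySubscribeOn(List<String> urls) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.fromIterable(urls)
            .flatMap(url -> Mono.fromCallable(() -> blockingGet(url, threads))
                                .subscribeOn(Schedulers.boundedElastic()))
            .blockLast(); // wait for completion (acceptable in a demo main thread)
        return threads;
    }

    public static void main(String[] args) {
        List<String> urls = List.of("A", "B", "C");
        System.out.println("publishOn + map:       " + threadsUsedByPublishOn(urls));
        System.out.println("flatMap + subscribeOn: " + threadsUsedBySubscribeOn(urls));
    }
}
```

With three URLs, the first set should hold a single thread name and the second should hold one per URL. In other words, publishOn moves the whole downstream onto one worker, so the blocking calls still run one after another, while the flatMap version lets them overlap. That is likely the practical reason the wrapped form is the one shown in the docs.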