In the blog post Flight of the Flux 3, the authors suggest wrapping a synchronous blocking call in a Mono with a subscribeOn call, as shown in this snippet from the article:
    final Flux<String> betterFetchUrls(List<String> urls) {
        return Flux.fromIterable(urls)
            .flatMap(url ->
                //wrap the blocking call in a Mono
                Mono.fromCallable(() -> blockingWebClient.get(url))
                    //ensure that Mono is subscribed in a boundedElastic Worker
                    .subscribeOn(Schedulers.boundedElastic())
            ); //each individual URL fetch runs in its own thread!
    }
But earlier in the same post, they show that you can use publishOn to ensure a blocking call is done on a separate thread:
    Flux.fromIterable(firstListOfUrls) //contains A, B and C
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url))
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));
Given that, why not just implement the betterFetchUrls method using publishOn directly?
    final Flux<String> betterFetchUrls(List<String> urls) {
        return Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(url -> blockingWebClient.get(url));
    }
Isn't that simpler? The Reactor reference manual, in Appendix C, also wraps the blocking call in a Mono with a subscribeOn, so I presume there must be a reason that's preferred, but I can't figure out what that reason might be.
Thanks for any insight.
Your code is actually fine, except that you have to use a flatMap instead of a map.
Both subscribeOn and publishOn do a context switch, so both are fairly expensive operations (I have no idea which one is more expensive, though; I would love to see some documentation on this). But the usually accepted way to make blocking calls is using a subscribeOn, as mentioned in the docs.
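To make the difference between the two variants concrete, here is a minimal sketch, not a definitive implementation. It assumes reactor-core is on the classpath. The class name BlockingCallDemo, the method names, and blockingGet (a hypothetical stand-in for blockingWebClient.get(url) that sleeps and returns the name of the thread it ran on) are my own inventions for illustration. With publishOn followed by map, every element is handled one after another on the same worker, whereas flatMap with an inner subscribeOn lets each call run on its own worker.

```java
import java.util.HashSet;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingCallDemo {

    // Hypothetical stand-in for blockingWebClient.get(url):
    // blocks for a while, then reports which thread it ran on.
    static String blockingGet(String url) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // publishOn + map: a single worker handles all elements sequentially.
    static Flux<String> withPublishOn(List<String> urls) {
        return Flux.fromIterable(urls)
            .publishOn(Schedulers.boundedElastic())
            .map(BlockingCallDemo::blockingGet);
    }

    // flatMap + subscribeOn: each inner Mono is subscribed on its own worker,
    // so the blocking calls can overlap.
    static Flux<String> withSubscribeOn(List<String> urls) {
        return Flux.fromIterable(urls)
            .flatMap(url -> Mono.fromCallable(() -> blockingGet(url))
                .subscribeOn(Schedulers.boundedElastic()));
    }

    // Runs one variant and prints how many distinct threads it used and how long it took.
    static void report(String label, Flux<String> flux) {
        long start = System.nanoTime();
        List<String> threads = flux.collectList().block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(label + ": " + new HashSet<>(threads).size()
            + " distinct thread(s), " + elapsedMs + " ms");
    }

    public static void main(String[] args) {
        List<String> urls = List.of("A", "B", "C");
        report("publishOn + map", withPublishOn(urls));
        report("flatMap + subscribeOn", withSubscribeOn(urls));
    }
}
```

Running this, I would expect the publishOn variant to report a single thread and to take roughly the sum of the three sleeps, while the flatMap variant should spread the calls over several threads and finish in about the time of one sleep. Note that flatMap does not preserve the source order, which matters if the results have to line up with the input URLs.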