We have the following web handler, which queries some data, processes it, and writes the updates back to the DB in the background of the web service using Mono.subscribeOn():

    public Mono<ServerResponse> handle(ServerRequest request) {
        return request
            .bodyToMono(String.class)
            .doOnNext(param -> queryData(param) // Query gets a flux of items
                .collectList() // Data processing requires a view of all the queried data, so we collect it first
                .flatMapMany(this::executeBulkOperations)
                .reduce(true, (a, c) -> a && c) // If some updates fail, we still want to apply the rest (hence not using Flux.any)
                .flatMap(this::notify)
                .onErrorResume(e -> {
                    log.error("onErrorResume ", e);
                    return notify(false);
                })
                .subscribeOn(Schedulers.boundedElastic())
                .doOnError(e -> log.error("doOnError ", e))
                .subscribe())
            .flatMap(param -> ok().body(Mono.just("ok"), String.class));
    }

Note that:

- We put the query-processing-update inside doOnNext because the application we are implementing uses a best-effort 'fire-and-forget' pattern. When this API is triggered, the query-processing-update takes time in the background, but the triggering application does not need to wait for its result, and it is still fine even if the server shuts down in the middle.
- Everything in the handler is async/reactive, but we still put it on Schedulers.boundedElastic() because IntelliJ recommended it. I believe that otherwise the publisher may get scheduled on the epoll/HTTP threads, and too much work there can delay the event loops even if it is fully async/reactive.
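The fire-and-forget shape described above can be isolated from WebFlux with a small sketch. It assumes reactor-core 3.4 or later on the classpath; a plain Mono<String> stands in for both the request body and the ServerResponse, and FireAndForgetSketch, backgroundWork, backgroundThread and backgroundDone are hypothetical names introduced only for illustration. The reply is produced without waiting for the inner pipeline, and the inner pipeline's subscription runs on a boundedElastic worker thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FireAndForgetSketch {
    // Hypothetical bookkeeping so the effect of subscribeOn can be observed.
    static final AtomicReference<String> backgroundThread = new AtomicReference<>();
    static final CountDownLatch backgroundDone = new CountDownLatch(1);

    // Hypothetical stand-in for the query-processing-update pipeline.
    static Mono<Boolean> backgroundWork(String param) {
        return Mono.fromCallable(() -> {
            backgroundThread.set(Thread.currentThread().getName());
            Thread.sleep(200); // simulate slow work
            return true;
        });
    }

    // Same shape as the handler: start the inner pipeline in doOnNext,
    // then answer without waiting for it.
    static Mono<String> handle(Mono<String> body) {
        return body
            .doOnNext(param -> backgroundWork(param)
                .onErrorResume(e -> Mono.just(false))
                .doFinally(signal -> backgroundDone.countDown())
                // the inner subscription (and therefore the callable) runs on a boundedElastic worker
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe())
            .map(param -> "ok");
    }

    public static void main(String[] args) throws InterruptedException {
        // the reply does not wait for the background work
        System.out.println(handle(Mono.just("param")).block());
        backgroundDone.await();
        System.out.println("background ran on " + backgroundThread.get());
    }
}
```

Because the inner chain is subscribed separately, none of its signals, errors included, reach the HTTP response; any logging or recovery it needs has to live inside that inner chain or in an error consumer passed to subscribe(...).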
Our issue is that sometimes, when an error is thrown from executeBulkOperations, we observe that doOnError is executed but onErrorResume is not.
The real executeBulkOperations is an API from the Azure SDK. I tried a couple of things on my end but couldn't implement a function that reproduces the exact error-handling behavior (in my attempts, onErrorResume always runs):

    private Flux<String> queryData(String param) {
        return Flux.just("a", "b", "c");
    }

    private Flux<Boolean> executeBulkOperations(List<String> items) {
        return Flux
            .fromIterable(items)
            .flatMap(c -> c.equals("c") ?
                Mono.error(new RuntimeException("Boom")) : Mono.just(true)
            );
    }

    private Mono<Boolean> notify(boolean res) {
        return Mono.just(true);
    }

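For comparison, the sketch below keeps the question's operator order (onErrorResume, then subscribeOn, then doOnError) and records which callbacks fire. It assumes reactor-core 3.4 or later; ErrorOrderingSketch and run are hypothetical names. Since signals only travel downstream, doOnError can fire without the onErrorResume lambda running only if the error is created below onErrorResume, which in this chain means inside subscribeOn. The sketch simulates that with a disposed scheduler, relying on Reactor's Scheduler contract that scheduling on a shut-down scheduler throws RejectedExecutionException. This is one mechanism that produces the observed symptom, offered as an assumption for illustration rather than a diagnosis of the Azure SDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class ErrorOrderingSketch {
    // Same failure as the question's reproducer: item "c" errors.
    static Flux<Boolean> executeBulkOperations(List<String> items) {
        return Flux.fromIterable(items)
            .flatMap(c -> c.equals("c")
                ? Mono.<Boolean>error(new RuntimeException("Boom"))
                : Mono.just(true));
    }

    // Runs the handler's inner chain on the given scheduler and records
    // which error callbacks fire, in order.
    static List<String> run(Scheduler scheduler) {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux.just("a", "b", "c")
            .collectList()
            .flatMapMany(ErrorOrderingSketch::executeBulkOperations)
            .reduce(true, (a, c) -> a && c)
            .onErrorResume(e -> {
                events.add("onErrorResume");
                return Mono.just(false);
            })
            .subscribeOn(scheduler)
            .doOnError(e -> events.add(e instanceof RejectedExecutionException
                ? "doOnError: rejected by scheduler"
                : "doOnError: " + e))
            .onErrorResume(e -> Mono.empty()) // demo only: keeps block() from rethrowing
            .block();
        return events;
    }

    public static void main(String[] args) {
        // Working scheduler: "Boom" is recovered by onErrorResume.
        System.out.println(run(Schedulers.boundedElastic()));

        // Disposed scheduler: scheduling is rejected inside subscribeOn,
        // i.e. below onErrorResume, so only doOnError sees the error.
        Scheduler disposed = Schedulers.newSingle("disposed");
        disposed.dispose();
        System.out.println(run(disposed));
    }
}
```

With a working scheduler the run should record only onErrorResume; with the disposed one it should record only doOnError. Schedulers.boundedElastic() also caps the number of queued tasks per backing thread and rejects tasks beyond that cap, so a similar rejection is at least conceivable under heavy load, though nothing in the question confirms it.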
While I'm not interested in digging into Microsoft/Azure-specific details, I'd like to understand:
- Whether it's possible to implement a small executeBulkOperations that shows the same behavior, or whether what we observed is actually abnormal.
- If it is normal behavior, what is the best practice for running work in the background while still handling errors? (Perhaps we shouldn't run that background work on boundedElastic()?)