I have the following code, which throws an error, so no Flux elements are "processed" (printed to the screen):
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Test {
    public static void main(String[] args) {
        Flux.just("A", "B", "Z", "D")
            .flatMap(Test::validate)
            .collectList()
            .subscribe(System.out::println);
    }

    // Fails the whole stream as soon as "Z" is seen.
    private static Mono<String> validate(String s) {
        return "Z".equals(s) ? Mono.error(new RuntimeException("It's Z!")) : Mono.just(s);
    }
}
But let's say I want to improve performance with buffering, so I change the main method to:
public static void main(String[] args) {
    Flux.just("A", "B", "Z", "D")
        .flatMap(Test::validate)
        .buffer(2)
        .subscribe(System.out::println);
}
This time the error is thrown as well, but the first two elements, which belong to the first buffer, are "processed" and displayed on the screen. How can I make the program behave as in the first case, where no Flux elements are processed, while keeping the buffering? This is an illustration of a much more complex problem of mine: introducing buffering gives a big performance boost, but if one element in the Flux fails validation, some elements are saved to the database when nothing should be saved.
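
To make the desired behavior concrete, here is a minimal sketch of one possible approach, assuming the whole validated sequence fits in memory and `reactor-core` is on the classpath: validate everything first with `collectList()`, then re-emit the validated elements and buffer them. The class name `ValidateThenBuffer` and the helper `validatedBatches` are hypothetical names introduced only for this illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ValidateThenBuffer {

    // Same validation rule as in the question.
    static Mono<String> validate(String s) {
        return "Z".equals(s) ? Mono.error(new RuntimeException("It's Z!")) : Mono.just(s);
    }

    // Hypothetical helper: emits buffers only after every element has passed validation.
    static Flux<List<String>> validatedBatches(Flux<String> source, int batchSize) {
        return source
                .flatMap(ValidateThenBuffer::validate)
                .collectList()                    // errors out if any element fails validation
                .flatMapMany(Flux::fromIterable)  // re-emit the validated elements one by one
                .buffer(batchSize);               // batching starts only after full validation
    }

    public static void main(String[] args) {
        validatedBatches(Flux.just("A", "B", "Z", "D"), 2)
                .subscribe(
                        System.out::println,
                        e -> System.err.println("Validation failed, nothing processed: " + e.getMessage()));
    }
}
```

With this arrangement no buffer reaches the subscriber unless all elements validate, at the cost of holding the entire validated list in memory before the first batch is emitted. If the real stream is too large for that, this sketch does not apply as written, and wrapping the batched saves in a single database transaction that rolls back on error may be a better fit.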