Is it wrong to use "try catch" inside a reactive stream operator(project reactor)?

220 Views Asked by At

I was exploring reactive streams with project reactor, and I had a use case where I needed to skip to the next event if an error occurred during the processing of the current event (e.g., deserialization error).

For such a use case, I found out that I can use the onErrorContinue operator, but I also thought about wrapping my callback with a try catch like this:

Flux.just(1, 2, 3, 4, 5).mapNotNull(item -> {                                                         
    try {                                                                                             
        System.out.println("item = " + item);                                                         
        if (item.equals(3)) throw new IllegalArgumentException(); // deserialization issue for example
        return item;                                                                                  
    } catch (Exception exception){                                                                    
        System.out.println(exception);                                                                
        return null;                                                                                  
    }                                                                                                 
}).subscribe(item -> System.out.println("got element :" + item));                                                                                                                                               

I have found this old issue before the introduction of onErrorContinue opperator where someone was complaining about having to wrap his callbacks in a try catch to solve the issue as I have shown in the example.

So I am wondering if there is a reason to prefer onErrorContinue over the second approach other than sticking to the functional reactive programming style, maybe? And if there is something wrong with the try-catch approach?

1

There are 1 best solutions below

1
On

In Reactive Streams when an error is signaled, the publisher is considered terminated:

Terminal State: For a Publisher: When onComplete or onError has been signalled. For a Subscriber: When an onComplete or onError has been received.

In order to prevent the termination of the Publisher in Reactor, you can try/catch or use onErrorContinue depending on your requirements.

In my opinion there is nothing wrong with try/catch and sometimes like in the following scenario, it's the best solution, because onErrorContinue will skip all error events, and sometimes you will handle terminal and non-terminal exceptions in the same chain of operators, for example:

        Flux.just(1, 2, 3, 4, 5, 6, 7)
            .map(item -> {
                System.out.println("item = " + item);
                if (item.equals(5)) {
                    throw new IllegalArgumentException("Terminal event"); // terminal exception
                }
                return item;
            })
            .mapNotNull(item -> {
                System.out.println("item = " + item);
                if (item.equals(3))  {
                    throw new IllegalArgumentException("Non-terminal event"); // non-terminal exception
                }
                return item;
            })
            .onErrorContinue((error, item) -> {
                System.out.println(error.getMessage());
            }).subscribe(item -> System.out.println("got element :" + item));

In this scenario, both exceptions thrown are the same type, but one is considered a terminal event. The correct solution for this problem in my opinion is using try/catch inside flatMap/concatMap instead of onErrorContinue:

        Flux.just(1, 2, 3, 4, 5, 6, 7)
            .map(item -> {
                System.out.println("item = " + item);
                if (item.equals(5)) {
                    throw new IllegalArgumentException("Terminal event"); // terminal exception
                }
                return item;
            })
            .concatMap(item -> {
                System.out.println("item = " + item);
                try {
                    if (item.equals(3))  {
                        throw new IllegalArgumentException("Non-terminal event"); // non-terminal exception
                    }
                    return Flux.just(item);
                } catch (IllegalArgumentException e) {
                    return Flux.empty();
                }
            })
            .subscribe(item -> System.out.println("got element :" + item));

This way the terminal event will end the Publisher, and the non-terminal event will be processed, preventing the emission by returning Flux.empty() in case of exception.