retryWhen used with flatMap throws exception

433 Views Asked by At

I wanted to retry in case of any exception from the service. But when using retryWhen am getting exception java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.

Without retry, its working fine

Flux.window(10)
    .flatMap(
          windowedFlux -> 
         webclient.post().uri(url)
        .body(BodyInserters.fromPublisher(windowedFlux, Request.class))
        .exchange()
        .doOnNext(ordRlsResponse -> {
                     if( ordRlsResponse.statusCode().is2xxSuccessful()) {                               
                        Mono<ResponseEntity<Response>> response = ordRlsResponse.toEntity(Response.class);              
                        //doing some processing here             
                     }           
                     else {                  
                        throw new CustomeException(errmsg);              
                     }

        }).retryWhen(retryStrategy)).subscribe();

And my retryStrategy is defined like:

Retry retryStrategy = Retry.fixedDelay((long)5, Duration.ofSeconds((long)5)) 
                         .filter(exception -> exception instanceof CustomeException) 
                         .doAfterRetry( exception -> log.info("Retry attempted"))
0

There are 0 best solutions below