Spring WebFlux return response when error occurs in Mono.zip but keep processing them

90 Views Asked by At

I have a REST service (let's call it 'MAIN SERVICE') that calls other remote services. For performance purpose I need MAIN SERVICE to instant answer when one of the remote calls throw an error ; but I need to keep listening to other calls response to log them or compute verifications.

Basically I have something like, a 'MAIN SERVICE':

public ResponseEntity<SomeType> mainService() {
 Mono<A> remoteCallA = getRemoteCallA();
 Mono<B> remoteCallB = getRemoteCallB();

 SomeType result = Mono.zip(remoteCallA , remoteCallB)
  .doOnSuccess(...)
  .map(...)
  .block();

 return ResponseEntity.ok(response);
}

An other service that calls remote A and do some things with result :

//TAKES 1 SECONDE TO RESPOND
public Mono<A> getRemoteCallA() {
 return client.get()
  ...
  .doOnSuccess(response ->
    //MANDATORY EXECUTION);
}

An other service that calls remote B that throw an error :

//ERROR TRHOWED IN 500 MS
public Mono<B> getRemoteCallB() {
 return client.get()
  ...
  .doOnError(exception ->
    //SOME LOGGGING
  );
}

My problem is that call B fails before call A and I want to instant return an HTTP response. But then, when call A completes, doOnSuccess is not triggered (any doOnXXX method aswell).

I am not sure but I think the zip stream is canceled (saw it with log()) so A events are not triggered ?

zipDelayError is not a solution otherwise I have to wait 1 seconde before responding the error

1

There are 1 best solutions below

1
Vincent C. On BEST ANSWER

What seems to be working:

  • cache both Publisher so that they are hot sources
  • duplicate the subscription of the zip so that you can return one to client and handle errors in your local code
  • handle error on the local code using onErrorXXX
  • handle success on the local code using any transformation you wish
  • handle error signal on response code
  @Test
  void so_78147735() throws InterruptedException {

    var monoA = Mono.fromSupplier(() -> delay(1000L, false, "A")).subscribeOn(Schedulers.boundedElastic());
    var monoB = Mono.fromSupplier(() -> delay(500L, true, "B")).subscribeOn(Schedulers.boundedElastic());

    var cachedA = monoA.cache();
    var cachedB = monoB.cache();

    var response = Mono.zip(cachedA, cachedB).subscribeOn(Schedulers.boundedElastic());
    var local = Mono.zip(cachedA.map(resultOfA -> resultOfA.replace("A", "SUCCESS")),
                         cachedB.onErrorReturn("FAIL"))
                    .subscribeOn(Schedulers.boundedElastic());

    response.subscribe(objects -> System.out.println("response mono: " + objects.getT1() + " " + objects.getT2()));
    local.subscribe(objects -> System.out.println("local mono: " + objects.getT1() + " " + objects.getT2()));

    Thread.sleep(1000L);

  }

  private String delay(Long delay, Boolean error, String id) {
    try {
      Thread.sleep(delay);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

    if (error) {
      throw new IllegalArgumentException(id);
    }

    System.out.println("delay method: " + id);

    return id;
  }