Spring WebClient Flux: fire and forget while keeping the Flux

What I would like to achieve

  • For each element of a Flux<MyPojo>, send an HTTP request with the JSON payload to a third-party web service, fire-and-forget style, while still being able to return the Flux for follow-up processing.

Information

  • The third-party web server, which I have no control over, takes a constant 5 seconds to process one request.

  • The third-party server does not offer any bulk or list API; requests must be sent one by one.

  • The third-party server is known to be very reliable, and I am allowed to hit it as hard as possible.

  • I do not care much about the response; in particular, there is no value in waiting 5 seconds for a response I will do nothing with.

What I have tried

  • Using this code:
    private Flux<MyPojo> sendMyPojoFireAndForget(Flux<MyPojo> myPojoFlux) {
        return myPojoFlux.flatMap(oneMyPojo -> webClient.post()
                .uri("http://example.com/path")
                .bodyValue(oneMyPojo)
                .exchangeToMono(clientResponse -> Mono.just(oneMyPojo)));
    }

However, this is not fire and forget. Judging by my own logs and some logs from the third party, I am still waiting for each response before the element is emitted downstream.

I also tried:

webClient.post()
        .uri("http://example.com/path")
        .bodyValue(oneMyPojo)
        .retrieve()
        .bodyToMono(Void.class)
        .subscribe();

But this makes me "lose" the Flux of MyPojo and end up with a Flux of Void instead.

Question:

  • How to send each element of the Flux to this external webserver fire and forget style, while retaining the entire flux for downstream processing?

Best answer, by Emanuel Trandafir

A publisher such as a Mono or a Flux offers several side-effect hooks. In this case, you can use doOnNext() before subscribing:

@Test
void test() {
    Flux<MyPojo> myPojoFlux = Flux.just(
            new MyPojo(1),
            new MyPojo(2),
            new MyPojo(3),
            new MyPojo(4),
            new MyPojo(5)
    );

    myPojoFlux
      .doOnNext(this::sendPostRequest)
      .subscribe(this::doSomethingElse);
}

private void doSomethingElse(MyPojo myPojo) {
    System.out.println(myPojo);
}

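// Starts the request and returns immediately; the response is handled later by the inner subscribe().
private void sendPostRequest(MyPojo oneMyPojo) {
    webClient.post() // post(), not get(): bodyValue() is only available on requests that carry a body
            .uri(...)
            .bodyValue(oneMyPojo)
            .retrieve()
            .bodyToMono(String.class)
            .subscribe(resp -> System.out.println(oneMyPojo + " " + resp));
}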

Since the client works asynchronously as long as you don't block, doSomethingElse is called for each element without waiting for the response to sendPostRequest.
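
The non-blocking behaviour described above can be illustrated without Spring or Reactor. The following is only a rough sketch using the JDK's CompletableFuture, not WebClient code: slowPost, sendAll, the completed counter, and the delay values are all hypothetical stand-ins for the slow third-party call. It shows the same shape as doOnNext plus an inner subscribe(): each request is started, nobody waits for it, and the original elements are handed back untouched.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FireAndForgetSketch {
    // Counts simulated requests that have finished (hypothetical stand-in for the server's log).
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for the slow POST: finishes after delayMillis
    // on a background thread, without blocking the caller.
    static CompletableFuture<Void> slowPost(int payload, long delayMillis) {
        return CompletableFuture.runAsync(
                () -> completed.incrementAndGet(),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Starts one request per element and returns the same elements right away.
    static List<Integer> sendAll(List<Integer> items, long delayMillis) {
        for (int item : items) {
            // Fire and forget: the future is not joined; failures are swallowed
            // because nothing downstream depends on the response.
            slowPost(item, delayMillis).exceptionally(ex -> null);
        }
        return items;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> kept = sendAll(List.of(1, 2, 3, 4, 5), 500);
        System.out.println("kept " + kept + ", completed so far: " + completed.get());
        Thread.sleep(1000); // only so the demo can observe the background work finishing
        System.out.println("completed after waiting: " + completed.get());
    }
}
```

The point of the sketch is the design choice rather than the API: the slow call runs as a side effect, so the caller keeps its elements and never pays the per-request latency. In the Reactor version, the inner subscribe() inside sendPostRequest plays the role that the unjoined future plays here.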