Spring Boot WebFlux: Parsing Parallel WebClient Requests

I am making parallel REST calls to an API with the Spring WebClient. The response looks like this:

{
  "d": {
    "results": [
        {
            "id": "1",
            "name": "Test A"
        },
        {
            "id": "2",
            "name": "Test B"
        }
    ]
  }
}

POJOs:

public class ProductsResponse {
    private Products d;
    // getters and setters omitted
}

public class Products {
    private List<Product> results;
    // getters and setters omitted
}

public class Product {
    private String id;
    private String name;
    // getters and setters omitted
}
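
The loop in the question calls getD() and getResults(), which the POJO listing does not spell out. Below is a minimal sketch of the same three classes with accessors added. The constructors, the sample() helper and the PojoSketch wrapper class are assumptions made for illustration only; depending on how Jackson is configured, JSON binding may additionally need a no-args constructor or setters.

```java
import java.util.List;

public class PojoSketch {

    // Top-level wrapper: maps the JSON object that holds "d".
    public static class ProductsResponse {
        private final Products d;
        public ProductsResponse(Products d) { this.d = d; }
        public Products getD() { return d; }
    }

    // Maps the "d" object, which holds the "results" array.
    public static class Products {
        private final List<Product> results;
        public Products(List<Product> results) { this.results = results; }
        public List<Product> getResults() { return results; }
    }

    // Maps one element of "results".
    public static class Product {
        private final String id;
        private final String name;
        public Product(String id, String name) { this.id = id; this.name = name; }
        public String getId() { return id; }
        public String getName() { return name; }
    }

    // Hypothetical helper: builds the object graph for the sample payload.
    public static ProductsResponse sample() {
        return new ProductsResponse(new Products(List.of(
                new Product("1", "Test A"),
                new Product("2", "Test B"))));
    }

    public static void main(String[] args) {
        // Walks the structure the same way the question's loop does.
        for (Product product : sample().getD().getResults()) {
            System.out.println(product.getId() + " " + product.getName());
        }
    }
}
```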

API Call:

public Flux<ProductsResponse> getProductList(final List<String> pages) {
    return Flux.fromIterable(pages)
            // flatMap subscribes to the inner Monos eagerly, so the page requests run concurrently
            .flatMap(page -> webClient.get().uri("SOMEURL?page={page}", page)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(ProductsResponse.class))
            .log()
            .subscribeOn(Schedulers.elastic());
}

Getting the products:

Flux<ProductsResponse> productList = getProductList(pages);
// block() waits for all pages to arrive before the loop starts
List<ProductsResponse> productsResponse = productList.collectList().block();
for (ProductsResponse response : productsResponse) {
    for (Product product : response.getD().getResults()) {
        System.out.println(product.getId());
    }
}

This works, but is there a way to return a Flux<Product> directly, or is it better to return a Mono<List<Product>>?

(The next step would be to save the products in a database.)

There are 2 best solutions below

Answer 1:

Try the following, which maps each response to its list of products and then flattens that list into the outer Flux:

Flux<Product> productFlux = Flux.fromIterable(pages)
        .flatMap(page -> webClient.get().uri("SOMEURL?page={page}", page)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(ProductsResponse.class)
                // unwrap the nested list, then emit each Product individually
                .map(productsResponse -> productsResponse.getD().getResults())
                .flatMapMany(Flux::fromIterable))
        .log()
        .subscribeOn(Schedulers.elastic());
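
The key step in this answer is turning each page's list into individual elements. Here is a dependency-free sketch of that same flattening, using java.util.stream rather than Reactor so it runs without Spring on the classpath. The FlattenSketch class and the use of plain strings in place of Product are assumptions for illustration. In the Reactor pipeline, the map(...).flatMapMany(Flux::fromIterable) pair plays the role that Stream.flatMap plays here.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlattenSketch {

    // Each inner list stands in for the "results" array of one page response.
    public static List<String> flatten(List<List<String>> pages) {
        return pages.stream()
                // One page (a list) becomes a stream of its elements,
                // and all per-page streams are merged into a single stream.
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> pages = List.of(
                List.of("Test A", "Test B"),
                List.of("Test C"));
        System.out.println(flatten(pages)); // [Test A, Test B, Test C]
    }
}
```

One difference to keep in mind: a sequential stream preserves page order, whereas Flux.flatMap subscribes to the inner publishers eagerly and may interleave results from different pages. If source order matters, flatMapSequential or concatMap keep it.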

Answer 2:

Ideally, persist the data through a ReactiveCrudRepository, which lets you save the products without calling block().

public interface ProductRepository extends R2dbcRepository<Product, String> {}

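Either approach works, assuming getProductList(pages) has been changed to return a Flux<Product>:

  1. with the raw Flux: getProductList(pages).flatMap(productRepository::save);
  2. with collectList: getProductList(pages).collectList().flatMapMany(productRepository::saveAll);

(saveAll returns a Flux, so the Mono produced by collectList() is chained with flatMapMany rather than flatMap.)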

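I would go for the second approach, as it hands the whole batch to the repository in one call, which may translate to fewer SQL statements. Whether it really becomes a single statement with multiple inserts/updates depends on the repository implementation; some issue one statement per entity.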
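
To make the difference between the two approaches concrete, here is a toy, synchronous model. It is not Spring Data: CountingRepository is a hypothetical in-memory stand-in that only records how often it is invoked, and plain strings stand in for Product. It illustrates the call pattern only and says nothing about how many SQL statements a real repository issues.

```java
import java.util.ArrayList;
import java.util.List;

public class SaveSketch {

    // Hypothetical in-memory stand-in for a repository; counts invocations.
    public static class CountingRepository {
        private final List<String> stored = new ArrayList<>();
        private int calls = 0;

        public void save(String product) {
            calls++;
            stored.add(product);
        }

        public void saveAll(List<String> products) {
            calls++;
            stored.addAll(products);
        }

        public int getCalls() { return calls; }
        public int size() { return stored.size(); }
    }

    public static void main(String[] args) {
        List<String> products = List.of("Test A", "Test B", "Test C");

        CountingRepository perItem = new CountingRepository();
        products.forEach(perItem::save);   // approach 1: one call per element

        CountingRepository batch = new CountingRepository();
        batch.saveAll(products);           // approach 2: one call for the whole list

        System.out.println(perItem.getCalls() + " vs " + batch.getCalls()); // 3 vs 1
    }
}
```

Both repositories end up holding the same three elements; only the number of calls differs. The second approach also has to wait for every page to arrive before anything is saved, since collectList() buffers the full result.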