Significant failure rate while using WebClient to make non-blocking REST calls


I'm writing an application that scans an online shop for new offers. I'm using the shop's official REST API, which allows up to 9000 requests per second (per clientId). I've been asked to check around 350 different queries, so right now I'm calling this API every 3 seconds. My initial approach was to use a blocking RestTemplate and spawn up to 350 threads, one per query. It worked, but I wanted to try a non-blocking approach with WebClient, and that's where the problems started.
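For scale, here is the arithmetic on these numbers:

```latex
\frac{350\ \text{requests}}{3\ \text{s}} \approx 116.7\ \text{requests/s},
\qquad
\frac{116.7\ \text{requests/s}}{9000\ \text{requests/s}} \approx 1.3\,\%
```

On average this is well under the documented per-clientId limit. Note, though, that the scheduled method shown further down starts all of its requests together at the beginning of each 3-second tick instead of spreading them out over the interval.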

My code is pretty straightforward. This is my non-blocking client:

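import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Component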
public class OffersListingReactiveClient {
    private final AuthenticationService authenticationService;
    private final WebClient webClient;

    public OffersListingReactiveClient(final AuthenticationService authenticationService, final WebClient webClient) {
        this.authenticationService = authenticationService;
        this.webClient = webClient;
    }

    public Mono<ApiResponse> collectData(final SearchQuery query) {
        final var accessToken = authenticationService.getAccessToken();
        return invoke(query.toUrlParams(), accessToken);
    }

    private Mono<ApiResponse> invoke(final String endpoint, final String accessToken) {
        return webClient.get()
                .uri(endpoint)
                .header("Authorization", accessToken)
                .retrieve()
                .bodyToMono(ApiResponse.class)
                // Any error (connection, TLS, or a 4xx/5xx raised by retrieve()) becomes an empty response
                .onErrorReturn(ApiResponse.emptyResponse());
    }
}

And here is the part of the service that uses that client (I think the rest of the code is irrelevant). I initialize initialItems with RestClient in a few different threads. Once the data is initialized, I publish an event that carries it, and from that moment on I start collecting new items with the code below:

@Scheduled(fixedRate = 3000)
private void collectData() {
    if (!initialItems.isEmpty()) {
        try {
            queryRepository.getSearchQueries()
                    .forEach(query -> reactiveClient.collectData(query)
                            .subscribe(response -> {
                                this.processResponse(response);
                                this.logResults(response);
                            }));
        } catch (Throwable t) {
            logger.severe("Throwable " + t.getMessage());
        }
    }
}

private void processResponse(final ApiResponse response) {
    response.collectItems().forEach(item -> {
        if (!initialItems.containsKey(item.getId())) {
            initialItems.put(item.getId(), item);
            eventPublisher.publishEvent(new NewItemEvent(this, item));
        }
    });
}

private void logResults(final ApiResponse response) {
    if (response.isFailed()) {
        failRate.incrementAndGet();
    } else {
        successRate.incrementAndGet();
    }
    logger.info("Success rate: " + successRate + ", Failure rate: " + failRate);
}
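A side note on processResponse(): it runs inside the subscribe callbacks, which can execute concurrently on different reactor-netty event-loop threads. Whatever the type of initialItems (it isn't shown), the containsKey/put pair is not atomic, so two responses containing the same new item could both publish a NewItemEvent. Below is a minimal JDK-only sketch of an atomic alternative based on ConcurrentMap.putIfAbsent; NewItemTracker and offer are hypothetical names, and String ids/items stand in for the real item type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the item cache used by processResponse().
// Item ids and items are plain Strings here; the real types are not shown.
public class NewItemTracker {
    private final ConcurrentMap<String, String> initialItems = new ConcurrentHashMap<>();
    // Stand-in for eventPublisher.publishEvent(new NewItemEvent(this, item)).
    private final List<String> published = Collections.synchronizedList(new ArrayList<>());

    /** Stores the item and "publishes" it only if no caller has stored this id before. */
    public boolean offer(String id, String item) {
        // putIfAbsent is atomic: for a given id, exactly one caller gets null back.
        if (initialItems.putIfAbsent(id, item) == null) {
            published.add(item);
            return true;
        }
        return false;
    }

    /** Copy of everything "published" so far, in the order it was added. */
    public List<String> published() {
        synchronized (published) {
            return new ArrayList<>(published);
        }
    }
}
```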

I added the logResults() method simply to see how many requests fail, and that number is huge:

With 350 requests / 3 seconds, over 70% of them fail.
With 175 requests / 3 seconds, it's around 50% success / 50% failure.
With 80 requests / 3 seconds, the failure rate is around 10%.
With 40 requests / 3 seconds, it's around 1%.
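The failure rate above grows with the number of requests started in the same 3-second burst. To experiment with that outside Spring, here is a hypothetical JDK-only harness (BoundedFanOut is not from the original code) that starts one async call per query but caps how many are in flight at once, counting successes and failures the way logResults() does. In Reactor itself, the comparable knob would be the concurrency argument of Flux.flatMap(mapper, concurrency), e.g. Flux.fromIterable(queries).flatMap(reactiveClient::collectData, 20), where the value 20 is arbitrary and assumes the queries are available as an Iterable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical harness, not from the original code: one async call per query,
// at most maxInFlight calls outstanding, results counted like logResults() does.
public class BoundedFanOut {
    public final AtomicInteger success = new AtomicInteger();
    public final AtomicInteger failure = new AtomicInteger();

    public <T> CompletableFuture<Void> run(List<String> queries,
                                           Function<String, CompletableFuture<T>> call,
                                           int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        CompletableFuture<?>[] tracked = new CompletableFuture<?>[queries.size()];
        for (int i = 0; i < queries.size(); i++) {
            // Blocks the calling thread until one of the in-flight calls finishes.
            permits.acquireUninterruptibly();
            CompletableFuture<T> future;
            try {
                future = call.apply(queries.get(i));
            } catch (RuntimeException e) {
                // Treat a synchronous throw like an async failure so the permit is released.
                future = CompletableFuture.failedFuture(e);
            }
            tracked[i] = future.handle((result, error) -> {
                if (error == null) {
                    success.incrementAndGet();
                } else {
                    failure.incrementAndGet();
                }
                permits.release();
                return null;
            });
        }
        // Completes normally once every call has finished and been counted.
        return CompletableFuture.allOf(tracked);
    }
}
```

The call function could wrap java.net.http.HttpClient.sendAsync(...) or any other API that returns a CompletableFuture; lowering maxInFlight then shows whether the failures track concurrency rather than total volume.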

So why does the failure rate grow with the number of requests? With 40 / 80 requests, I'm getting this exception:

2020-10-12 20:19:08.345 WARN 5676 --- [ctor-http-nio-7] r.netty.http.client.HttpClientConnect : [id: 0xedf175d7, L:/192.168.1.11:62743 ! R:api.pl/xxxxx:443] The connection observed an error

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

Once I add more queries, meaning more requests, I'm also getting this one:

io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms
    at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062) ~[netty-handler-4.1.52.Final.jar:4.1.52.Final]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to GET https://api.pl/offers/listing?phrase=ryzen&include=-all&include=items&sort=-startTime&category.id=42540aec-367a-4e5e-b411-17c09b08e41f [DefaultWebClient]
Stack trace:
        at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062) ~[netty-handler-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]
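Because invoke() maps every error to ApiResponse.emptyResponse(), failRate alone can't show how many failures are PrematureCloseException versus SslHandshakeTimeoutException. One way to see the split is to record each error before it is swallowed, e.g. from a doOnError(...) callback placed before onErrorReturn. FailureTally below is a hypothetical JDK-only counter for that, keyed by the simple class name of the innermost cause.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical diagnostic counter (not from the original code): tallies failures
// by exception type so different connection errors are counted separately.
public class FailureTally {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    /** Records one failure under the simple class name of its innermost cause. */
    public void record(Throwable error) {
        Throwable root = error;
        // Follow the cause chain (bounded, in case of a cycle) to the innermost exception.
        for (int depth = 0; root.getCause() != null && depth < 16; depth++) {
            root = root.getCause();
        }
        counts.computeIfAbsent(root.getClass().getSimpleName(), name -> new LongAdder()).increment();
    }

    /** Sorted snapshot, e.g. for logging next to the success/failure counters. */
    public Map<String, Long> snapshot() {
        Map<String, Long> copy = new TreeMap<>();
        counts.forEach((name, adder) -> copy.put(name, adder.sum()));
        return copy;
    }
}
```

In invoke(), this could be wired as .doOnError(tally::record) just before .onErrorReturn(...), with tally being a shared FailureTally instance.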

Edit:

Here are my dependencies:

<properties>
    <java.version>14</java.version>
</properties>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>   
</dependencies>