What I do:
I am using the Vert.x Rx HTTP client to perform a large number of HTTP requests. In this specific case I am calling "method A", which returns a list of IDs. To receive all the IDs I need to call method A several times, each time specifying a different page number, to get the next batch of results.
To improve performance and make the calls in parallel as much as possible, I create a list of (RxJava) Observables, each representing the result of a single page request. When I am done creating this list I pass it to the Observable.zip operator.
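The fan-out described above (start one request per page, then combine the results once all of them have arrived) can be sketched without any Rx dependency. This is a JDK-only analogue built on CompletableFuture, not the RxJava code from the question; PageFanOut, fetchAll and the fetchPage parameter are hypothetical names, and fetchPage stands in for the real HTTP call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// JDK-only analogue of "one Observable per page, then zip":
// one future per page, combined once every page has completed.
class PageFanOut {

    // fetchPage is a hypothetical stand-in for the real HTTP request.
    static CompletableFuture<List<String>> fetchAll(
            int pageCount,
            IntFunction<CompletableFuture<List<String>>> fetchPage) {
        List<CompletableFuture<List<String>>> pages = new ArrayList<>();
        for (int page = 0; page < pageCount; page++) {
            pages.add(fetchPage.apply(page)); // all requests are started up front
        }
        return CompletableFuture
                .allOf(pages.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    // Every page future is complete here, so join() does not block.
                    List<String> ids = new ArrayList<>();
                    for (CompletableFuture<List<String>> p : pages) {
                        ids.addAll(p.join());
                    }
                    return ids;
                });
    }

    public static void main(String[] args) {
        List<String> ids = fetchAll(3, page -> CompletableFuture.supplyAsync(
                () -> Arrays.asList("id-" + page + "-a", "id-" + page + "-b"))).join();
        System.out.println(ids);
    }
}
```

The combined result keeps page order regardless of which request finishes first, which is the same guarantee zip gives for its sources.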
The Issue:
Using the Vert.x HTTP client without special settings, everything works but very slowly, e.g. 3000 HTTP requests are processed in 5 minutes.
I tried to improve the performance by setting the Vert.x HTTP client options as follows:
    HttpClientOptions options = new HttpClientOptions();
    options.setMaxPoolSize(50)
           .setKeepAlive(true)
           .setPipelining(true)
           .setTcpKeepAlive(true)
           .setPipeliningLimit(25)
           .setMaxWaitQueueSize(10000);
But when I do that I get unstable results: sometimes everything works fine and I receive all responses in less than 20 seconds. However, sometimes the external server I am calling closes the connection and the log shows the following error:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
- No error handler in my code is called
- When this error appears the zip operator hangs
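One plausible reading of the hang: zip emits only after every source has emitted, so a single request that never signals a value, an error, or completion is enough to stall the combined result. A timeout makes such a stall visible instead of silent. The sketch below shows the idea with plain JDK CompletableFuture rather than RxJava (RxJava has its own timeout operator for Observables); StalledSourceDemo and awaitAll are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StalledSourceDemo {

    // Waits for all sources, but gives up after timeoutMillis instead of hanging.
    // Returns "done", "timed out", "failed: <message>" or "interrupted".
    static String awaitAll(long timeoutMillis, CompletableFuture<?>... sources) {
        try {
            CompletableFuture.allOf(sources).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "done";
        } catch (TimeoutException e) {
            // At least one source never signalled anything.
            return "timed out";
        } catch (ExecutionException e) {
            // A source failed; the original exception is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("page 0");
        CompletableFuture<String> stalled = new CompletableFuture<>(); // never completed
        System.out.println(awaitAll(100, ok, stalled));
    }
}
```

Note one difference from zip: allOf waits for every source even when one has already failed, whereas zip forwards the first error it receives.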
Here is the code that creates the HttpClientRequest:
    public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
        Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
            try {
                HttpClientRequest request = httpClient.postAbs(url);
                addHeadersToRequest(headers, request);
                sendRequest(url, subscriber, request, body);
            } catch (Exception e) {
                try {
                    subscriber.onError(e);
                } catch (Exception ex) {
                    logger.error("error calling onError for subscriber", ex);
                } finally {
                    subscriber.onCompleted();
                }
            }
        });
        return bufferObservable;
    }
    private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
        final long requestId = reqNumber.getAndIncrement();
        if (bodyData != null) {
            request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length));
        }
        request.putHeader("Accept-Encoding", "gzip,deflate");
        Observable<HttpRestResponse> retVal = request.toObservable()
            .doOnError(throwable -> {
                logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
            }).doOnNext(response -> {
                if (response != null) {
                    logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
                }
            }).flatMap(httpClientResponse -> {
                try {
                    if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                        Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                            .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
                        return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                    }
                } catch (Exception e) {
                    logger.error("error in RestHttpClient", e);
                }
                return Observable.just(new HttpRestResponse(null, httpClientResponse));
            });
        retVal.subscribe(subscriber);
        if (bodyData != null) {
            request.end(bodyData); // write post data
        } else {
            request.end();
        }
    }
If you think that you can hook up your exception logic like this
You won't receive any error there, because all your processing now lives in the Rx ecosystem, so nothing is reported to your try/catch blocks.
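The same effect can be reproduced with nothing but the JDK. The sketch below is an analogy using CompletableFuture, not Rx code: the try/catch only wraps the scheduling of the work, so a failure that happens later is delivered to the asynchronous handler instead. AsyncErrorDemo and whereIsTheErrorSeen are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class AsyncErrorDemo {

    // Returns where the failure was observed:
    // "try/catch" or "async handler: <message>".
    static String whereIsTheErrorSeen() {
        CompletableFuture<String> result;
        try {
            // Only the scheduling happens inside this try block;
            // the lambda itself runs later on another thread.
            result = CompletableFuture.supplyAsync(() -> {
                throw new IllegalStateException("Connection was closed");
            });
        } catch (Exception e) {
            return "try/catch"; // not reached for failures inside the lambda
        }
        return result.handle((value, error) -> {
            // The failure arrives here, wrapped in a CompletionException.
            Throwable root = error.getCause() != null ? error.getCause() : error;
            return "async handler: " + root.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(whereIsTheErrorSeen());
    }
}
```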
The errors from this point of time would be coming to you from your
or