I am adding a sink (RichSinkFunction) to an Apache Flink job running on AWS Managed Flink that sends data to an HTTP web service via REST calls (HTTP POST).
To improve performance I am using the Apache async HTTP client (org.apache.httpcomponents.client5:httpclient5 v5.3.1).
Here is the RichSinkFunction.open() method where I create the client. I only set a keep-alive strategy of 30 seconds; all other settings are defaults:
public void open(Configuration parameters) {
    // httpClient is an instance field so invoke() can reuse the same client
    httpClient = HttpAsyncClients.custom()
            .setKeepAliveStrategy((httpResponse, httpContext) -> TimeValue.of(30, TimeUnit.SECONDS))
            .build();
    httpClient.start();
}
Here is the RichSinkFunction.invoke() method where I issue the HTTP POST request:
SimpleHttpRequest simpleHttpRequest =
        SimpleRequestBuilder.post(endPoint).setHeader(HttpHeaders.AUTHORIZATION, authHeader).build();
simpleHttpRequest.setBody(JSON_content, ContentType.APPLICATION_JSON);
simpleHttpRequest.setHeader("Accept", "application/json");
simpleHttpRequest.setHeader("Content-type", "application/json");
httpClient.execute(simpleHttpRequest, new FutureCallback<SimpleHttpResponse>() {
    // Callback methods implemented here
});
I see that when the number of HTTP updates is very high (I cannot find the exact threshold) and the REST endpoint's response time is slow, I get a lot of DeadlineTimeoutException errors:
org.apache.hc.core5.util.DeadlineTimeoutException: Deadline: 2024-03-09T12:40:32.438+0000, -199656424 MILLISECONDS overdue
at org.apache.hc.core5.util.DeadlineTimeoutException.from(DeadlineTimeoutException.java:49)
at org.apache.hc.core5.pool.StrictConnPool.processPendingRequest(StrictConnPool.java:323)
at org.apache.hc.core5.pool.StrictConnPool.processNextPendingRequest(StrictConnPool.java:304)
at org.apache.hc.core5.pool.StrictConnPool.release(StrictConnPool.java:266)
at org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager.release(PoolingAsyncClientConnectionManager.java:410)
at org.apache.hc.client5.http.impl.async.InternalHttpAsyncExecRuntime.discardEndpoint(InternalHttpAsyncExecRuntime.java:147)
at org.apache.hc.client5.http.impl.async.InternalHttpAsyncExecRuntime.discardEndpoint(InternalHttpAsyncExecRuntime.java:170)
at org.apache.hc.client5.http.impl.async.InternalAbstractHttpAsyncClient$2.failed(InternalAbstractHttpAsyncClient.java:346)
at org.apache.hc.client5.http.impl.async.AsyncRedirectExec$1.failed(AsyncRedirectExec.java:248)
at org.apache.hc.client5.http.impl.async.AsyncHttpRequestRetryExec$1.failed(AsyncHttpRequestRetryExec.java:197)
at org.apache.hc.client5.http.impl.async.AsyncProtocolExec$1.failed(AsyncProtocolExec.java:295)
at org.apache.hc.client5.http.impl.async.HttpAsyncMainClientExec$1.failed(HttpAsyncMainClientExec.java:131)
at org.apache.hc.core5.http2.impl.nio.ClientH2StreamHandler.failed(ClientH2StreamHandler.java:253)
at org.apache.hc.core5.http2.impl.nio.AbstractH2StreamMultiplexer$H2Stream.reset(AbstractH2StreamMultiplexer.java:1668)
at org.apache.hc.core5.http2.impl.nio.AbstractH2StreamMultiplexer$H2Stream.cancel(AbstractH2StreamMultiplexer.java:1693)
at org.apache.hc.core5.http2.impl.nio.AbstractH2StreamMultiplexer.onDisconnect(AbstractH2StreamMultiplexer.java:575)
at org.apache.hc.core5.http2.impl.nio.AbstractH2IOEventHandler.disconnected(AbstractH2IOEventHandler.java:96)
at org.apache.hc.core5.http2.impl.nio.ClientH2IOEventHandler.disconnected(ClientH2IOEventHandler.java:39)
at org.apache.hc.core5.reactor.ssl.SSLIOSession$1.disconnected(SSLIOSession.java:247)
at org.apache.hc.core5.reactor.InternalDataChannel.disconnected(InternalDataChannel.java:204)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.processClosedSessions(SingleCoreIOReactor.java:231)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.doTerminate(SingleCoreIOReactor.java:106)
at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:93)
at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
at java.base/java.lang.Thread.run(Thread.java:829)
Since this sink's HTTP update is critical, how can I configure the client so that it pools requests, waits for each request to complete without dropping any, and handles high throughput (about 100K messages in flight in parallel)?
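For reference, this is the direction I assume the configuration should go: an explicitly sized connection pool plus timeouts that bound how long a request may wait for a pooled connection (the DeadlineTimeoutException above is raised from that pending-request deadline). The pool sizes and timeout values below are placeholders I picked, not recommendations:

```java
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.util.Timeout;

// Explicitly sized pool; all numbers are placeholders.
PoolingAsyncClientConnectionManager connectionManager =
        PoolingAsyncClientConnectionManagerBuilder.create()
                .setMaxConnTotal(200)      // total connections across all routes
                .setMaxConnPerRoute(200)   // the sink talks to a single endpoint
                .build();

// Bound how long a request may wait for a connection from the pool,
// and how long to wait for the endpoint's response.
RequestConfig requestConfig = RequestConfig.custom()
        .setConnectionRequestTimeout(Timeout.ofSeconds(30))
        .setResponseTimeout(Timeout.ofSeconds(30))
        .build();

CloseableHttpAsyncClient client = HttpAsyncClients.custom()
        .setConnectionManager(connectionManager)
        .setDefaultRequestConfig(requestConfig)
        .build();
client.start();
```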
Update: Approach 2:
I wanted to simplify the client, so I tried it without a keep-alive strategy:
CloseableHttpAsyncClient client = HttpAsyncClients.custom().build();
client.start();
I am now seeing a smaller number of errors, but I still get this ConnectionClosedException:
org.apache.hc.core5.http.ConnectionClosedException: Connection is closed
at org.apache.hc.core5.http2.impl.nio.AbstractH2StreamMultiplexer.onDisconnect(AbstractH2StreamMultiplexer.java:581)
at org.apache.hc.core5.http2.impl.nio.AbstractH2IOEventHandler.disconnected(AbstractH2IOEventHandler.java:96)
at org.apache.hc.core5.http2.impl.nio.ClientH2IOEventHandler.disconnected(ClientH2IOEventHandler.java:39)
at org.apache.hc.core5.reactor.ssl.SSLIOSession$1.disconnected(SSLIOSession.java:247)
at org.apache.hc.core5.reactor.InternalDataChannel.disconnected(InternalDataChannel.java:204)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.processClosedSessions(SingleCoreIOReactor.java:231)
at org.apache.hc.core5.reactor.SingleCoreIOReactor.doExecute(SingleCoreIOReactor.java:133)
at org.apache.hc.core5.reactor.AbstractSingleCoreIOReactor.execute(AbstractSingleCoreIOReactor.java:86)
at org.apache.hc.core5.reactor.IOReactorWorker.run(IOReactorWorker.java:44)
at java.base/java.lang.Thread.run(Thread.java:829)
This seems like a good use case for Flink's Asynchronous I/O API, which is fault-tolerant, supports custom retry strategies, etc., via a RichAsyncFunction. Alternatively, if your endpoint can handle extremely large requests on the order you indicated (~100K messages), you could consider implementing your own queue in a stateful function to support the bulk scenario, together with an eviction strategy (either via a timer or a defined threshold such as 10K requests).
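A minimal sketch of the RichAsyncFunction approach, reusing your existing client code (the class name, the Integer output type carrying the status code, and the constructor-supplied endpoint are my assumptions; headers, auth and error handling are omitted and would need to be adapted):

```java
import java.util.Collections;
import java.util.concurrent.CancellationException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;

// Hypothetical async function posting each record as JSON.
public class HttpPostAsyncFunction extends RichAsyncFunction<String, Integer> {
    private final String endPoint;
    private transient CloseableHttpAsyncClient client;

    public HttpPostAsyncFunction(String endPoint) {
        this.endPoint = endPoint;
    }

    @Override
    public void open(Configuration parameters) {
        client = HttpAsyncClients.custom().build();
        client.start();
    }

    @Override
    public void asyncInvoke(String jsonContent, ResultFuture<Integer> resultFuture) {
        SimpleHttpRequest request = SimpleRequestBuilder.post(endPoint)
                .setBody(jsonContent, ContentType.APPLICATION_JSON)
                .build();
        client.execute(request, new FutureCallback<SimpleHttpResponse>() {
            @Override public void completed(SimpleHttpResponse response) {
                // Emit the status code downstream; completing the future
                // lets Flink track the request for checkpointing/retries.
                resultFuture.complete(Collections.singleton(response.getCode()));
            }
            @Override public void failed(Exception ex) {
                resultFuture.completeExceptionally(ex);
            }
            @Override public void cancelled() {
                resultFuture.completeExceptionally(new CancellationException());
            }
        });
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}
```

It would then be wired in with AsyncDataStream, whose capacity parameter bounds the number of in-flight requests and backpressures the pipeline instead of overwhelming the connection pool (1000 here is a placeholder):
AsyncDataStream.unorderedWait(jsonStream, new HttpPostAsyncFunction(endPoint), 30, TimeUnit.SECONDS, 1000);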