I have a Flink application that reads data and sinks it to OpenSearch. It encountered a 502 Bad Gateway and crashed instead of retrying as it should. Below are the source code and the call stack. Thank you.
final OpensearchSinkBuilder<T> builder = new OpensearchSinkBuilder<T>()
        .setHosts(HttpHost.create(opensearchOutputEndpoint))
        .setBulkFlushBackoffStrategy(
                FlushBackoffType.EXPONENTIAL, config.getFlushMaxRetries(), config.getFlushDelayMillis())
        .setBulkFlushInterval(config.getFlushIntervalMs())
        .setBulkFlushMaxActions(config.getFlushMaxAction())
        .setBulkFlushMaxSizeMb(config.getFlushMaxSizeMb())
        .setFailureHandler(
                (FailureHandler) failure -> {
                    if (ExceptionUtils.findThrowable(
                                    failure, OpenSearchRejectedExecutionException.class)
                            .isPresent()) {
                        // thread pool full and will retry
                        logger.info("encountered OpenSearchRejectedExecutionException and will retry", failure);
                    } else if (ExceptionUtils.findThrowable(
                                    failure, OpenSearchParseException.class)
                            .isPresent()) {
                        // malformed document and will discard
                        logger.error("encountered OpenSearchParseException and will discard", failure);
                    } else {
                        // for all other failures, fail the sink
                        // the failure is rethrown (wrapped, since Throwable is checked),
                        // but users can also choose to throw custom exceptions
                        throw new RuntimeException(failure);
                    }
                })
        .setEmitter((element, context, indexer) -> setEmitter(index, element, context, indexer));
And below is the call stack:
org.apache.flink.util.SerializedThrowable: org.opensearch.client.ResponseException: method [POST], host https://vpc-gmcmhoi2ca.us-west-1.es.amazonaws.com, URI [/_bulk?timeout=1m], status line [HTTP/1.1 502 Bad Gateway]
<html>
<head><title>502 Bad Gateway</title></head>
<body>
<center><h1>502 Bad Gateway</h1></center>
</body>
</html>
at org.opensearch.client.RestClient.convertResponse(RestClient.java:375)
at org.opensearch.client.RestClient$1.completed(RestClient.java:425)
at org.opensearch.client.RestClient$1.completed(RestClient.java:421)
...
org.apache.flink.util.SerializedThrowable: org.opensearch.OpenSearchStatusException: Unable to parse response body
at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2208)
at org.opensearch.client.RestHighLevelClient$1.onFailure(RestHighLevelClient.java:2116)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:707)
...
org.apache.flink.util.SerializedThrowable: java.lang.RuntimeException: An error occurred in OpensearchSink.
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkErrorAndRethrow(OpensearchSink.java:510)
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkAsyncErrorsAndRequests(OpensearchSink.java:515)
...
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
...
I tried updating setFailureHandler to make Flink retry, but I haven't found a solution.
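One thing I noticed: the last frames of the stack trace mention NoRestartBackoffTimeStrategy, meaning the job has no restart strategy configured, so any sink failure is fatal. As a sketch (the attempt count and delay below are placeholder values, not tuned), enabling a fixed-delay restart strategy in flink-conf.yaml would at least let the job restart after the sink fails instead of crashing outright:

```yaml
# flink-conf.yaml -- restart the job up to 3 times, waiting 10 s between attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

Restarting the whole job is coarser than the in-sink bulk backoff, but it covers failures (like this 502) that escape the bulk retry path.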
An update on this issue: according to the code of RestClient.java, if a request fails with an OpenSearch server-side error, the client marks the current node as dead and tries to send the request to the other available hosts.
Here I am using the endpoint of an AWS OpenSearch cluster, such as https://vpc-gmcmhoi2ca.us-west-1.es.amazonaws.com, rather than dedicated hosts. So if I pass only that endpoint to the OpensearchSinkBuilder, the client believes there is only one available host, when in fact multiple hosts sit behind the AWS endpoint.
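For reference, the builder does accept multiple hosts, so if dedicated node endpoints were exposed the client would have other nodes to fail over to after marking one dead. This is only an illustrative sketch with hypothetical hostnames, since the AWS-managed domain hides the individual nodes behind a single endpoint:

```java
import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
import org.apache.http.HttpHost;

// Hypothetical dedicated node endpoints -- an AWS-managed domain only exposes
// the single load-balanced endpoint, so this does not work there as-is.
final OpensearchSinkBuilder<T> builder = new OpensearchSinkBuilder<T>()
        .setHosts(
                HttpHost.create("https://node-1.example.com:443"),
                HttpHost.create("https://node-2.example.com:443"),
                HttpHost.create("https://node-3.example.com:443"));
```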