I have a Flink program that reads from Kafka, and I open three windowed streams: seconds, minutes, and hours. I then send each window's result to other services through AsyncHttpSink, which extends RichSinkFunction. However, I found that for the same window, the same Kafka message, and the same result, AsyncHttpSink.invoke() may be called multiple times, which aroused my curiosity. Shouldn't it be called just once for the same window, Kafka message, and result?
hourOperator.addSink(httpSink(WindowType.h));
minuteOperator.addSink(httpSink(WindowType.m));
secondOperator.addSink(httpSink(WindowType.s));
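import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * http sink
 */
public class AsyncHttpSink extends RichSinkFunction<Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>>> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncHttpSink.class);
    private final WindowType windowType;
    // The HTTP client is not serializable, so each parallel sink instance creates its own in open().
    private transient CloseableHttpAsyncClient httpClient;

    public AsyncHttpSink(WindowType windowType) {
        this.windowType = windowType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom().build();
        httpClient.start();
    }

    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }

    @Override
    public void invoke(Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>> tuple3, Context context) throws Exception {
        // Hypothetical request: the question does not show how httpPost is built, so URL and body are placeholders.
        HttpPost httpPost = new HttpPost("http://localhost:8080/" + windowType);
        httpPost.setEntity(new StringEntity(tuple3.toString(), StandardCharsets.UTF_8));
        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                try {
                    // Release the connection; consume() ignores a null entity and may throw IOException.
                    EntityUtils.consume(response.getEntity());
                    logger.info("[httpSink]http sink completed.");
                } catch (IOException e) {
                    logger.error("[httpSink]http sink completed. exception:", e);
                }
            }

            @Override
            public void failed(Exception ex) {
                logger.error("[httpSink]http sink failed.", ex);
            }

            @Override
            public void cancelled() {
                logger.info("[httpSink]http sink cancelled.");
            }
        });
    }
}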
If this is a keyed window, each distinct key that has results for a given window reports its result separately, so invoke() is called once per key per window, not once per window.
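To make this concrete, here is a plain-Java simulation (not Flink code) of a keyed tumbling window, assuming non-negative event timestamps in milliseconds; the `Event` record and the `fireWindows` helper are hypothetical names. It groups records by (key, window start) and emits one result per group, so a single one-second window that saw three keys produces three results, i.e. three sink calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Flink code): records are grouped by (key, window)
// and each group emits exactly one result, like a keyed tumbling window.
public class KeyedWindowDemo {

    // Hypothetical input record: a key and a non-negative event timestamp in ms.
    record Event(String key, long timestampMs) {}

    // Returns one result string per (key, window) pair; each string stands
    // for one call to the sink's invoke().
    static List<String> fireWindows(List<Event> events, long windowSizeMs) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs() - (e.timestampMs() % windowSizeMs);
            counts.merge(e.key() + "@" + windowStart, 1, Integer::sum);
        }
        List<String> results = new ArrayList<>();
        counts.forEach((group, n) -> results.add(group + " count=" + n));
        return results;
    }

    public static void main(String[] args) {
        // Three keys, all inside the same 1-second window [0, 1000).
        List<Event> events = List.of(
                new Event("a", 100), new Event("b", 200),
                new Event("a", 300), new Event("c", 900));
        // One window, three keys -> three invoke() calls.
        fireWindows(events, 1000).forEach(r -> System.out.println("invoke(" + r + ")"));
    }
}
```

Running `main` prints three lines, `invoke(a@0 count=2)`, `invoke(b@0 count=1)` and `invoke(c@0 count=1)`: one window, but one sink call per key.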
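You may also have several parallel instances of the sink, each of which calls invoke() for the results routed to it, so the log can contain several identical lines for one window.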
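The log line in completed() carries no key, window, or instance information, so calls made by different sink instances look identical. The plain-Java simulation below (not Flink code) assumes, purely for illustration, that results are spread round-robin over the sink instances; `distribute` is a hypothetical helper. Prefixing each log line with the instance index shows that every result is still sent exactly once. In a Flink 1.x RichSinkFunction the instance index can be read with `getRuntimeContext().getIndexOfThisSubtask()`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink code) of a sink running with parallelism N.
// Assumption for illustration only: results are distributed round-robin.
public class ParallelSinkDemo {

    // Each inner list is the log of one sink instance (one AsyncHttpSink object).
    static List<List<String>> distribute(List<String> results, int parallelism) {
        List<List<String>> logs = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            logs.add(new ArrayList<>());
        }
        for (int i = 0; i < results.size(); i++) {
            int subtask = i % parallelism;
            logs.get(subtask).add("[subtask " + subtask + "] invoke(" + results.get(i) + ")");
        }
        return logs;
    }

    public static void main(String[] args) {
        List<String> results = List.of("a@0", "b@0", "c@0", "d@0");
        // Several log lines for the same window, but each result is sent once.
        distribute(results, 2).forEach(log -> log.forEach(System.out::println));
    }
}
```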