Why does the sink operation execute multiple times in my Flink program?


I have a Flink program that reads from Kafka and opens three windowed streams: seconds, minutes, and hours. Each window's result is then sent elsewhere by AsyncHttpSink, which extends RichSinkFunction. However, I found that for the same window, the same Kafka message, and the same result, AsyncHttpSink.invoke() may be called multiple times, which surprised me. Shouldn't it be called only once for the same window, the same Kafka message, and the same result?

hourOperator.addSink(httpSink(WindowType.h));
minuteOperator.addSink(httpSink(WindowType.m));
secondOperator.addSink(httpSink(WindowType.s));


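import java.io.IOException;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HTTP sink: posts each window result asynchronously.
 */
public class AsyncHttpSink extends RichSinkFunction<Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>>> {

    private static final Logger logger = LoggerFactory.getLogger(AsyncHttpSink.class);

    private final WindowType windowType;

    // The HTTP client is not serializable, so it is created in open() on each parallel sink instance.
    private transient CloseableHttpAsyncClient httpClient;

    public AsyncHttpSink(WindowType windowType) {
        this.windowType = windowType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .build();
        httpClient.start();
    }

    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }

    @Override
    public void invoke(Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>> tuple3, Context context) throws Exception {
        // Building httpPost (an HttpPost) from tuple3 was omitted in the question.
        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                try {
                    // Release the response body; EntityUtils.consume can throw IOException.
                    EntityUtils.consume(response.getEntity());
                    logger.info("[httpSink] http sink completed, status={}", response.getStatusLine().getStatusCode());
                } catch (IOException e) {
                    logger.error("[httpSink] http sink completed. exception:", e);
                }
            }

            @Override
            public void failed(Exception ex) {
                logger.error("[httpSink] http sink failed.", ex);
            }

            @Override
            public void cancelled() {
                logger.info("[httpSink] http sink cancelled.");
            }
        });
    }
}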

Answer

If this is a keyed window, each distinct key that has results for a given window reports its results separately, so invoke() is typically called once per key per window rather than once per window.
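To illustrate per-key firing without a Flink cluster, here is a minimal plain-Java simulation (not the Flink API) of a keyed tumbling window feeding a sink. The class name, the Event type, and the sample data are hypothetical. Each (window, key) pane fires once and produces one sink record, so data for three keys inside one window leads to three invoke() calls for that window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation (no Flink): keyed tumbling windows followed by a sink. */
public class KeyedWindowSimulation {

    /** Hypothetical input event: a key and an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /** Groups events by (window start, key) and returns one "sink record" per group. */
    static List<String> fireWindows(List<Event> events, long windowSizeMs) {
        // TreeMaps keep the output deterministic: ordered by window start, then by key.
        Map<Long, Map<String, Integer>> panes = new TreeMap<>();
        for (Event e : events) {
            // Tumbling window start with zero offset.
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
            panes.computeIfAbsent(windowStart, w -> new TreeMap<>())
                 .merge(e.key, 1, Integer::sum);
        }
        // Every (window, key) pane fires once, so each one becomes a separate sink call.
        List<String> sinkInvocations = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Integer>> window : panes.entrySet()) {
            for (Map.Entry<String, Integer> pane : window.getValue().entrySet()) {
                sinkInvocations.add("window=" + window.getKey()
                        + " key=" + pane.getKey() + " count=" + pane.getValue());
            }
        }
        return sinkInvocations;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", 1_000L), new Event("b", 2_000L),
                new Event("a", 3_000L), new Event("c", 4_000L),
                new Event("a", 61_000L));
        // One-minute windows: [0, 60000) and [60000, 120000).
        for (String record : fireWindows(events, 60_000L)) {
            System.out.println("invoke(" + record + ")");
        }
    }
}
```

Running main prints four lines: three for the first one-minute window (keys a, b, c) and one for the second. In the real job, including the key and the window bounds in the log line inside invoke() would show whether the apparent duplicates are actually different keys.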

You may also have several parallel instances of the sink, each of which calls invoke() for its own share of the results.
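The parallelism point can be sketched the same way. This plain-Java simulation (again not the Flink API; the class name and data are hypothetical) routes per-key window results to a number of sink instances. As a simplification it picks the instance with floorMod(key.hashCode(), parallelism); Flink actually assigns keys through key groups, so the real mapping differs. Each result still reaches exactly one instance, but with parallelism above 1 several instances of the same sink class write log lines, which is easy to misread as repeated calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java simulation (no Flink) of window results spread over parallel sink instances.
 * Simplified routing: floorMod(hashCode, parallelism), not Flink's key-group assignment.
 */
public class ParallelSinkSimulation {

    /** Returns, for each sink subtask index, the keys whose results that instance receives. */
    static Map<Integer, List<String>> route(List<String> resultKeys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String key : resultKeys) {
            int subtask = Math.floorMod(key.hashCode(), parallelism);
            bySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Results of one window for three keys (hypothetical data).
        List<String> keys = List.of("a", "b", "c");
        for (Map.Entry<Integer, List<String>> entry : route(keys, 4).entrySet()) {
            for (String key : entry.getValue()) {
                // Tagging each line with the subtask index shows which instance called invoke().
                System.out.println("[subtask " + entry.getKey() + "] invoke(key=" + key + ")");
            }
        }
    }
}
```

In the actual job (Flink 1.x), logging getRuntimeContext().getIndexOfThisSubtask() inside invoke() shows which instance handled each record, and addSink(...).setParallelism(1) restricts the sink to a single instance if that is what you want.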