Puzzled by RichCoFlatMapFunction in Flink 1.17.2 when following the official learning guide


I tried the code from the "Connected Streams" section of the Data Pipelines & ETL page in the official Flink learning guide:

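import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Blacklist: words that should be filtered out of streamOfWords.
        DataStream<String> control = env
                .fromElements("DROP", "IGNORE")
                .keyBy(x -> x);

        // Words to be filtered against the blacklist.
        DataStream<String> streamOfWords = env
                .fromElements("Apache", "DROP", "Flink", "IGNORE")
                .keyBy(x -> x);

        // Both inputs are keyed by the word itself, so a control element and a word
        // with the same value share the same keyed state.
        control.connect(streamOfWords)
                .flatMap(new ControlFunction())
                .print();

        env.execute(ControlStream.class.getName());
    }

    public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
        // static, so the logger is not serialized together with the function instance
        private static final Logger LOGGER = LoggerFactory.getLogger(ControlFunction.class);
        private ValueState<Boolean> blackList;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("blackList", Types.BOOLEAN);
            blackList = getRuntimeContext().getState(desc);
        }

        // Input 1 (control): mark the current key as blacklisted.
        @Override
        public void flatMap1(String value, Collector<String> out) throws Exception {
            blackList.update(true);
            LOGGER.info("[flatMap1] value={}, state={}", value, blackList.value()); // logs true
        }

        // Input 2 (words): emit the word unless its key has already been blacklisted.
        @Override
        public void flatMap2(String value, Collector<String> out) throws Exception {
            LOGGER.info("[flatMap2] value={}, state={}", value, blackList.value()); // logged null for every word
            if (blackList.value() == null || !blackList.value()) {
                out.collect(value);
            }
        }
    }
}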

What surprised me is the log output (in the order it was printed to stdout):

[flatMap2] value=Flink, state=null
[flatMap2] value=Apache, state=null
[flatMap2] value=IGNORE, state=null
[flatMap2] value=DROP, state=null

16> Flink
4> DROP
13> IGNORE
11> Apache

[flatMap1] value=DROP, state=true
[flatMap1] value=IGNORE, state=true
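A plain-Java model helps show why nothing was filtered. It has no Flink dependency: `ControlFunctionModel`, `Event`, `control()` and `data()` are hypothetical names, and a `HashMap` stands in for the keyed `ValueState<Boolean>` (both inputs are keyed by the word itself, so the word plays the role of the key). Replaying the same six elements in two interleavings shows that, in this model, the result depends only on whether a word's control element is processed before or after the word:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of ControlFunction's per-key logic; no Flink dependency.
// ControlFunctionModel, Event, control() and data() are hypothetical names.
final class ControlFunctionModel {

    /** One element arriving at the two-input operator. */
    static final class Event {
        final boolean fromControl; // true = input 1 (control), false = input 2 (words)
        final String value;

        Event(boolean fromControl, String value) {
            this.fromControl = fromControl;
            this.value = value;
        }
    }

    static Event control(String value) { return new Event(true, value); }
    static Event data(String value) { return new Event(false, value); }

    // Stand-in for keyed ValueState<Boolean>: both streams are keyed by the word,
    // so the map key plays the role of the current key.
    private final Map<String, Boolean> blackList = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Same logic as flatMap1: mark the current key as blocked.
    private void flatMap1(String value) {
        blackList.put(value, true);
    }

    // Same logic as flatMap2: emit unless the current key is already blocked.
    private void flatMap2(String value) {
        Boolean blocked = blackList.get(value); // null until flatMap1 has run for this key
        if (blocked == null || !blocked) {
            output.add(value);
        }
    }

    /** Feeds the events to a fresh instance in the given order and returns what it emits. */
    static List<String> replay(List<Event> events) {
        ControlFunctionModel f = new ControlFunctionModel();
        for (Event e : events) {
            if (e.fromControl) {
                f.flatMap1(e.value);
            } else {
                f.flatMap2(e.value);
            }
        }
        return f.output;
    }

    public static void main(String[] args) {
        // Words first: nothing is filtered. Prints [Apache, DROP, Flink, IGNORE]
        System.out.println(replay(List.of(
                data("Apache"), data("DROP"), data("Flink"), data("IGNORE"),
                control("DROP"), control("IGNORE"))));
        // Control first: DROP and IGNORE are filtered out. Prints [Apache, Flink]
        System.out.println(replay(List.of(
                control("DROP"), control("IGNORE"),
                data("Apache"), data("DROP"), data("Flink"), data("IGNORE"))));
    }
}
```

The log above matches the first interleaving: every word reached `flatMap2` while the state for its key was still `null`.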

I would sincerely appreciate a solution.

What I have tried: swapping the code in flatMap1 and flatMap2, adding log output to both methods, changing the order in which control and streamOfWords are defined in main, and asking GPT...

Answer by David Anderson

I'm the author of that part of the documentation about learning Flink.

The paragraph at the end of that section explains what is going on:

It is important to recognize that you have no control over the order in which the flatMap1 and flatMap2 callbacks are called. These two input streams are racing against each other, and the Flink runtime will do what it wants to regarding consuming events from one stream or the other. In cases where timing and/or ordering matter, you may find it necessary to buffer events in managed Flink state until your application is ready to process them. (Note: if you are truly desperate, it is possible to exert some limited control over the order in which a two-input operator consumes its inputs by using a custom Operator that implements the InputSelectable interface.)
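The buffering approach described in the quote can be sketched as another plain-Java model rather than Flink code. It rests on an assumption: that the application can tell when it is "ready", modeled here as a hypothetical end-of-control event (as if the bounded control input had been fully consumed). Words whose fate is not yet known wait in a per-key buffer (standing in for managed keyed state such as `ListState`); a control element discards its key's buffer, and the end-of-control event flushes whatever is left. `BufferingControlModel` and its helpers are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency) of buffering words in per-key state
// until the control input is complete. Assumption: "ready" means the control
// input has ended, signalled by a hypothetical END_OF_CONTROL event.
final class BufferingControlModel {

    enum Kind { CONTROL, DATA, END_OF_CONTROL }

    static final class Event {
        final Kind kind;
        final String value;

        Event(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    static Event control(String v) { return new Event(Kind.CONTROL, v); }
    static Event data(String v) { return new Event(Kind.DATA, v); }
    static Event endOfControl() { return new Event(Kind.END_OF_CONTROL, null); }

    private final Map<String, Boolean> blocked = new HashMap<>();              // like ValueState<Boolean> per key
    private final Map<String, List<String>> pending = new LinkedHashMap<>(); // like ListState<String> per key
    private final List<String> output = new ArrayList<>();
    private boolean controlDone = false;

    private void onControl(String key) {
        blocked.put(key, true);
        pending.remove(key); // words already buffered for this key are discarded
    }

    private void onData(String word) {
        if (blocked.containsKey(word)) {
            return; // known to be blacklisted: drop it
        }
        if (controlDone) {
            output.add(word); // blacklist is complete, so the decision is final
        } else {
            pending.computeIfAbsent(word, k -> new ArrayList<>()).add(word); // wait
        }
    }

    private void onEndOfControl() {
        controlDone = true;
        pending.values().forEach(output::addAll); // anything still buffered is not blacklisted
        pending.clear();
    }

    /** Feeds the events to a fresh instance in the given order and returns what it emits. */
    static List<String> replay(List<Event> events) {
        BufferingControlModel f = new BufferingControlModel();
        for (Event e : events) {
            switch (e.kind) {
                case CONTROL: f.onControl(e.value); break;
                case DATA: f.onData(e.value); break;
                case END_OF_CONTROL: f.onEndOfControl(); break;
            }
        }
        return f.output;
    }

    public static void main(String[] args) {
        // Words first, then control: prints [Apache, Flink]
        System.out.println(replay(List.of(
                data("Apache"), data("DROP"), data("Flink"), data("IGNORE"),
                control("DROP"), control("IGNORE"), endOfControl())));
        // Control first, then words: also prints [Apache, Flink]
        System.out.println(replay(List.of(
                control("DROP"), control("IGNORE"), endOfControl(),
                data("Apache"), data("DROP"), data("Flink"), data("IGNORE"))));
    }
}
```

How readiness is detected in a real job is application-specific and is not shown here. Note that `RichCoFlatMapFunction` cannot register timers, whereas `KeyedCoProcessFunction` can, which is one option to look at if the flush has to be triggered by time.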