I just try the code provided by the flink offical guide book.
public class ControlStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE") // as blackList
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE") // items should be filtered by blackList
.keyBy(x -> x);
control.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute(ControlStream.class.getName());
}
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private final Logger LOGGER = LoggerFactory.getLogger(ControlFunction.class);
private ValueState<Boolean> blackList;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("blackList", Types.BOOLEAN);
blackList = getRuntimeContext().getState(desc);
}
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
blackList.update(true);
LOGGER.info(String.format("[flatMap1] value=%s, state=%s", value, blackList.value()) ); // null
}
@Override
public void flatMap2(String value, Collector<String> out) throws Exception {
LOGGER.info(String.format("[flatMap2] value=%s, state=%s", value, blackList.value()) ); // null
if (blackList.value() == null || !blackList.value()) {
out.collect(value);
}
}
}
}
what impressed me is that the log is (the std out print order)
[flatMap2] value=Flink, state=null
[flatMap2] value=Apache, state=null
[flatMap2] value=IGNORE, state=null
[flatMap2] value=DROP, state=null
16> Flink
4> DROP
13> IGNORE
11> Apache
[flatMap1] value=DROP, state=true
[flatMap1] value=IGNORE, state=true
sincerely hoping the solution
exchange the code in flatMap1 and flatMap2;
add log output in both method;
alter control and streamOfWords call order in main;
and GPT...
I'm the author of that part of the documentation about learning Flink.
The paragraph at end of that section explains what is going on: