I'm currently confused about how windows and state interact. Suppose I have a program that counts user access data per minute, i.e. it computes a sum in each one-minute window. For fault tolerance I configure checkpointing with a 30-second interval. Then at 01:00 the program crashes. In theory it can only be restored to the state from the checkpoint taken at 00:30, but no window fired at 00:30, so what the restored job has is the Kafka offsets as of 00:30 plus the partial window data accumulated since 00:00. Is there any problem with my understanding?
Here is my program, Flink version is 1.14: flink graph
The stream is built with config.getDelayMaxDuration() = 60000, config.getAggregateWindowMillisecond() = 60000, and the checkpoint interval is 30 s.
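For context, checkpointing is enabled along these lines (a sketch, not the exact code from the job):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Trigger a checkpoint every 30 seconds; the default checkpointing mode is exactly-once.
    env.enableCheckpointing(30_000L);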
SingleOutputStreamOperator<BaseResult> wordCountSampleStream = subStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<MetricEvent>forBoundedOutOfOrderness(config.getDelayMaxDuration())
                        .withTimestampAssigner(new MetricEventTimestampAssigner())
                        .withIdleness(config.getWindowIdlenessTime()))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"))
        .flatMap(new WordCountToResultFlatMapFunction(config))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"))
        .keyBy(new BaseResultKeySelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(config.getAggregateWindowMillisecond())))
        .apply(new WordCountWindowFunction(config))
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"));

wordCountSampleStream.addSink(sink)
        .setParallelism(CommonJobConfig.getParallelismOfSubJob("WORD_COUNT_SAMPLE_TEST"));
window apply function:
public class WordCountWindowFunction extends RichWindowFunction<BaseResult, BaseResult, String, TimeWindow> {

    private StreamingConfig config;
    private Logger logger = LoggerFactory.getLogger(WordCountWindowFunction.class);

    public WordCountWindowFunction(StreamingConfig config) {
        this.config = config;
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void apply(String s, TimeWindow window, Iterable<BaseResult> input, Collector<BaseResult> out) throws Exception {
        WordCountEventPortrait result = new WordCountEventPortrait();
        // Align the window start down to a multiple of the aggregation window size.
        long curWindowTimestamp = window.getStart() / config.getAggregateWindowMillisecond() * config.getAggregateWindowMillisecond();
        result.setDatasource("word_count_test");
        result.setTimeSlot(curWindowTimestamp);
        // Sum the per-record counts of every element that fell into this window.
        for (BaseResult sub : input) {
            logger.info("in window cur sub is {} ", sub);
            WordCountEventPortrait curInvoke = (WordCountEventPortrait) sub;
            result.setTotalCount(result.getTotalCount() + curInvoke.getTotalCount());
            result.setWord(curInvoke.getWord());
        }
        logger.info("out window result is {} ", result);
        out.collect(result);
    }
}
sink function:
public class ClickHouseRichSinkFunction extends RichSinkFunction<BaseResult> implements CheckpointedFunction {

    private ConcurrentHashMap<String, SinkBatchInsertHelper<BaseResult>> tempResult = new ConcurrentHashMap<>();
    private ClickHouseDataSource dataSource;
    private Logger logger = LoggerFactory.getLogger(ClickHouseRichSinkFunction.class);

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // On every checkpoint, flush whatever is still buffered in memory to ClickHouse.
        for (Map.Entry<String, SinkBatchInsertHelper<BaseResult>> helper : tempResult.entrySet()) {
            helper.getValue().insertAllTempData();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // No managed state is registered or restored here.
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("user", CommonJobConfig.CLICKHOUSE_USER);
        properties.setProperty("password", CommonJobConfig.CLICKHOUSE_PASSWORD);
        dataSource = new ClickHouseDataSource(CommonJobConfig.CLICKHOUSE_JDBC_URL, properties);
    }

    @Override
    public void close() {
        AtomicInteger totalCount = new AtomicInteger();
        tempResult.values().forEach(it -> {
            totalCount.addAndGet(it.getTempList().size());
            batchSaveBaseResult(it.getTempList());
            it.getTempList().clear();
        });
    }

    @Override
    public void invoke(BaseResult value, Context context) {
        // Buffer each record per datasource; the helper flushes when the batch size or interval threshold is hit.
        tempResult.compute(value.getDatasource(), (datasource, baseResults) -> {
            if (baseResults == null) {
                baseResults = new SinkBatchInsertHelper<>(CommonJobConfig.COMMON_BATCH_INSERT_COUNT,
                        needToInsert -> batchSaveBaseResult(needToInsert),
                        CommonJobConfig.BATCH_INSERT_INTERVAL_MS);
            }
            baseResults.tempInsertSingle(value);
            return baseResults;
        });
    }

    private void batchSaveBaseResult(List<BaseResult> list) {
        if (list.isEmpty()) {
            return;
        }
        String sql = list.get(0).getPreparedSQL();
        // Close both the connection and the statement via try-with-resources so neither leaks.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement ps = connection.prepareStatement(sql)) {
            for (BaseResult curResult : list) {
                curResult.addParamsToPreparedStatement(ps);
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (SQLException error) {
            logger.error("has exception during batch insert, datasource is {} ", list.get(0).getDatasource(), error);
        }
    }
}
batch insert helper:
public class SinkBatchInsertHelper<T> {

    private List<T> waitToInsert;
    private ReentrantLock lock;
    private int bulkActions;
    private AtomicInteger tempActions;
    private Consumer<List<T>> consumer;
    private AtomicLong lastSendTimestamp;
    private long sendInterval;
    private Logger logger = LoggerFactory.getLogger(SinkBatchInsertHelper.class);

    public SinkBatchInsertHelper(int bulkActions, Consumer<List<T>> consumer, long sendInterval) {
        this.waitToInsert = new ArrayList<>();
        this.lock = new ReentrantLock();
        this.bulkActions = bulkActions;
        this.tempActions = new AtomicInteger(0);
        this.consumer = consumer;
        this.sendInterval = sendInterval;
        this.lastSendTimestamp = new AtomicLong(0);
    }

    public void tempInsertSingle(T data) {
        lock.lock();
        try {
            waitToInsert.add(data);
            if (tempActions.incrementAndGet() >= bulkActions
                    || (System.currentTimeMillis() - lastSendTimestamp.get()) >= sendInterval) {
                batchInsert();
            }
        } finally {
            lastSendTimestamp.set(System.currentTimeMillis());
            lock.unlock();
        }
    }

    public long insertAllTempData() {
        lock.lock();
        try {
            long result = tempActions.get();
            if (tempActions.get() > 0) {
                batchInsert();
            }
            return result;
        } finally {
            lock.unlock();
        }
    }

    private void batchInsert() {
        for (T t : waitToInsert) {
            logger.info("batch insert data:{}", t);
        }
        consumer.accept(waitToInsert);
        waitToInsert.clear();
        tempActions.set(0);
    }

    public int getTempActions() {
        return tempActions.get();
    }

    public List<T> getTempList() {
        lock.lock();
        try {
            return waitToInsert;
        } finally {
            lock.unlock();
        }
    }
}
The phenomenon I observe: if I cancel the task at 00:31:30 and then restart it, the statistics for the 00:31:00 window come out lower than expected. By printing records I found the cause: when the sink wrote the data for 00:30:00, the Kafka consumer had already consumed data past 00:31:00, but that part of the data had not yet been written to ClickHouse, and it was not replayed into the window after the restart, so it was lost. The statistics only return to normal from 00:32:00 onward.
There's no need to be concerned about synchronizing checkpointing with your windows; the two are independent of each other.
Whenever a checkpoint is taken, it includes the Kafka offsets together with all of the job's state, including the contents of any windows that are still open, so after a restore the windows continue from exactly the point that matches those offsets.
https://www.youtube.com/watch?v=zkWa2ZDjwb4 walks through this in a bit more detail.
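The window operator's state is snapshotted for you automatically; a user function only needs to register state itself when it buffers records outside of Flink's managed state, as the ClickHouse sink in the question does. A minimal sketch of that pattern (the class, field, and state names here are illustrative, not taken from the question's code):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch: a sink that makes its in-memory batch buffer part of every checkpoint.
    public class BufferingSinkSketch extends RichSinkFunction<String> implements CheckpointedFunction {

        private final List<String> buffer = new ArrayList<>();   // in-memory batch buffer
        private transient ListState<String> checkpointedBuffer;  // Flink-managed operator state

        @Override
        public void invoke(String value, Context context) {
            buffer.add(value);
            // ... flush to the external system once the batch is large enough ...
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Copy whatever is still buffered into managed state; it then becomes part of the
            // checkpoint, alongside the Kafka offsets and window contents taken at the same barrier.
            checkpointedBuffer.clear();
            for (String element : buffer) {
                checkpointedBuffer.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedBuffer = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("buffered-records", String.class));
            if (context.isRestored()) {
                // On recovery, refill the in-memory buffer from the last successful checkpoint.
                for (String element : checkpointedBuffer.get()) {
                    buffer.add(element);
                }
            }
        }
    }

With the buffer held in operator state like this, records that were consumed but not yet flushed to the external system are restored from the last checkpoint instead of being lost.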