Flink exactly-once: checkpoint and barrier acknowledgement at the sink


I have a Flink job with a sink that writes data to MongoDB. The sink is an implementation of RichSinkFunction.

Externalized checkpointing is enabled, with a checkpoint interval of 5000 ms and the EXACTLY_ONCE checkpointing mode.

  • Flink version: 1.3
  • Kafka (source topic) version: 0.9.0

I can't upgrade to Flink 1.4 to use its TwoPhaseCommitSinkFunction.

I have a few questions:

  1. At which point does the sink acknowledge the checkpoint barrier: at the start of the invoke() function or after invoke() completes? In other words, does it wait for the write to MongoDB to be persisted before acknowledging the barrier?
  2. If committing the checkpoint is done by an asynchronous thread, how can Flink guarantee exactly-once semantics in case of a job failure? What if the sink has already saved data to MongoDB but the checkpoint has not been committed? I think this will result in duplicate data on restart.
  3. When I cancel a job from the Flink dashboard, does Flink wait for the asynchronous checkpoint threads to complete, or is it a hard kill (like kill -9)?

First of all, Flink can only guarantee end-to-end exactly-once consistency if the sources and sinks support it. If you are using Flink's Kafka consumer, Flink can guarantee that the internal state of the application is exactly-once consistent. To achieve full end-to-end exactly-once consistency, the sink needs to support it properly as well. You should check whether your MongoDB sink implementation handles this correctly.
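To make question 2 concrete: if the sink writes to MongoDB as records arrive and the job fails after some writes but before the next checkpoint completes, Flink restores the Kafka offsets from the last completed checkpoint and replays the records after it. The following plain-Java simulation is not Flink code; the class, interface, and method names (ReplayDemo, Store, documentsAfterRecovery) are hypothetical, and the in-memory stores only stand in for MongoDB. It contrasts a plain-insert sink with an upsert-by-key sink. Making writes idempotent is one common way to get effectively-once results with a non-transactional store. It is a technique swapped in here for illustration, not something this answer prescribes, and it assumes every record carries a stable key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of replay after recovery; in-memory stores stand in for MongoDB.
class ReplayDemo {

    // Hypothetical record type: a stable id (e.g. a document _id) and a payload.
    static final class Event {
        final String id;
        final int value;
        Event(String id, int value) { this.id = id; this.value = value; }
    }

    // Hypothetical store abstraction; a real sink would call the MongoDB driver.
    interface Store {
        void write(Event e);
        int size();
    }

    // Plain insert: every write creates a new document.
    static final class InsertStore implements Store {
        private final List<Event> docs = new ArrayList<>();
        public void write(Event e) { docs.add(e); }
        public int size() { return docs.size(); }
    }

    // Upsert by id: writing the same event again overwrites the same document.
    static final class UpsertStore implements Store {
        private final Map<String, Integer> docs = new HashMap<>();
        public void write(Event e) { docs.put(e.id, e.value); }
        public int size() { return docs.size(); }
    }

    static final List<Event> SOURCE = List.of(
        new Event("a", 1), new Event("b", 2), new Event("c", 3), new Event("d", 4));

    // checkpointedOffset: source position stored in the last *completed* checkpoint.
    // writtenBeforeFailure: how many records the sink had already written when the job failed.
    static int documentsAfterRecovery(Store store, int checkpointedOffset, int writtenBeforeFailure) {
        for (int i = 0; i < writtenBeforeFailure; i++) {
            store.write(SOURCE.get(i));
        }
        // On restart the source rewinds to the checkpointed offset and replays everything after it.
        for (int i = checkpointedOffset; i < SOURCE.size(); i++) {
            store.write(SOURCE.get(i));
        }
        return store.size();
    }

    public static void main(String[] args) {
        // The checkpoint covers a and b; the sink had also written c before the failure.
        System.out.println("insert: " + documentsAfterRecovery(new InsertStore(), 2, 3)); // 5, c is duplicated
        System.out.println("upsert: " + documentsAfterRecovery(new UpsertStore(), 2, 3)); // 4
    }
}
```

With plain inserts, record c is written once before the failure and again during replay. With keyed upserts the final state is correct, although readers may briefly see writes for records that are later replayed, so this is idempotent rather than transactional exactly-once output.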

Checkpoint barriers are sent as regular messages over the data transport channels, i.e., a barrier for checkpoint n separates the stream into records that go into checkpoint n and those that go into checkpoint n + 1. A sink operator processes a barrier between two invoke() calls and triggers the state backend to perform a checkpoint. It is then up to the state backend whether and how it can perform the checkpoint asynchronously. Once the call to trigger the checkpoint returns, the sink can continue processing. The sink operator reports to the JobManager that it has completed checkpointing its state once it is notified by the state backend. An overall checkpoint completes when all operators have successfully reported that they completed their checkpoints.
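The barrier mechanics described above can be sketched as a single-threaded toy model in plain Java. Everything here (Element, Coordinator, Sink, processBarrier) is hypothetical and heavily simplified: there is no real state backend, no barrier alignment across multiple input channels, and the sink acknowledges synchronously instead of waiting for an asynchronous snapshot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of checkpoint barriers flowing through a sink (not Flink's actual classes).
class BarrierDemo {

    // A stream element is either a data record or a checkpoint barrier.
    static final class Element {
        final String record;     // null for barriers
        final long checkpointId; // -1 for data records
        private Element(String record, long checkpointId) {
            this.record = record;
            this.checkpointId = checkpointId;
        }
        static Element record(String r) { return new Element(r, -1); }
        static Element barrier(long id) { return new Element(null, id); }
        boolean isBarrier() { return record == null; }
    }

    // Stand-in for the JobManager's checkpoint coordinator: checkpoint n is
    // complete only after every operator has acknowledged it.
    static final class Coordinator {
        private final Set<String> operators;
        private final Map<Long, Set<String>> acks = new HashMap<>();
        final List<Long> completed = new ArrayList<>();

        Coordinator(Set<String> operators) { this.operators = operators; }

        void acknowledge(long checkpointId, String operator) {
            Set<String> received = acks.computeIfAbsent(checkpointId, k -> new HashSet<>());
            received.add(operator);
            if (received.containsAll(operators) && !completed.contains(checkpointId)) {
                completed.add(checkpointId);
            }
        }
    }

    // Toy sink: invoke() handles one record; barriers are handled between invoke() calls.
    static final class Sink {
        private final String name;
        private final Coordinator coordinator;
        private long currentCheckpoint = 1; // records seen now belong to this checkpoint
        final Map<Long, List<String>> recordsByCheckpoint = new TreeMap<>();

        Sink(String name, Coordinator coordinator) {
            this.name = name;
            this.coordinator = coordinator;
        }

        void invoke(String record) {
            recordsByCheckpoint.computeIfAbsent(currentCheckpoint, k -> new ArrayList<>()).add(record);
        }

        void processBarrier(long checkpointId) {
            // Records before barrier n belong to checkpoint n; records after it to n + 1.
            currentCheckpoint = checkpointId + 1;
            // A real operator would trigger its state backend here and acknowledge once the
            // (possibly asynchronous) snapshot is done; this toy acknowledges immediately.
            coordinator.acknowledge(checkpointId, name);
        }

        void process(Element e) {
            if (e.isBarrier()) processBarrier(e.checkpointId);
            else invoke(e.record);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(Set.of("source", "sink"));
        // Assume the source snapshotted its offsets and acknowledged when it injected the barriers.
        coordinator.acknowledge(1, "source");
        coordinator.acknowledge(2, "source");

        Sink sink = new Sink("sink", coordinator);
        List<Element> stream = List.of(
            Element.record("a"), Element.record("b"), Element.barrier(1),
            Element.record("c"), Element.barrier(2), Element.record("d"));
        for (Element e : stream) sink.process(e);

        System.out.println(sink.recordsByCheckpoint); // {1=[a, b], 2=[c], 3=[d]}
        System.out.println(coordinator.completed);    // [1, 2]
    }
}
```

Because records and barriers share the same channel, the sink never has to decide in the middle of an invoke() call where a checkpoint boundary lies. Record d was already passed to invoke() but belongs to checkpoint 3, which has not completed; if the job failed now, d would be replayed. That is why a sink that writes to MongoDB directly in invoke() can produce duplicates unless its writes are idempotent or transactional.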

This blog post discusses end-to-end exactly-once processing and the requirements for sink operators in more detail.