I use Flink to enrich a flow of inputs
case class Input( key: String, message: String )
with precomputed scores
case class Score( key: String, score: Int )
and produce an output
case class Output( key: String, message: String, score: Int )
Both the input and the score streams are read from Kafka topics, and the resulting output stream is published to Kafka as well:
val processed = inputStream
  .keyBy( _.key )
  .connect( scoreStream.keyBy( _.key ) )
  .flatMap( new ScoreEnrichmentFunction )
  .addSink( producer )
with the following ScoreEnrichmentFunction:
import org.apache.flink.api.common.state.{ ValueState, ValueStateDescriptor }
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
  val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )

  lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

  override def flatMap1( input: Input, out: Collector[Output] ): Unit =
    Option( scoreState.value ) match {
      case None          => out.collect( Output( input.key, input.message, -1 ) )
      case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )
    }

  override def flatMap2( score: Score, out: Collector[Output] ): Unit =
    scoreState.update( score )
}
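To make the intended per-key behavior explicit, here is a plain-Scala simulation of the function's logic (no Flink involved; a mutable map stands in for the keyed ValueState, so this is only an illustration of the semantics, not of the fault-tolerance behavior):

```scala
import scala.collection.mutable

case class Input( key: String, message: String )
case class Score( key: String, score: Int )
case class Output( key: String, message: String, score: Int )

// Mutable map standing in for Flink's keyed ValueState[Score].
val scores = mutable.Map.empty[String, Score]

// flatMap1: emit the last score seen for this key, or -1 if none yet.
def flatMap1( input: Input ): Output =
  scores.get( input.key ) match {
    case None          => Output( input.key, input.message, -1 )
    case Some( score ) => Output( input.key, input.message, score.score )
  }

// flatMap2: remember the most recent score per key.
def flatMap2( score: Score ): Unit =
  scores( score.key ) = score

flatMap1( Input( "a", "hello" ) )  // no score seen yet, so score is -1
flatMap2( Score( "a", 42 ) )
flatMap1( Input( "a", "hello" ) )  // now enriched with score 42
```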
This works well. However, if I take a savepoint and cancel the Flink job, the scores stored in the ValueState are lost when I resume the job from the savepoint.
As I understand it, ScoreEnrichmentFunction needs to be extended with CheckpointedFunction:
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction
but I struggle to understand how to implement the snapshotState and initializeState methods so that they work with a keyed state:
override def snapshotState( context: FunctionSnapshotContext ): Unit = ???
override def initializeState( context: FunctionInitializationContext ): Unit = ???
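Worth noting: keyed state obtained via getRuntimeContext.getState is snapshotted automatically once checkpointing is enabled, so CheckpointedFunction is normally only needed for operator (non-keyed) state. For reference, a hedged sketch of what the two methods typically look like when managing operator state (the class name and state name are illustrative, not from the original job):

```scala
import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Illustrative only: CheckpointedFunction manages *operator* state.
// The keyed ValueState above does not need this; it is included in
// checkpoints and savepoints automatically.
class BufferingScores extends CheckpointedFunction {
  @transient private var checkpointedState: ListState[Score] = _
  private val buffer = scala.collection.mutable.ListBuffer.empty[Score]

  override def snapshotState( context: FunctionSnapshotContext ): Unit = {
    // Copy the in-memory buffer into the operator state for this checkpoint.
    checkpointedState.clear()
    buffer.foreach( checkpointedState.add )
  }

  override def initializeState( context: FunctionInitializationContext ): Unit = {
    val descriptor = new ListStateDescriptor[Score]( "buffered-scores", classOf[Score] )
    checkpointedState = context.getOperatorStateStore.getListState( descriptor )
    // On restore, repopulate the in-memory buffer from the snapshot.
    if ( context.isRestored )
      checkpointedState.get.forEach( s => buffer += s )
  }
}
```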
Note that I use the following environment:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism( 2 )
env.setBufferTimeout( 1 )
env.enableCheckpointing( 1000 )
env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
env.getCheckpointConfig.setCheckpointTimeout( 60000 )
env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )
I think I found the problem. I was using separate directories for the checkpoints and the savepoints, which meant that the savepoint directory and the FsStateBackend directory were different.
Using the same directory in the FsStateBackend configuration and when taking a savepoint solves the problem.
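As a sketch of the fix (the path, job id, and jar name are placeholders, and FsStateBackend is the pre-1.13 state-backend API):

```scala
// In the job: point the FsStateBackend at the same base directory
// that will be passed to the savepoint command (placeholder path).
env.setStateBackend( new FsStateBackend( "file:///tmp/flink/state" ) )

// On the command line, take the savepoint into that same directory
// and resume from it (placeholders throughout):
//   flink savepoint <jobId> file:///tmp/flink/state
//   flink run -s <savepointPath> enrichment-job.jar
```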