Flink failed to trigger checkpoint when using table API


My Flink streaming application (v1.14.4) contains a JDBC connector that is used for the initial fetch of data from a MySQL server. Logic:

  1. JDBC table source -> select.where() -> convert to datastream
  2. Kafka datastream join jdbc table -> further computation

When I run the application locally, I see the following log output:

14:52:00.401 [Source: TableSourceScan(table=[[default_catalog, default_database, asset, project=[id, status]]], fields=[id, status]) -> Calc(select=[CAST(_UTF-16LE'sergey-test-asset-id':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS id, status], where=[(id = _UTF-16LE'sergey-test-asset-id')]) -> TableToDataSteam(type=ROW<`id` STRING, `status` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (4/4)#0] INFO  o.a.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, asset, project=[id, status]]], fields=[id, status]) -> Calc(select=[CAST(_UTF-16LE'sergey-test-asset-id':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS id, status], where=[(id = _UTF-16LE'sergey-test-asset-id')]) -> TableToDataSteam(type=ROW<`id` STRING, `status` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (4/4)#0 (e8870cf296ac770346384fe2529b325f) switched from RUNNING to FINISHED.
...

14:57:52.963 [Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Failed to trigger checkpoint for job 8303c423dd7b9e3f303f0b299d7d37bb because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.

I understand that once the SQL select statement completes, Flink marks the JDBC operator as FINISHED, but I need the streaming application to keep running and to take checkpoints during runtime.

Do I need to set the execution.checkpointing.checkpoints-after-tasks-finish.enabled: true property to fix my issue, or do I need to change my application graph?

Best answer

Yes, you need to use

execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

so that the job can keep checkpointing even though the JDBC source tasks have finished.
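When the job is started locally (for example from an IDE), a flink-conf.yaml is usually not picked up, so the option may need to be set in code. The following is a minimal sketch of one way to do that in Java on Flink 1.14, where this option is disabled by default. The class name and the 60-second checkpoint interval are assumptions made for illustration and are not taken from the question.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name, used only for this sketch.
public class CheckpointAfterFinishSketch {
    public static void main(String[] args) throws Exception {
        // Same setting as the YAML key
        // execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
        Configuration config = new Configuration();
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        // Pass the configuration when creating the environment so it also
        // applies to a local (in-IDE) execution.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        // Assumed interval: checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Define the JDBC table, convert it to a DataStream, join it with the
        // Kafka stream, and finally call env.execute() here.
    }
}
```

With this in place, the checkpoint coordinator should no longer refuse to trigger checkpoints once the bounded JDBC source has switched to FINISHED, while the Kafka part of the graph keeps running.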