How to stop a Flink job at a specified position


Say I have a Flink job processing a data flow like 1, 2, control_flag, 3... When control_flag is encountered, the job should be stopped with a savepoint, and the following messages (3...) should be neither processed nor dropped. After certain actions are taken outside Flink and the job is restarted from the savepoint, the job should continue processing the following messages. However, if the job blocks in a sleep loop inside the process operator to keep the following messages from being processed, it cannot be stopped with a savepoint through the Flink API. So how do I stop the job at the position of control_flag and have it restart from the position right after it?


Best answer

Some suggestions can be found here.

There are a few possible ways to do this, but since you want to keep state between runs, I think the best idea is to have an operator that:

  • If the flag stop_execution is false, processes the data and emits the results to the downstream operators.

  • If the flag stop_execution is true, adds the data it receives to list state instead of processing it.

  • If it receives the control_flag, sets stop_execution to true (so that later records are buffered) and emits a side output signalling that the job should be stopped.
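
The three rules above can be sketched as a small, framework-free model. This is plain Python, not Flink API: in a real job the flag and the buffer would presumably live in Flink keyed or operator state so that they are captured by the savepoint. The names GateOperator, CONTROL_FLAG, transform and resume are hypothetical, and the resume step (reopening the gate and replaying the buffer after the restart) is an assumption, since the answer does not say how the job should pick up again.

```python
from dataclasses import dataclass, field
from typing import Any, List

CONTROL_FLAG = "control_flag"  # hypothetical marker value for the control record


@dataclass
class GateOperator:
    """Framework-free model of the gating operator (not Flink API)."""

    stop_execution: bool = False                        # would be Flink state
    buffered: List[Any] = field(default_factory=list)   # would be Flink list state
    main_output: List[Any] = field(default_factory=list)
    side_output: List[str] = field(default_factory=list)

    def transform(self, value: Any) -> Any:
        # Placeholder for the real per-record processing.
        return value

    def process_element(self, value: Any) -> None:
        if self.stop_execution:
            # Gate closed: keep the record so it is neither processed nor dropped.
            self.buffered.append(value)
        elif value == CONTROL_FLAG:
            # Close the gate and signal that the job should be stopped.
            self.stop_execution = True
            self.side_output.append("stop_requested")
        else:
            self.main_output.append(self.transform(value))

    def resume(self) -> None:
        # Assumed restart step: reopen the gate and replay buffered records
        # in arrival order. A second control record closes the gate again.
        pending, self.buffered = self.buffered, []
        self.stop_execution = False
        for value in pending:
            self.process_element(value)


op = GateOperator()
for record in [1, 2, CONTROL_FLAG, 3, 4]:
    op.process_element(record)
# At this point 1 and 2 were processed, 3 and 4 are buffered,
# and one stop signal was emitted on the side output.
```

Because the operator returns immediately instead of sleeping, checkpoints and savepoints are not blocked, which is what the sleeping-loop approach in the question runs into.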

Now it's up to you to listen to the side output. This can be, for example, an external service that reads the data from Kafka and makes the appropriate REST calls to stop the given job, or anything else you prefer.
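
As a rough sketch of the REST side of such an external service: Flink's REST API exposes a stop-with-savepoint endpoint at POST /jobs/:jobid/stop, which takes a JSON body with targetDirectory and drain. The helper names, the base URL, the job id and the savepoint directory below are placeholders, and consuming the stop signal from Kafka is left out, so treat this as an illustration under those assumptions rather than a complete service.

```python
import json
import urllib.request


def build_stop_request(rest_url: str, job_id: str, savepoint_dir: str) -> urllib.request.Request:
    """Build (but do not send) a stop-with-savepoint request for one job."""
    body = json.dumps({
        "targetDirectory": savepoint_dir,  # where the savepoint is written
        "drain": False,                    # do not flush the pipeline before stopping
    }).encode("utf-8")
    return urllib.request.Request(
        url=f"{rest_url.rstrip('/')}/jobs/{job_id}/stop",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def stop_job_with_savepoint(rest_url: str, job_id: str, savepoint_dir: str) -> dict:
    """Send the request and return the parsed JSON response."""
    request = build_stop_request(rest_url, job_id, savepoint_dir)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

The external service would call stop_job_with_savepoint with the JobManager's REST address (for example http://localhost:8081 on a default local setup) once it sees the stop signal. The call is asynchronous on the Flink side, so the response only identifies the triggered operation and the service may need to poll for its completion.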