Say I have a Flink job processing a data flow like 1, 2, control_flag, 3... When control_flag is encountered, the job should be stopped with a savepoint, and the following messages (3...) should be neither processed nor dropped. After certain actions are taken outside Flink and the job is restarted from the savepoint, the job should go on to process the following messages. However, if the job hangs in a sleeping loop inside the process operator to prevent the following messages from being processed, it cannot be stopped with a savepoint using the Flink API. So how do I stop the job at the position of control_flag and have it restart from the position right after it?
How to stop a Flink job at a specified position
550 Views Asked by David At
There is 1 best solution below
Some suggestions can be found here.
There are a few possible ways to do this, but since you want to keep state between runs, I think the best idea would be to have an operator that:
- If the flag stop_execution is false, processes the data and emits it to the downstream operators.
- If the flag stop_execution is true, adds the data it receives to list state.
- If it receives the control_flag, emits a side output signalling that the job should be stopped.
Now it's up to you to listen to the side output. This can be an external service that reads the data from Kafka and makes the appropriate REST calls to stop the given job, or anything else you want.
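The operator logic described above can be sketched roughly as follows. This is a plain-Java model with no Flink dependency, so it only illustrates the gating behaviour. In a real job the same branches would presumably sit inside a KeyedProcessFunction, with the flag and the buffer kept in Flink state (ValueState and ListState) so that they are included in the savepoint, and the stop signal emitted through an OutputTag. The class name ControlFlagGate, the "stop-requested" signal, the resume() hook, and the assumption that receiving control_flag is what sets stop_execution to true are all illustrative and not part of the original answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the gating operator (hypothetical names, no Flink dependency). */
final class ControlFlagGate {
    // Assumed marker value; the real job may use a different type or encoding.
    static final String CONTROL_FLAG = "control_flag";

    // In Flink this flag would live in ValueState<Boolean> so it survives the savepoint.
    private boolean stopExecution = false;
    // In Flink this buffer would live in ListState<String>.
    private final List<String> buffered = new ArrayList<>();
    // Stands in for the main Collector that feeds downstream operators.
    final List<String> mainOutput = new ArrayList<>();
    // Stands in for the side output that an external service listens to.
    final List<String> sideOutput = new ArrayList<>();

    void processElement(String value) {
        if (CONTROL_FLAG.equals(value)) {
            // Assumption: seeing the control flag is what flips stop_execution.
            stopExecution = true;
            sideOutput.add("stop-requested");
        } else if (stopExecution) {
            // Neither processed nor dropped: held back until the job resumes.
            buffered.add(value);
        } else {
            mainOutput.add(value);
        }
    }

    /** Hypothetical resume hook: release the held-back records in arrival order. */
    void resume() {
        stopExecution = false;
        mainOutput.addAll(buffered);
        buffered.clear();
    }

    /** Returns a copy of the records currently held back. */
    List<String> buffered() {
        return new ArrayList<>(buffered);
    }

    public static void main(String[] args) {
        ControlFlagGate gate = new ControlFlagGate();
        for (String v : Arrays.asList("1", "2", CONTROL_FLAG, "3", "4")) {
            gate.processElement(v);
        }
        // prints: [1, 2] [stop-requested] [3, 4]
        System.out.println(gate.mainOutput + " " + gate.sideOutput + " " + gate.buffered());
        gate.resume();
        // prints: [1, 2, 3, 4]
        System.out.println(gate.mainOutput);
    }
}
```

Because the operator keeps accepting records instead of sleeping, checkpoint barriers can still pass through it, which is what allows the stop-with-savepoint request to complete. How the job decides to resume after the restart (a second control message, a configuration flag, and so on) is left open here.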