I am working on a Flink application that sinks to Kafka. I created a Kafka producer that has a default pool size of 5. I have enabled checkpoints with the following config:
env.enableCheckpointing(1800000); // checkpoint every 30 minutes
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 5000 ms (5 s) of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
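As a quick sanity check on the millisecond literals in this config, here is a minimal stdlib-only Java sketch that converts them with java.time.Duration. CheckpointTimings is a hypothetical class name and the sketch does not call Flink; it only shows that the interval is 30 minutes, the minimum pause is 5 seconds (5000 ms), and the timeout is 1 minute.

```java
import java.time.Duration;

class CheckpointTimings {
    // Millisecond values passed to the checkpoint config above.
    static final Duration INTERVAL = Duration.ofMillis(1800000); // enableCheckpointing
    static final Duration MIN_PAUSE = Duration.ofMillis(5000);   // setMinPauseBetweenCheckpoints
    static final Duration TIMEOUT = Duration.ofMillis(60000);    // setCheckpointTimeout

    public static void main(String[] args) {
        System.out.println("interval:  " + INTERVAL.toMinutes() + " min");  // 30 min
        System.out.println("min pause: " + MIN_PAUSE.getSeconds() + " s");  // 5 s
        System.out.println("timeout:   " + TIMEOUT.toMinutes() + " min");   // 1 min
    }
}
```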
The app sometimes crashes with the following exception. Is this an issue with the Kafka producer pool size or with checkpoints?
2020-03-20 22:31:23,859 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer011 0/1 aborted recovered transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=FileSplitReader -> metrics-map -> Sink: components-topic-sink-4ab008489d4c8ed0fe577883438cc1ff-1, producerId=21, epoch=3], transactionStartTime=1584742933826}
2020-03-20 22:31:23,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.NullPointerException
at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:164)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
2020-03-20 22:31:23,861 INFO org.apache.flink.runtime.taskmanager.Task - FileSplitReader -> metrics-map -> Sink: components-topic-sink (1/1) (92b7f3ed8f6362fe0087efd40eb94016) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createTransactionalProducer(FlinkKafkaProducer011.java:934)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:701)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:97)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:385)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:862)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
I recommend you upgrade to the latest Flink Kafka connector. It looks like you're running FlinkKafkaProducer011, which is intended for Kafka 0.11. You should be using FlinkKafkaProducer from the universal Kafka connector, flink-connector-kafka. Since Flink 1.9, this uses the Kafka 2.2.0 client. With Maven, you want to depend on the flink-connector-kafka_2.11 artifact (group ID org.apache.flink), at the same version as the rest of your Flink dependencies.
Or replace 2.11 with 2.12 if you are using Scala 2.12.
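The exception message points at two knobs, the producer pool size and the number of concurrent checkpoints. A toy model may help show why they interact. The following is a hypothetical, stdlib-only Java sketch that assumes a simplified lifecycle: the sink always holds one open transaction, each checkpoint opens a new transaction that takes one ID from a fixed-size pool, and IDs only go back to the pool when a checkpoint completes. ProducerPoolModel, snapshot(), and checkpointComplete() are invented names, not FlinkKafkaProducer011 internals, and the real connector's bookkeeping differs (note that the stack trace above fails inside initializeState, i.e. while restoring).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a fixed-size pool of transactional IDs
// used by an exactly-once sink. Not the connector's actual implementation.
class ProducerPoolModel {
    private final Deque<String> available = new ArrayDeque<>();
    // Transactions pre-committed at a checkpoint that has not completed yet.
    private final List<String> pending = new ArrayList<>();
    // Transaction currently receiving records.
    private String current;

    ProducerPoolModel(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            available.add("txn-" + i);
        }
        current = take();
    }

    private String take() {
        String id = available.poll();
        if (id == null) {
            throw new IllegalStateException("Too many ongoing snapshots");
        }
        return id;
    }

    // Checkpoint barrier: open a new transaction, park the old one until the checkpoint completes.
    void snapshot() {
        String next = take();
        pending.add(current);
        current = next;
    }

    // Checkpoint completed: commit all pending transactions and return their IDs to the pool.
    void checkpointComplete() {
        available.addAll(pending);
        pending.clear();
    }

    // IDs held by the open transaction plus the pending ones.
    int inUse() {
        return pending.size() + 1;
    }

    public static void main(String[] args) {
        ProducerPoolModel healthy = new ProducerPoolModel(5);
        for (int i = 0; i < 100; i++) {
            healthy.snapshot();
            healthy.checkpointComplete();
        }
        System.out.println("healthy, IDs in use: " + healthy.inUse()); // 1

        ProducerPoolModel stalled = new ProducerPoolModel(5);
        int snapshots = 0;
        try {
            while (true) {
                stalled.snapshot(); // no checkpoint ever completes
                snapshots++;
            }
        } catch (IllegalStateException e) {
            // prints "stalled, failed after 4 snapshots: Too many ongoing snapshots"
            System.out.println("stalled, failed after " + snapshots + " snapshots: " + e.getMessage());
        }
    }
}
```

In this model, completed checkpoints keep usage at one ID no matter how long the job runs, while checkpoints that never complete leave transactions pending until the pool of 5 is exhausted on the fifth snapshot. That is why, in the model, either a larger pool or fewer in-flight checkpoints delays the failure.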