I'm working on an Apache Beam pipeline that consumes data from a Kafka stream. After some processing, I need to publish the processed data to three different Kafka topics. I use Apache Flink as the runner.
My question is how to handle errors in the KafkaIO reader and writer, and in particular how to catch errors and implement a retry mechanism for the KafkaIO writers.
I would like some advice on how to do this, and would appreciate any resources or code samples.
(I'm not sure whether these things are handled by Flink by default.)
Reader definition
KafkaIO.<String, GenericRecord>read()
.withBootstrapServers(<server>)
.withTopics(<my topics>)
.withConsumerConfigUpdates(<my configs>)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(GenericRecordDeserializer.class, GenericRecordCoder.of());
Writer definition
KafkaIO.<GenericRecord, GenericRecord>write()
.withBootstrapServers(<server>)
.withProducerConfigUpdates(<my configs>)
.withTopic(<topic-name>)
.withKeySerializer(GenericRecordSerializer.class)
.withValueSerializer(GenericRecordSerializer.class);
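(For the writer retry part of the question: the <my configs> placeholder passed to withProducerConfigUpdates is where Kafka's own producer-level retry settings would go. The sketch below is only a hypothetical illustration of such a map, not a recommendation.)
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative values only; tune for your own latency/durability requirements.
Map<String, Object> producerRetryConfigs = new HashMap<>();
producerRetryConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
producerRetryConfigs.put(ProducerConfig.RETRIES_CONFIG, 5);
producerRetryConfigs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
producerRetryConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Passed to the writer above as .withProducerConfigUpdates(producerRetryConfigs)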
Instead of catching errors in KafkaIO, you can catch errors in the Beam pipeline with TupleTags, side outputs and a dead letter queue.
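Here is a minimal sketch of the TupleTags/side output approach in Beam Java. It assumes the reader uses withoutMetadata() so elements arrive as KV<String, GenericRecord>; transform(...) and the tag names are placeholders for your own logic.
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tag for the main (successful) output and tag for the failure side output.
final TupleTag<KV<GenericRecord, GenericRecord>> successTag =
    new TupleTag<KV<GenericRecord, GenericRecord>>() {};
final TupleTag<KV<String, String>> failureTag = new TupleTag<KV<String, String>>() {};

PCollectionTuple results = input.apply("Process with error handling",
    ParDo.of(new DoFn<KV<String, GenericRecord>, KV<GenericRecord, GenericRecord>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          // transform(...) stands for your own processing logic.
          c.output(transform(c.element()));
        } catch (Exception e) {
          // Send the failing record (key + error message) to the failure side output
          // instead of letting the exception fail the Flink job.
          c.output(failureTag, KV.of(c.element().getKey(), e.getMessage()));
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(failureTag)));

// Successful records continue to your three output topics ...
PCollection<KV<GenericRecord, GenericRecord>> processed = results.get(successTag);
// ... and failures go to a dead letter queue / retry topic.
PCollection<KV<String, String>> failures = results.get(failureTag);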
You can also use a library in Beam to simplify error handling, called Asgarde. For example, its CollectionComposer wrapper catches errors in each step of the pipeline and returns:
- a PCollection of correct outputs
- a PCollection of Failure objects
The structure of the Failure object is:
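(The original code block for the Failure structure did not come through here. The sketch below is only a hypothetical illustration of the shape such a failure holder typically has; see the Asgarde project for the actual class.)
import java.io.Serializable;

// Hypothetical illustration only, not the real Asgarde Failure class.
public class Failure implements Serializable {
  private final String pipelineStep;  // name of the step that raised the error
  private final String inputElement;  // string form of the element that failed
  private final Throwable exception;  // the exception thrown by the step

  public Failure(String pipelineStep, String inputElement, Throwable exception) {
    this.pipelineStep = pipelineStep;
    this.inputElement = inputElement;
    this.exception = exception;
  }

  public String getPipelineStep() { return pipelineStep; }
  public String getInputElement() { return inputElement; }
  public Throwable getException() { return exception; }
}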
You can then sink the PCollection of Failure objects to a BigQuery table, a GCS bucket or a topic for retry.
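For the Kafka case in the question, the failures can be written back out with another KafkaIO.write(), e.g. to a dead letter / retry topic. A minimal sketch, assuming the failures collection of KV<String, String> from the side-output example above and an assumed topic name:
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// "failures" is the PCollection<KV<String, String>> built from the failure side output above.
failures.apply("Write failures to dead letter topic",
    KafkaIO.<String, String>write()
        .withBootstrapServers("<server>")
        .withTopic("my-dead-letter-topic")  // assumed topic name; re-consume it later for retries
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));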
I share with you examples from my GitHub repository.
You can also check this article: Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time