Apache Beam KafkaIO Reader & Writer - Error handling and Retry mechanism

I'm working on an Apache Beam pipeline that consumes data from a Kafka stream. After some processing, I need to publish the processed data to three different Kafka topics. I use Apache Flink as the runner.

My question is: how do I handle errors in the KafkaIO reader and writer, and in particular, how do I implement a retry mechanism for the KafkaIO writers?

I would appreciate any advice, resources, or code samples on how to do this.
(I'm not sure whether Flink handles these things by default.)

Reader definition

KafkaIO.<String, GenericRecord>read()
   .withBootstrapServers(<server>)
   .withTopics(<my topics>)
   .withConsumerConfigUpdates(<my configs>)
   .withKeyDeserializer(StringDeserializer.class)
   .withValueDeserializerAndCoder(GenericRecordDeserializer.class, GenericRecordCoder.of());

Writer definition

KafkaIO.<GenericRecord, GenericRecord>write()
   .withBootstrapServers(<server>)
   .withProducerConfigUpdates(<my configs>)
   .withTopic(<topic-name>)
   .withKeySerializer(GenericRecordSerializer.class)
   .withValueSerializer(GenericRecordSerializer.class);

1 Answer

Instead of catching errors in KafkaIO itself, you can catch errors in the Beam pipeline with TupleTags, side outputs, and a dead letter queue.
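
A minimal sketch of that pattern, assuming input is the PCollection<KV<String, GenericRecord>> produced by your reader (the body of the try block is a placeholder for your own processing logic):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tags for the main output and the dead letter side output.
final TupleTag<GenericRecord> successTag = new TupleTag<GenericRecord>() {};
final TupleTag<KV<String, GenericRecord>> deadLetterTag = new TupleTag<KV<String, GenericRecord>>() {};

PCollectionTuple results = input.apply("ProcessRecords",
    ParDo.of(new DoFn<KV<String, GenericRecord>, GenericRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                GenericRecord value = c.element().getValue(); // <your processing here>
                c.output(value);
            } catch (Exception e) {
                // Route the failing element to the dead letter output instead of failing the bundle.
                c.output(deadLetterTag, c.element());
            }
        }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<GenericRecord> processed = results.get(successTag);
PCollection<KV<String, GenericRecord>> deadLetters = results.get(deadLetterTag);
// You may need to set a coder on the dead letter collection (e.g. your GenericRecordCoder).
// deadLetters can then be published to a dead letter topic with a second KafkaIO.write().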

You can also use a Beam companion library called Asgarde to simplify error handling. Example:

final WithFailures.Result<PCollection<Integer>, Failure> resultComposer = CollectionComposer.of(input)
        .apply("Map", MapElements.into(TypeDescriptors.strings()).via((String word) -> word + "Test"))
        .apply("FlatMap", FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5))))
        .apply("ParDo", MapElementFn.into(TypeDescriptors.integers()).via(word -> 1 / word.length()))
        .getResult();

The CollectionComposer wrapper catches errors at each step of the pipeline and returns:

  • a PCollection of the correct outputs
  • a PCollection of Failure objects
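
Both collections can be retrieved from the result, since getResult() returns Beam's WithFailures.Result (as declared in the example above):

PCollection<Integer> outputs = resultComposer.output();
PCollection<Failure> failures = resultComposer.failures();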

The structure of the Failure object is:

public class Failure implements Serializable {
    private final String pipelineStep;
    private final String inputElement;
    private final Throwable exception;
    ....

You can then sink the PCollection of Failure elements to a BigQuery table, a GCS bucket, or a Kafka topic for retries.
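
For example, here is a sketch of publishing the failures back to a dead letter Kafka topic for later reprocessing; the topic name and the String mapping are assumptions, and the getPipelineStep()/getInputElement() accessors are assumed from the Failure fields shown above:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringSerializer;

failures
    // Key each failed element by the step that failed; keep the original input as the value.
    // getPipelineStep()/getInputElement() are assumed accessors for the fields shown above.
    .apply("FailureToKV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via((Failure failure) -> KV.of(failure.getPipelineStep(), failure.getInputElement())))
    // Publish to a dead letter topic so the elements can be inspected and replayed later.
    .apply("WriteToDeadLetterTopic", KafkaIO.<String, String>write()
        .withBootstrapServers("<server>")
        .withTopic("<dead-letter-topic>")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));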

I also share examples in my GitHub repository.

You can also check this article: Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time.