Implementing Spring Kafka retry

I have a topic that contains two types of messages: one creates the data and one updates it. I have a situation where some update events arrive before the corresponding create events. I'm trying to use Spring Kafka retry to retry the update messages.

I see a couple of ways to do it:

  1. Blocking retry: I have configured a DefaultErrorHandler and supplied it with a FixedBackOff (roughly as sketched after this list). However, if the update and create messages are on the same partition, I think the update messages will still end up in the ConsumerRecordRecoverer once the retries are exhausted, and as of now I am just logging them there.

  2. Non-blocking retry: I am trying to use the @RetryableTopic annotation, but I don't see a way to provide the retry topic and consumer names explicitly.
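
Roughly what I have so far for option 1 (simplified; the recoverer that only logs is the part I'd like to improve):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler() {
        // Runs only after the retries below are exhausted; currently it just logs
        // (System.err is a stand-in for a real logger).
        ConsumerRecordRecoverer recoverer =
                (record, ex) -> System.err.println("Giving up on " + record + ": " + ex.getMessage());

        // FixedBackOff(interval, maxAttempts): retry the failed record 3 more times,
        // 2 seconds apart, blocking the partition while it waits.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3L));
    }
}

I attach this handler to my ConcurrentKafkaListenerContainerFactory with setCommonErrorHandler.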

For option #2, is it possible to provide the topic names and consumer names programmatically instead of relying on the framework's defaults?
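
For reference, my current attempt for option 2 looks roughly like this (topic, listener id, group and payload names are placeholders):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;

public class EntityEventListener {

    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 5000))
    @KafkaListener(id = "entity-updates", topics = "entity-events", groupId = "entity-service")
    public void onEvent(String event) {
        // Throwing here makes the framework publish the record to retry topics it
        // names itself (plus a "-dlt" topic); those are the names I'd like to control.
        applyUpdate(event);
    }

    private void applyUpdate(String event) {
        // placeholder for the real create/update handling
    }
}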

1 Answer

Answered by Sebastiaan van den Broek:

A lot of it depends on the requirements for your system. How important is it that updates are processed ASAP, including updates unrelated to the blocked ones, for example? If that's not so important, you could simply pause processing entirely once you encounter an update you can't process yet.

But personally I'd just write the failed messages to a different retry-update topic. That way you can keep processing the original stream. You may also need to postpone further updates to the same object if you encounter more of them, depending on how that works in your system.
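
A minimal sketch of that idea (class, topic and method names are made up; adapt them to your setup):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UpdateRetryForwarder {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public UpdateRetryForwarder(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(id = "entity-events", topics = "entity-events")
    public void onEvent(String event) {
        if (!canApply(event)) {                      // e.g. the create hasn't arrived yet
            // Park the update on a retry topic so the main stream keeps flowing.
            kafkaTemplate.send("retry-update-1", event);
            return;
        }
        applyUpdate(event);
    }

    private boolean canApply(String event) {
        return true;                                 // placeholder for the real readiness check
    }

    private void applyUpdate(String event) {
        // placeholder for the real update handling
    }
}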

The retry-update stream can then be processed a bit more leniently. You can even have several retry-update topics (retry-update-1, retry-update-2, ...) with different waiting intervals before their records are attempted again. This is useful if you absolutely must keep everything streaming and individual updates should not conflict with each other at all.

E.g.

retry-update-1: retry after 1 second
retry-update-2: retry after 5 seconds
retry-update-3: retry after 1 minute
retry-update-4: retry after 1 hour
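
A consumer for one of those tiers could look roughly like this (again just a sketch with made-up names; for the one-hour tier you'd rather pause the container or compare timestamps than sleep on the consumer thread):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class RetryUpdateTier1Listener {

    private static final long DELAY_MS = Duration.ofSeconds(1).toMillis();

    private final KafkaTemplate<String, String> kafkaTemplate;

    public RetryUpdateTier1Listener(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(id = "retry-update-1", topics = "retry-update-1")
    public void onRetry(ConsumerRecord<String, String> record) throws InterruptedException {
        // Wait out the remainder of this tier's delay; only this retry topic stalls,
        // the main stream is unaffected.
        long waited = System.currentTimeMillis() - record.timestamp();
        if (waited < DELAY_MS) {
            Thread.sleep(DELAY_MS - waited);
        }
        if (!tryApply(record.value())) {
            // Still not applicable: escalate to the next, slower tier.
            kafkaTemplate.send("retry-update-2", record.value());
        }
    }

    private boolean tryApply(String event) {
        return true;   // placeholder for the real update logic
    }
}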

I don't know exactly how much of this the Spring Kafka API supports, but with the plain Kafka Streams library it is easy enough to do, and that also works in a Spring Boot application.

There is an org.apache.kafka.streams.processor.ProcessorContext class that lets you schedule these things, and you can implement an org.apache.kafka.streams.kstream.Transformer to do that in.

context.schedule(scheduledInterval, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    // ... re-forward or re-process the parked updates here
});
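
A fuller sketch of that approach, purely illustrative (the "pending-updates" store name and the readiness check are placeholders, and the store has to be created and connected to the transformer on the topology):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class RetryTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> pending;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.pending = (KeyValueStore<String, String>) context.getStateStore("pending-updates");

        // Once per second (wall-clock time), re-emit everything that was parked earlier.
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            List<KeyValue<String, String>> parked = new ArrayList<>();
            try (KeyValueIterator<String, String> it = pending.all()) {
                it.forEachRemaining(parked::add);
            }
            for (KeyValue<String, String> entry : parked) {
                context.forward(entry.key, entry.value);   // try the update again downstream
                pending.delete(entry.key);
            }
        });
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        if (!canApply(value)) {          // placeholder check, e.g. the create hasn't arrived yet
            pending.put(key, value);     // park the update for the punctuator
            return null;                 // emit nothing for now
        }
        return KeyValue.pair(key, value);
    }

    private boolean canApply(String value) {
        return true;                     // placeholder for the real readiness check
    }

    @Override
    public void close() {
    }
}

You would wire it in with something like stream.transform(RetryTransformer::new, "pending-updates"), after adding the state store to the topology.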