How to recover from unreachable Kafka in reactive producer with transactions?


I have a simple reactive Kafka producer and I need it to manually commit the transaction. Following the javadoc in Reactor Kafka for method begin() I created a producer method

@Autowired
private ReactiveKafkaProducerTemplate<String, String> kafkaTemplate;

public Mono<Void> send(String message) {
    return kafkaTemplate.transactionManager()
            .begin()
            .then(kafkaTemplate.send("my.test.topic", message))
            .then(kafkaTemplate.transactionManager().commit());
}

and its configuration is

@Bean
public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}

It works well and messages reach Kafka. But now I need to protect it from potential Kafka/network outages in our company. To simulate this, I stopped my Kafka Docker container, sent a message, and started Kafka again after some time. But weird things happen. There are 3 possible scenarios I encounter:

  1. When I start Kafka within a minute or so, the producer recovers the connection and the transaction, and the send completes successfully.
  2. When I wait longer (~3 - 4 minutes), the sender fails with
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my.test.topic-0:219057 ms has passed since batch creation

This state is unrecoverable; every follow-up attempt fails with

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

and I have to restart the application.

  3. When the user cancels the request (i.e. the REST call that triggers this send) and Kafka then comes up again, every follow-up request fails with
TransactionalId producer-tx-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

This state is also unrecoverable; I have to restart the application.

Question

How can I recover from this situation? What I'd like to achieve is that the producer waits, e.g., 20 seconds and, if the connection isn't re-established by then, throws an exception and discards the request. Above all, the system shouldn't collapse when Kafka is unreachable.

What I tried

At first I thought I needed to manually abort the transaction with kafkaTemplate.transactionManager().abort() in some .doOnError(), but this didn't work (and in scenario 3, for example, the code doesn't throw any exception). Then I played around with ProducerConfig, trying TRANSACTION_TIMEOUT_CONFIG, REQUEST_TIMEOUT_MS_CONFIG and DELIVERY_TIMEOUT_MS_CONFIG, but the request didn't time out. I'm not sure what I did wrong.

Answer
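  1. This happens because delivery.timeout.ms defaults to 2 minutes (120000 ms). Once a batch has waited that long without being delivered, it is expired with the TimeoutException shown in the question.

  2. After that, the transactional producer stays in the error state. You need to create a new producer and dispose of the old one.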
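
As a rough illustration of both points, here is a minimal sketch in plain Java. The class names ProducerTimeouts and RecreatingHolder are hypothetical and not part of Kafka, Reactor Kafka or Spring. The property keys are the standard producer config names. The 20 second budget comes from the question, and splitting it in half for request.timeout.ms is only an assumption. To my knowledge Kafka rejects a configuration where delivery.timeout.ms is smaller than linger.ms + request.timeout.ms, and max.block.ms limits how long the producer's blocking calls may wait, so both are set here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper: producer timeout settings for a fixed time budget. */
final class ProducerTimeouts {
    private ProducerTimeouts() {}

    /** Returns extra producer properties that bound delivery to roughly budgetMs. */
    static Map<String, Object> boundedTo(int budgetMs) {
        int lingerMs = 0;
        int requestTimeoutMs = budgetMs / 2; // assumption: half the budget per request
        Map<String, Object> props = new HashMap<>();
        props.put("linger.ms", lingerMs);
        props.put("request.timeout.ms", requestTimeoutMs);
        // Must be >= linger.ms + request.timeout.ms, otherwise the config is invalid.
        props.put("delivery.timeout.ms", budgetMs);
        // Upper bound for blocking producer calls such as waiting for metadata.
        props.put("max.block.ms", budgetMs);
        return props;
    }
}

/** Hypothetical holder that swaps a fatally failed producer for a fresh one. */
final class RecreatingHolder<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T current;

    RecreatingHolder(Supplier<T> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    synchronized T get() {
        return current;
    }

    /** Replaces and closes the instance only if 'failed' is still the current one. */
    synchronized T replace(T failed) {
        if (current == failed) {
            current = factory.get();
            try {
                failed.close();
            } catch (Exception e) {
                // The old instance is already broken; a failing close is ignored here.
            }
        }
        return current;
    }
}
```

With the setup from the question, the map returned by boundedTo(20_000) could be merged into props with putAll before calling SenderOptions.create(props), and the factory could be something like () -> new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props)), assuming the template type is closeable. The send method would then call holder.get() instead of using an injected field, and call holder.replace(...) when it sees the "error state" KafkaException.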