Kafka Producer time out during initTransactions while one broker is down

134 Views Asked by At

I'm using Kafka 2.7.0 and I have created kafka producer with Kafka Transaction. But it hangs and timeout while initialializing the Kafka transaction. In my Kafka cluster, I have 3 brokers.

public class KafkaTransactionRecoveryExample {

    public static void main(String[] args) throws InterruptedException {
        // Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "MyBroker1:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        producerProps.put("acks", "all");
        producerProps.put("enable.idempotence", "true");
        producerProps.put("transactional.id", "transaction");
        
        producerProps.put("request.timeout.ms",  12000);
        producerProps.put("transaction.timeout.ms",  12000);
        producerProps.put("max.block.ms",  12000); 
        producerProps.put("batch.size", 60000);
        producerProps.put("retries", 3); // Number of retries before giving up
        producerProps.put("delivery.timeout.ms", 30000); // Increase delivery timeout to 30 seconds
        producerProps.put("request.timeout.ms",  30000); // Increase request timeout to 30 seconds   
        producerProps.put("transaction.state.log.replication.factor",  1);
        producerProps.put("transaction.state.log.min.isr",  1); 
        
        KafkaProducer<String, String> producer1 = null;
        try {
        producer1 =  new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
        
        // Initialize producer's transaction
        **producer1.initTransactions();**
        System.out.println("initTransactions");

        .....
        }
    }
}

Output (While MyBroker1 is down):

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default enable.idempotence to true since transactional.id is specified.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Instantiated a transactional producer.
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Overriding the default acks to all since idempotence is enabled.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1695122942876
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Cluster ID: m4HYe1sbTX6I9IScDjS04A

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

......

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

**Exception 2: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 1000milliseconds while awaiting InitProducerId**

**Broker appears to be down. Waiting and retrying...**

[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)
[kafka-producer-network-thread | producer-MyKafka2] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-MyKafka2, transactionalId=MyKafka2] Discovered transaction coordinator MyBroker2:9092 (id: 1 rack: null)

It seems Kafka transaction is not initialised and I have tried adding different properties and still not able to resolve the issue. Also this error occured while using kafka transactions only.

0

There are 0 best solutions below