I am trying to achieve exactly-once functionality, but I am getting a KafkaException with the message "org.apache.kafka.common.KafkaException: TransactionalId db13196c-6974-48b0-9835-aed40cec4ca4: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION". Below is my code for the Kafka config and producer:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    private ProducerFactory<String, String> getProducerFactory() {
        DefaultKafkaProducerFactory<String, String> factory =
            new DefaultKafkaProducerFactory<>(getProducerConfigMap());
        factory.setTransactionIdPrefix(KafkaEventConstants.TRANSACTION_ID_PREFIX);
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> getKafkaTemplate() {
        return new KafkaTemplate<>(getProducerFactory());
    }

    private Map<String, Object> getProducerConfigMap() {
        String randomProducerID = UUID.randomUUID().toString();
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put("enable.idempotence", "true");
        config.put("transactional.id", randomProducerID);
        return config;
    }

    @Bean
    public KafkaProducer<String, String> producerFactory() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfigMap());
        producer.initTransactions();
        return producer;
    }
}
Producer:
kafkaTemplate.executeInTransaction(
    kafkaOperations -> {
      kafkaPublisher.pushKafkaNotification(
          topic, kafkaNotification.getUserId(), new JSONObject(kafkaNotification).toString());
      return true;
    });

public void pushKafkaNotification(
    String topic, String partitionKey, String serializedKafkaNotification) {
  try {
    producer.beginTransaction();
    ProducerRecord<String, String> producerRecord =
        new ProducerRecord<String, String>(topic, partitionKey, serializedKafkaNotification);
    producer.send(
        producerRecord,
        new Callback() {
          @Override
          public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
              log.error(
                  "Callback : Failed to push event to kafka for partition key, notification {} {}",
                  partitionKey,
                  serializedKafkaNotification,
                  exception);
            } else {
              log.info(
                  "Kafka Success Callback : Event pushed successfully to Kafka for partition key, notification {}, {}",
                  partitionKey,
                  serializedKafkaNotification);
            }
          }
        });
    producer.commitTransaction();
  } catch (Exception e) {
    producer.abortTransaction();
    metricLogger.errorMetricLogging(SERVICE_NAME, ErrorMetrics.DLQ_PUBLISH_ERROR.getCode());
    log.error("Exception while pushing notification to DLQ = {}", serializedKafkaNotification, e);
  }
}
There are several things wrong with your code.
You should not be creating your own producer; the template will take care of all of that for you. The producer factory should be a @Bean. It looks like you are using Spring Boot, in which case you should just use its auto-configured infrastructure beans.
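As a rough sketch (the class and method names here are placeholders, not your code): with Boot you would set something like spring.kafka.producer.transaction-id-prefix in your application properties so that Boot builds a transactional producer factory, and then simply inject the auto-configured KafkaTemplate:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// The point: the template is injected, not built by hand; there is no separate
// KafkaProducer bean and no manual beginTransaction()/commitTransaction().
@Service
public class NotificationPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public NotificationPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String topic, String key, String payload) {
        // The template begins the transaction, commits it on success,
        // and aborts it if the callback throws.
        kafkaTemplate.executeInTransaction(ops -> ops.send(topic, key, payload));
    }
}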
Regarding your question(s): "Exactly once" semantics apply to consume->process->produce scenarios; the term is not used with producer-initiated transactions. It is also unnecessary (and rather expensive) to use a transaction just to produce a single record. Transactions are useful when you are also doing something else in the transaction and you want the send to be rolled back if that something else fails, or when you want to send multiple records and have either all or none of them committed (see the sketch below). See https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions and https://docs.spring.io/spring-kafka/docs/current/reference/html/#exactly-once for more information.
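For example (sketch only; the class and topic names are made up), a producer-side transaction is worthwhile when several sends must be committed or aborted together:

import org.springframework.kafka.core.KafkaTemplate;

public class BatchNotifier {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public BatchNotifier(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendPair(String userId, String event, String audit) {
        // Both records are committed atomically; if anything in the callback throws,
        // the template aborts the transaction and read_committed consumers see neither.
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("notifications", userId, event);
            ops.send("notifications-audit", userId, audit);
            return null;
        });
    }
}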
Trace-level logging will show the template's interaction with the producer (beginTransaction(), etc.).
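With Spring Boot that is typically just a logging level in your application properties, e.g. logging.level.org.springframework.kafka=TRACE (adjust the logger category and level as needed).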