Is exactly-once semantics possible in a consume-produce cycle with Micronaut Kafka?

I am trying to set up exactly-once semantics using Kafka transactions with Micronaut Kafka.

I need to read records from topic A, map both the key and the value, and produce the result to topic B. I would like this consume-transform-produce step to happen exactly once by using Kafka transactions.

According to the Micronaut Kafka documentation, when the @SendTo annotation is used on a method of a @KafkaListener bean, the offset commit can be forwarded to the producer transaction.

However, with @SendTo the method seems to return only the new record value, and the outgoing record reuses the key of the original Kafka message received by the @KafkaListener bean. Is there a way to change the key as well?

@KafkaListener(
    groupId = "my-group",
    offsetReset = EARLIEST,
    offsetStrategy = SEND_TO_TRANSACTION,
    producerClientId = "prod-client-id",
    producerTransactionalId = "prod-client-tx"
)
class MyListener {
    
    @Topic("source.topic")
    @SendTo("target.topic")
    fun handle(
        @KafkaKey key: SourceKey,
        @MessageBody value: SourceValue
    ): TargetValue = convertValue(value)

...

}

If not, is there a way to forward the offset commit to a transaction of a producer bean created with the @KafkaClient annotation?

@KafkaListener(
    groupId = "my-group",
    offsetReset = EARLIEST,
    offsetStrategy = ???
)
class MyListener(private val producer: MyProducer) {
    
    @Topic("source.topic")
    fun handle(
        @KafkaKey key: SourceKey,
        @MessageBody value: SourceValue
    ) {
        LOGGER.info("Received record key {}, value {}", key, value)
        producer.send(convertKey(key), convertValue(value))
    }

...

}

@KafkaClient(
    id = "producer-id",
    transactionalId = "producer-tx",
)
interface MyProducer {
    @Topic("target.topic")
    fun send(@KafkaKey key: TargetKey, @MessageBody value: TargetValue)
}

Any help is very welcome!

Regards, Lars

There is 1 best solution below

In the meantime I found out that a listener method annotated with @SendTo can return a KafkaMessage<K, V> instead of a bare value, which lets me set both the key and the value of the outgoing record. That solves my problem: the offset commit is still forwarded to the producer transaction, as stated in the documentation.
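For illustration, here is a minimal sketch of that approach, based on the first listener from the question. The four data classes and the two convert functions are hypothetical placeholders for the real types and mapping logic, and serializer configuration is left out. The builder calls (KafkaMessage.Builder.withBody, key, build) are written as I understand the Micronaut Kafka API, so please check them against the version you are using.

```kotlin
import io.micronaut.configuration.kafka.KafkaMessage
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.SendTo

// Hypothetical placeholder types; the real ones (and their serdes) come from your project.
data class SourceKey(val id: String)
data class SourceValue(val payload: String)
data class TargetKey(val id: String)
data class TargetValue(val payload: String)

@KafkaListener(
    groupId = "my-group",
    offsetReset = OffsetReset.EARLIEST,
    // Commit the consumed offsets as part of the producer transaction.
    offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION,
    producerClientId = "prod-client-id",
    producerTransactionalId = "prod-client-tx"
)
class MyListener {

    @Topic("source.topic")
    @SendTo("target.topic")
    fun handle(
        @KafkaKey key: SourceKey,
        @MessageBody value: SourceValue
    ): KafkaMessage<TargetKey, TargetValue> =
        // Returning a KafkaMessage instead of a bare value allows setting the key too.
        KafkaMessage.Builder
            .withBody<TargetKey, TargetValue>(convertValue(value))
            .key(convertKey(key))
            .build()

    // Hypothetical mappings; replace with the real conversion logic.
    private fun convertKey(key: SourceKey): TargetKey = TargetKey(key.id)

    private fun convertValue(value: SourceValue): TargetValue = TargetValue(value.payload)
}
```

With this variant no separate @KafkaClient producer is needed, because the listener's own transactional producer sends the mapped record to target.topic.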