I've setup a kafka consumer using smallrye kafka. For particular reasons, I need to have control over the client.id of every producer on my code.

So, how can i change the client.id of the producer used when a nack() is sent from my code?

My current incoming setup is:

@Incoming("shifts")
@Retry(delay = 10, maxRetries = 5)
public CompletionStage<Void> consume(KafkaRecord<K, V> record) {
  // for now, we log
  log.info("clShift message received: with key {} and value {} ", record.getKey(),record.getPayload());
  return record.nack(new Exception("this should do to dl"));
}

part of application.properties:

mp.messaging.incoming.shifts.connector=smallrye-kafka
mp.messaging.incoming.shifts.topic=<topic_name>
mp.messaging.incoming.shifts.failure-strategy=dead-letter-queue
mp.messaging.incoming.shifts.kafka-configuration=default-kafka-broker
mp.messaging.incoming.shifts.auto.offset.reset=latest
1

There are 1 best solutions below

0
On

Such a requirement was not part of smallrye-reactive-messaging implementation. It was added on version 4.0.0 on commit