we are using Quarkus/Smallrye reactive messaging to produce messages to the Kafka topic.
In most cases this works well.
However - from time to time - we are facing timeouts - meaning the emitter.sendMessage(message) does not produce either Item or Failure signal and the downstream part of the pipeline timeouts.
What is strange - messages are happily written in the Kafka topic.
We have implemented a retry wrapper over the sendMessage call, but the results are just duplicate messages in the target topic.
Code sample here:
return Uni.createFrom().deferred(() -> emitter.sendMessage(message).replaceWith(message))
.ifNoItem().after(timeout).failWith(() -> {
Log.warnf("Timeout emitting %s %s to %s Kafka channel", event.getClass().getSimpleName(), event.getId(), channelName);
return new TimeoutException();
})
.onFailure(TimeoutException.class).retry().atMost(numberOfAttempts)
.onFailure(th -> {
Log.errorf(th, "Error emitting %s %s to %s Kafka channel", event.getClass().getSimpleName(), event.getId(), channelName);
return th instanceof IllegalStateException;
}).retry().withBackOff(timeout, timeout).atMost(numberOfAttempts);
Is there a way to detect what's going on behind the scenes?
Jan