I'm trying to setup retryAdvice if there is an error in publishing to kafka in my publisherFlow. I can't seem to find the correct configuration on how to get it working. Any help or guidance would be much appreciated.
@Bean
public IntegrationFlow publisherFlow(CsvToEmailInteractionConverter converter, KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlow.from("publisherChannel")
.transform(converter, "transform")
.transform(Transformers.toJson())
.log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, m -> "Payload: " + m.getPayload())
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.topic(Constants.KAFKA_TOPIC), e -> e.id(Constants.ENGAGE_ACOUSTIC_ID))
// .routeByException(r -> r.defaultOutputChannel(MessageChannels.direct("standardFlowExceptionChannel").getObject()))
.get();
}
I think the advice needs to go before the handle()?
We don't see any retry configuration in your question, but see this one which might be exactly what you are looking for: How to get RetryAdvice working for KafkaProducerMessageHandler.
So, this is essentially what you need: