In my case i use JmsOutboundGateway for mq connection - inputChannel is ExecutorChannel.
Currently ServiceActivator don't have any output channel. Gateway interface method returns CompletableFuture object.
@MessagingGateway
public interface SimpleGateway {
@gateway(requestChannel = "mqRequestChannel")
CompletableFuture sendAndReceiveMqMessage(String message);
}
configuration class
@bean
public MessageChannel mqRequestChannel(AsyncTaskExecutor taskExecutor) {
return new ExecutorChannel(taskExecutor);
}
@ServiceActivator(inputChannel = "mqRequestChannel")
@Bean
public JmsOutboundGateway jmsOutboundGateway(... ) {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(mqConnectionFactory);
gateway.setRequestDestinationName(requestDestination);
gateway.setReplyDestinationName(responseDestination);
gateway.setAsync(true);
....
....
return gateway;
}
@Autowired
private SimpleGateway simpleGateway;
CompletableFuture<String> mqResponse = simpleGateway.sendAndReceiveMqMessage("sdsadas");
I tried to use recursion but it won't be best solution.
private void callGateway(int retry, int requestNum){
String dummyOfs = "dsfsdf";
CompletableFuture<String> response = simpleGateway.sendAndReceiveMqMessage(dummyOfs);
nCoreResponse.exceptionally(throwable -> { LOG.info("error occured");
LOG.info("[Error section CURRENT THREAD in gateway class call ]: {} , requestNum = {}, retry = {}", Thread.currentThread(), requestNum, retry );
if(MAX_RETRY > retry){
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
callGateway(retry + 1, requestNum);
}
return null;});
response.thenAccept((String someString) ->
{
LOG.info("[success ]: {} , returned value: {}", Thread.currentThread(), someString);
});
};
}
I was also thinking of pushing retries to a different thread pool.
But for me is important to set up numer of retries and back off time and also if timeout for request is reached then retries won't be triggered or stopped.
There is no retry API around
CompletableFuture. At least as far as I'm aware. According to yourgateway.setAsync(true)andExecutorChannelupfront there is no connection between yourCompletableFuturereturn from a messaging gateway and that reply from a JMS. TheJmsOutboundGatewaywill still produce a real result from its call to thereplyChannelheader after completion of theFuture, and only after that it is wrapped to anotherCompletableFuturefor your messaging gateway expectations. AnExecutorChannelis also a bit of an overhead since aCompletableFuturereturn type from a messaging gateway method uses aTaskExecutorfor async request reply. Therefore you do have a lot of thread switching in between request and reply.It might not be what you would expect, but I can suggest to change your
sendAndReceiveMqMessage()to return aMono. And already there you can use aretry()API. For downstream flow it doesn't matter and it still going to be anasync. In the end, the successful result of thisMonocan be converted to theCompletableFuture(seeMono.toFuture()) if that is what required in the consumer of your API.