I have to listen a queue using spring integration flow and intgeration sqs. Once message is received from queue it should trigger a integration flow. Below is the things which I am trying but everythings fine in but afater receiving test it is not triggering any Integration flow. Please let me know where I am doing wrong:
UPDATED as per comment from Artem
Adapter for SQS.
@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "Main");
adapter.setOutputChannel(inputChannel());
adapter.setAutoStartup(true);
adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
adapter.setMaxNumberOfMessages(10);
adapter.setVisibilityTimeout(5);
return adapter;
}
Queue configured:
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
Now the main integration flow trigger point:
@Bean
public IntegrationFlow inbound() {
return IntegrationFlows.from("inputChannel").transform(i -> "TEST_FLOW").get();
}
}
Appreciate any type of help.
sqsMessageDrivenChannelAdapter()must be declared as a@Beaninbound()must be declared as a@BeanIntegrationFlows.from(MessageChannels.queue()). What is the point to start the flow from anonymous channel? Who and how is going to produce messages to that channel?Make yourself familiar with different channels: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations
Pay attention that
QueueChannelmust be consumed via polling endpoint. Right, there is a default poller auto-configured by Spring Boot, but it is based on a single thread in theTaskSchedulerand has a polling period as10 millis.I wouldn't recommend to hand off SQS messages to the
QueueChannel: when consumer fails, you lose the data. It is better to process those messages in the consumer thread.Otherwise your intention is not clear in the provided code.
Can you, please, share with us what error you get or anything else?
You also can turn on
DEBUGlogging level fororg.springframework.integrationto see how your messages are processed.