I want to process an AMQP message then send it to another queue for further processing.
I'm using the Spring Integration DSL to achieve this, as shown below:
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
                           OCRService ocrService) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
                    .concurrentConsumers(2))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // the transformer above has already converted the JSON payload to a Note
                Note note = (Note) msg.getPayload();
                // doing ocr here
                amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
            })
            .get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
                    .concurrentConsumers(4))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing some work for type processing
            })
            .get();
}
However, I found that the message in the NOTE_INCOMING_QUEUE queue is still unacked while the new message is being handled in the type-processing phase. See the screenshot of the RabbitMQ management UI below.
I'm wondering why the message in NOTE_INCOMING_QUEUE is still unacked even though the handler has already executed successfully. Is this by design in Spring Integration AMQP, or is something wrong in my code?
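Use Amqp.inboundAdapter() instead of the gateway; the gateway is waiting for a reply that will never arrive. While the consumer thread waits for that reply, the delivery is not acknowledged, which is why the message stays unacked. Gateways are for request/reply scenarios; channel adapters are for one-way scenarios.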
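For illustration, here is a minimal sketch of the first flow from the question with the gateway swapped for a channel adapter. It assumes the same Java DSL version the question appears to use, where concurrentConsumers(...) is available directly on the adapter spec and Amqp lives in org.springframework.integration.dsl.amqp. In newer Spring Integration versions the package differs and container options are, as far as I know, set through configureContainer(...). NoteFlowConfig is a hypothetical class name; Note and the NOTE_* constants are the ones from the question and must be in scope.

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp; // assumption: 1.x Java DSL package
import org.springframework.integration.json.JsonToObjectTransformer;

@Configuration
public class NoteFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                // one-way inbound adapter: no reply is expected from the flow
                .from(Amqp.inboundAdapter(connectionFactory, NOTE_INCOMING_QUEUE)
                        .concurrentConsumers(2))
                .transform(new JsonToObjectTransformer(Note.class))
                .handle(msg -> {
                    Note note = (Note) msg.getPayload();
                    // doing ocr here
                    amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
                })
                .get();
    }
}
```

The typeProcess flow would change the same way: replace Amqp.inboundGateway(...) with Amqp.inboundAdapter(...) and leave the rest of the flow as it is.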