Is it possible to process multiple amqp - messages in parallel with the same method annotated with @Incoming("queue")
with quarkus and smallrye-reactive-messaging?
To be more precise, I have following class:
@ApplicationScoped
public class Receiver {
@Incoming("test-queue")
public void process(String input) {
System.out.println("start processing:" + input);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end processing:" + input);
}
}
With the configuration in the application.properties:
amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue
Now I'd like define by configuration how many parallel processing of messages are possible. For example, on a 4 core cpu it should run 4 in parallel.
Currently I can just add 4 copies of the method with different names to allow this parallelism, but that is not configurable.
I'm not sure, but I don't think Reactive Messaging supports what you're asking for.
You can, however, do what you want another way. I think it's also a better overall pattern for using messaging.
http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound
Find the example with the CompletionStage and the explicit ack(). That variant is asynchronous, so if you combine it with Java's existing concurrency facilities, you'll get efficient parallel processing.
I would send the incoming work to an executor, and then have the executing task ack() when it completes.