How to process multiple AMQP messages in parallel with the same @Incoming method

Is it possible to process multiple AMQP messages in parallel with the same method annotated with @Incoming("queue"), using Quarkus and smallrye-reactive-messaging?

To be more precise, I have the following class:

@ApplicationScoped
public class Receiver {
    @Incoming("test-queue")
    public void process(String input) {
        System.out.println("start processing:" + input);
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end processing:" + input);
    }
}

With the configuration in the application.properties:

amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue

Now I'd like to define by configuration how many messages can be processed in parallel. For example, on a 4-core CPU it should process 4 messages in parallel.

Currently the only way I have found to get this parallelism is to add 4 copies of the method under different names, but that is not configurable.

There are 3 best solutions below


I'm not sure, but I don't think Reactive Messaging supports what you're asking for.

You can, however, do what you want another way. I think it's also a better overall pattern for using messaging.

http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound

Find the example with the CompletionStage and the explicit ack(). That variant is asynchronous, so if you combine it with Java's existing concurrency facilities, you'll get efficient parallel processing.

I would send the incoming work to an executor, and then have the executing task ack() when it completes.
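The hand-off described above can be sketched without any framework. This is only an illustration under stated assumptions: `Msg` is a hypothetical stand-in for the `Message<String>` type from MicroProfile Reactive Messaging (whose `ack()` also returns a `CompletionStage<Void>`), and `ParallelAckSketch`, `process`, `slowWork` and `processAll` are names made up for this example. It shows work being submitted to a fixed-size pool, with the ack issued only after the work has finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelAckSketch {

    /** Hypothetical stand-in for a reactive-messaging Message<String>. */
    interface Msg {
        String getPayload();
        CompletionStage<Void> ack();
    }

    /** Hands the message to the pool and acks it only after the work has finished. */
    static CompletionStage<Void> process(Msg msg, ExecutorService pool) {
        return CompletableFuture
                .runAsync(() -> slowWork(msg.getPayload()), pool)
                .thenCompose(done -> msg.ack());
    }

    /** Placeholder for the real, slow processing. */
    static void slowWork(String input) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Processes all payloads on a pool of the given size; returns how many were acked. */
    static int processAll(List<String> payloads, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        AtomicInteger acks = new AtomicInteger();
        try {
            CompletableFuture<?>[] pending = payloads.stream()
                    .map(p -> process(new Msg() {
                        public String getPayload() { return p; }
                        public CompletionStage<Void> ack() {
                            acks.incrementAndGet(); // count the ack instead of talking to a broker
                            return CompletableFuture.completedFuture(null);
                        }
                    }, pool).toCompletableFuture())
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(pending).join(); // wait until every message is acked
        } finally {
            pool.shutdown();
        }
        return acks.get();
    }

    public static void main(String[] args) {
        System.out.println("acked: " + processAll(List.of("a", "b", "c", "d"), 4));
    }
}
```

The pool size is the knob that bounds parallelism here; in a real application it could be read from configuration instead of being passed in.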


I just came across the same scenario, and here is how the Eclipse MicroProfile Reactive Messaging spec intends for you to handle concurrency.

Basically, instead of having a class with a method like this:

@Incoming("test-queue")
public void process(String input) {}

You have 2 classes like this:

@ApplicationScoped
public class MessageSubscriberProducer {

    @Incoming("test-queue")
    public Subscriber<String> createSubscriber() {
        return new SubscriberImpl();
    }
}

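// Subscriber and Subscription come from org.reactivestreams
public class SubscriberImpl implements Subscriber<String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(4);  // this tells how many messages to grab right away
    }

    @Override
    public void onNext(String val) {
        // do processing
        this.subscription.request(1);  // grab 1 more
    }

    @Override
    public void onError(Throwable t) {
        // required by Subscriber: the stream failed, no more messages will arrive
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // required by Subscriber: the stream ended normally
    }
}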

This has the additional advantage of moving your processing code from the vert.x event-loop thread to a worker thread pool.
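To see the request(4) / request(1) demand pattern in isolation, here is a small sketch that uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones, instead of the AMQP connector. The names `DemandSketch`, `CountingSubscriber` and `deliver` are made up for this example, and `SubmissionPublisher` merely stands in for the message source. Note that Reactive Streams signals `onNext` to a subscriber serially, so the initial request bounds how many messages may be outstanding; to actually run the processing in parallel, `onNext` would still have to hand the work to an executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DemandSketch {

    /** Requests `initial` items up front, then one more for each item received. */
    static class CountingSubscriber implements Flow.Subscriber<String> {
        private final int initial;
        private final AtomicInteger received = new AtomicInteger();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        CountingSubscriber(int initial) {
            this.initial = initial;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(initial); // initial demand
        }

        @Override
        public void onNext(String item) {
            received.incrementAndGet();    // placeholder for the real processing
            subscription.request(1);       // replace the slot that was just used
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes `count` items and returns how many the subscriber received. */
    static int deliver(int count, int initialDemand) throws InterruptedException {
        CountingSubscriber sub = new CountingSubscriber(initialDemand);
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 0; i < count; i++) {
                pub.submit("msg-" + i);
            }
        } // close() signals onComplete after the submitted items
        if (!sub.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return sub.received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received: " + deliver(10, 4));
    }
}
```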


This question is a bit old, but I am answering in case it helps someone with the same problem. I posted an earlier answer to this question that I no longer think is the best. The better solution I have found is the following.

On the @Incoming method:

@Blocking(
    value = "name-of-thread-pool",
    ordered = false
)

In the properties file:

smallrye.messaging.worker.name-of-thread-pool.max-concurrency=4

Here is a GitHub repo with two branches, one using the RabbitMQ connector and the other the AMQP connector, both configured with a thread pool:

https://github.com/eduardo-villasboas/quarkus-poc/tree/rabbitmq-connector

https://github.com/eduardo-villasboas/quarkus-poc/tree/config-amqp-connector