Is there a way to make AMQP Reactive Messaging in Quarkus preserve the order of events within message groups?
My implementation is below. I can see that the inbox-events subscriber can observe multiple events from one message group at once, which is not what I expected.
In my opinion, the next message in a group should arrive only after the previous one has been acknowledged.
Consumer
@Channel("inbox-events") Multi<Long> inboxEvents;
...
inboxEvents
    .onItem().transformToUniAndMerge(this::handleEvent)
    .subscribe()
    .with(eventId -> {
        log.info("Event was handled, id={}", eventId);
    });
Producer
@Channel("outbox-events") MutinyEmitter<Long> outboxEventsEmitter;
...
final var metadata = OutgoingAmqpMetadata.builder()
    .withGroupId(String.valueOf(groupId))
    .build();
final var message = Message.of(id)
    .addMetadata(metadata);
return outboxEventsEmitter.sendMessage(message);
application.properties
mp.messaging.incoming.inbox-events.connector=smallrye-amqp
mp.messaging.incoming.inbox-events.address=DISPATCHING
mp.messaging.outgoing.outbox-events.connector=smallrye-amqp
mp.messaging.outgoing.outbox-events.durable=true
mp.messaging.outgoing.outbox-events.address=DISPATCHING
The AMQP clients do not provide any parallelism by message group id out of the box.
You therefore need to dispatch event processing to a worker thread pool yourself in order to process events in parallel. The Mutiny APIs allow you to do this. You can combine that with the KeyedMulti support (available since Reactive Messaging 4.6.0) to group messages by group id.
Here is some sample code:
The KeyValueExtractor is self-explanatory: for each incoming message, the code extracts a key and a value to be used in the KeyedMulti.
The process method will be called for each new group id, and it uses runSubscriptionOn to run the processing on a worker thread pool (you cannot use the @Blocking annotation in this scenario).
You'll notice that while the message order is preserved inside a group, processing happens concurrently across all groups.
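To make that guarantee concrete, here is a minimal, library-independent sketch of "sequential within a group, concurrent across groups" using only the JDK. It is not the Mutiny or SmallRye API: KeyedSerialExecutor, submit and demo are hypothetical names invented for this illustration, and the pool size and group ids are arbitrary. In the Quarkus setup described above, the KeyedMulti and runSubscriptionOn play the role that the per-group task chain plays here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of Quarkus or SmallRye: runs tasks one at a
// time per group id, while different groups share a pool and run concurrently.
final class KeyedSerialExecutor {
    private final ExecutorService workers;
    // Tail of the task chain for each group id. Entries are never evicted in
    // this sketch, so it assumes a bounded number of groups.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(ExecutorService workers) {
        this.workers = workers;
    }

    // Appends the task to the chain of its group: it starts only after the
    // previous task of the same group has finished (successfully or not).
    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    tail == null ? CompletableFuture.completedFuture(null) : tail;
            // exceptionally(...) keeps the chain going after a failed task.
            return previous.exceptionally(error -> null).thenRunAsync(task, workers);
        });
    }

    // Interleaves events of three groups and records the order in which each
    // group's events were actually handled.
    static Map<String, List<Integer>> demo(int eventsPerGroup) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            KeyedSerialExecutor executor = new KeyedSerialExecutor(pool);
            Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < eventsPerGroup; i++) {
                for (String groupId : List.of("a", "b", "c")) {
                    int eventId = i;
                    pending.add(executor.submit(groupId, () ->
                            handled.computeIfAbsent(groupId, id -> new CopyOnWriteArrayList<>())
                                    .add(eventId)));
                }
            }
            // Wait until every submitted event has been handled.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return handled;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Each group's list comes out in ascending order of event id.
        System.out.println(demo(5));
    }
}
```

Each group's events are handled strictly in submission order because every task is chained onto the previous task of the same group, while tasks of different groups are free to run on different pool threads at the same time.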