I have a simple Spring Integration flow that receives messages from an AmqpInboundChannelAdapter. After some transformations, the messages must be put into an AWS Kinesis data stream.
@Bean
public AmqpInboundChannelAdapter inboundChannelAdapter(
        ConnectionFactory connectionFactory,
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel inputChannel) {
    return Amqp.inboundAdapter(connectionFactory, "amqpInputQueue")
            .configureContainer(c -> {
                c.acknowledgeMode(AcknowledgeMode.MANUAL);
                c.concurrentConsumers(1);
                c.prefetchCount(250);
                c.defaultRequeueRejected(false);
            })
            .outputChannel(inputChannel)
            .get();
}
@Bean
public IntegrationFlow myFlow(
        @Qualifier(INPUT_CHANNEL_NAME) MessageChannel input,
        @Qualifier(OUTBOUND_HANDLER) MessageHandler outboundHandler) {
    return IntegrationFlows
            .from(input)
            .transform(....)
            .enrichHeaders(h -> h.headerFunction(PARTITION_KEY_HEADER, message -> getPartitionKey(message)))
            .log(Level.INFO, m -> format("START PROCESSING MESSAGE (partitionKey: %s)", m.getHeaders().get(PARTITION_KEY_HEADER)))
            .transform(....)
            .handle(outboundHandler)
            .get();
}
The received messages, in the payload, contain a dynamic partition key (I don't know all the possible keys).
How can I use the partition key to process the messages in parallel, per partition key? Moreover, I must maintain the order of the messages per partition key: the first message received (per partition key) must be sent first to Kinesis.
I've read about the ExecutorChannel, but I haven't understood how to direct an input message to a specific thread used only for that partition key.
There is no way to choose a specific thread from a pool, and there is no guarantee that you would be able to have as many threads as you are going to have partition keys.
However, we can come up with a solution like this:
- You declare an ExecutorChannel with a single-threaded Executor (Executors.newSingleThreadExecutor()).
- You declare as many beans of this ExecutorChannel as you think would be enough for your concurrency support.
- All of them may be marked with @BridgeTo to send to the common processing endpoint.
- Then you use a Router to determine an ExecutorChannel bean name by your partition key.

This way all the messages with the same partition key are going to be handled by the same thread in the respective ExecutorChannel, and all downstream operations for that message are going to happen exactly on this thread.

There is no way to do all of that in one Java DSL flow, but that is not necessary either: a router is the terminal endpoint of one IntegrationFlow, and all the ExecutorChannel instances are bridged with @BridgeTo to the input channel of another IntegrationFlow.
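The idea behind the steps above can be sketched without any Spring classes, using only java.util.concurrent. This is a minimal illustration, not the Spring Integration configuration itself: the class PartitionedDispatcher and its methods laneFor, dispatch and processInOrder are hypothetical names invented for this sketch. Each "lane" stands in for one ExecutorChannel backed by Executors.newSingleThreadExecutor(), and laneFor plays the role the router would play when it picks a channel bean name from the partition key. Hashing the key into a fixed number of lanes is an assumption of this sketch; any stable mapping from key to channel would do.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: sends each task to one of N single-threaded lanes chosen by partition key. */
public class PartitionedDispatcher {

    // One single-threaded executor per lane; each lane runs its tasks strictly in submission order.
    private final ExecutorService[] lanes;

    public PartitionedDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same key always maps to the same lane; different keys may share a lane. */
    public int laneFor(String partitionKey) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(partitionKey.hashCode(), lanes.length);
    }

    /** Queues the task on the lane that owns this partition key. */
    public void dispatch(String partitionKey, Runnable task) {
        lanes[laneFor(partitionKey)].execute(task);
    }

    /** Stops accepting work and waits for the queued tasks to finish. */
    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            lane.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /**
     * Demo: dispatches messages numbered 0..perKey-1 for every key, interleaved,
     * and returns the order in which each key's messages were actually processed.
     */
    public static Map<String, List<Integer>> processInOrder(
            List<String> keys, int perKey, int laneCount) throws InterruptedException {
        PartitionedDispatcher dispatcher = new PartitionedDispatcher(laneCount);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        for (int seq = 0; seq < perKey; seq++) {
            for (String key : keys) {
                final int n = seq;
                dispatcher.dispatch(key, () ->
                        seen.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(n));
            }
        }
        dispatcher.shutdownAndWait();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        // Three keys spread over two lanes: keys run in parallel across lanes, in order within a key.
        System.out.println(processInOrder(List.of("order-1", "order-2", "order-3"), 5, 2));
    }
}
```

Because a given key always lands on the same single-threaded lane, its messages are processed in the order they were dispatched, while different lanes run in parallel. With fewer lanes than keys, several keys share a thread, which limits parallelism but does not break per-key ordering.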