If two integrationflows are passed to one general MessageHandler class, is it thread safe? in Spring Integration DSL

190 Views Asked by At

I have two IntegrationFlows both receive messages from Apache Kafka

first IntegrationFlow - in the input channel, Consumer1(concurrency=4) reads topic_1

second IntegrationFlow - in the input channel, Consumer2(concurrency=4) reads topic_2

but these two IntegrationFlows, send messages to the output channel, where one common class MyMessageHandler is specified

like this:

@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
                    )
            .handle(message)
            .get();
}


@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
                    )
            .handle(message)
            .get();
}

class MyMessageHandler have method send(message), this method passes messages further to another service

class MyMessageHandler {
            
    protected void handleMessageInternal(Message<?> message)
    {
        String postResponse = myService.send(message); // remote service calling
        msgsStatisticsService.sendMessage(message, postResponse);
        // *******
    }
}

inside each IntegrationFlow, 4 Consumer-threads are working ( a total of 8 threads), and they all go to one class MyMessageHandler, into one metod send()

what problems could there be? two IntegrationFlow, do they see each other when they pass a message to one common class??? do I need to provide thread safety in the MyMessageHandler class??? Do I need to prepend the send () method with the word synchronized???

But what if we make a third IntegrationFlow?

so that only one IntegrationFlow can pass messages through itself to the MyMessageHandler class? then would it be thread safe? example:

@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                .configureListenerContainer(configureListenerContainer_priority1)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                .configureListenerContainer(configureListenerContainer_priority2)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public MessageChannel **SOME_CHANNEL**() {

    DirectChannel channel = new DirectChannel();
    return channel;
 }

@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {

return IntegrationFlows
        .from(**SOME_CHANNEL**())
        .handle(message)
        .get();
}
1

There are 1 best solutions below

2
On

You need to make your handler code thread-safe.

Using synchronized on the whole method you will effectively disable the concurrency.

It's better to use thread-safe techniques - no mutable fields or use limited synchronization blocks, just around critical code.