I have two IntegrationFlows; both receive messages from Apache Kafka.
First IntegrationFlow: on the input channel, Consumer1 (concurrency=4) reads topic_1.
Second IntegrationFlow: on the input channel, Consumer2 (concurrency=4) reads topic_2.
Both IntegrationFlows send messages to an output channel where one shared class, MyMessageHandler, is specified,
like this:
@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
            )
            .handle(message)
            .get();
}

@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
            )
            .handle(message)
            .get();
}
The MyMessageHandler class calls send(message), which passes messages on to another service:
class MyMessageHandler extends AbstractMessageHandler {
    @Override
    protected void handleMessageInternal(Message<?> message) {
        String postResponse = myService.send(message); // call to the remote service
        msgsStatisticsService.sendMessage(message, postResponse);
        // *******
    }
}
Inside each IntegrationFlow, 4 consumer threads are working (8 threads in total), and they all go to the one MyMessageHandler class, into the one send() method.
What problems could there be? Do the two IntegrationFlows see each other when they pass a message to the one shared class? Do I need to provide thread safety in the MyMessageHandler class? Do I need to mark the send() method as synchronized?
But what if we add a third IntegrationFlow,
so that only one IntegrationFlow passes messages through to the MyMessageHandler class? Would that be thread-safe? Example:
@Bean
public IntegrationFlow sendFromQueueFlow1() {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1, "topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
            )
            .channel(SOME_CHANNEL())
            .get();
}

@Bean
public IntegrationFlow sendFromQueueFlow2() {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2, "topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
            )
            .channel(SOME_CHANNEL())
            .get();
}

@Bean
public MessageChannel SOME_CHANNEL() {
    DirectChannel channel = new DirectChannel();
    return channel;
}

@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {
    return IntegrationFlows
            .from(SOME_CHANNEL())
            .handle(message)
            .get();
}
You need to make your handler code thread-safe.
Marking the whole method as synchronized would effectively disable the concurrency. It's better to use thread-safe techniques: keep no mutable fields, or use limited synchronized blocks just around the critical code.
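To illustrate the advice above, here is a minimal sketch in plain Java, with no Spring types involved. The class ThreadSafeHandlerSketch, its nested Handler, and the runDemo helper are all hypothetical names made up for this example; Handler only stands in for MyMessageHandler, and its send() method is a local placeholder for the remote call. It assumes the handler needs a counter and a small list of responses as shared state, which the original code does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeHandlerSketch {

    /** Hypothetical stand-in for MyMessageHandler (assumption, not the real class). */
    static class Handler {
        // Final and immutable: safe to read from any thread.
        private final String targetName;
        // Atomic counter: safe to update without synchronized.
        private final AtomicLong handled = new AtomicLong();
        // ArrayList is not thread-safe, so every access is guarded by 'lock'.
        private final List<String> responses = new ArrayList<>();
        private final Object lock = new Object();

        Handler(String targetName) {
            this.targetName = targetName;
        }

        void handle(String payload) {
            // The slow call runs outside any lock, so threads work in parallel.
            String response = send(payload);
            handled.incrementAndGet();
            // Only the shared mutable state is inside the synchronized block.
            synchronized (lock) {
                responses.add(response);
            }
        }

        // Placeholder for the remote call; uses only its argument and a final field.
        private String send(String payload) {
            return targetName + ":" + payload;
        }

        long handledCount() {
            return handled.get();
        }

        int responseCount() {
            synchronized (lock) {
                return responses.size();
            }
        }
    }

    /** Calls one shared Handler from several threads, like the consumer threads do. */
    static Handler runDemo(int threads, int messagesPerThread) throws InterruptedException {
        Handler handler = new Handler("remote");
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    handler.handle("t" + id + "-m" + i);
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return handler;
    }

    public static void main(String[] args) throws InterruptedException {
        Handler h = runDemo(8, 1000);
        System.out.println(h.handledCount() + " " + h.responseCount()); // prints "8000 8000"
    }
}
```

The same reasoning applies to the variant with the third IntegrationFlow: a DirectChannel invokes its subscriber on the thread that sent the message, so all 8 consumer threads still reach the handler concurrently and the handler has to be thread-safe either way.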