I'm working on an application using Spring Boot 1.5.13.RELEASE and Spring Integration 4.3.16.RELEASE.
I'm pretty new to Spring Integration and I have encountered a problem.
The basic idea is that on some external trigger (it could be an HTTP call) I need to create an IntegrationFlow which will consume messages from a RabbitMQ queue, do some work with them and then (maybe) produce to another RabbitMQ endpoint.
Now this is supposed to happen a lot of times so I will have to create multiple IntegrationFlows.
I am using IntegrationFlowContext to register each one of the IntegrationFlows like this:
IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();
I should clarify that this can happen concurrently, with multiple IntegrationFlows being created at the same time.
So each time I'm trying to create an IntegrationFlow, my "source" is a Gateway that looks like this:
MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix + uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();
It's not a @Bean (yet) but I expect it to get registered when each IntegrationFlow is registered.
My "target" is an AmqpOutboundEndpoint that looks like this:
@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}
Now this one IS a bean already and is injected each time I'm trying to create a flow.
And my flow(s) look like this:
public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {
                String uuid = payload.get("uuid");
                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);
                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}
My problem is that while the application starts fine and creates the first IntegrationFlows, later on I'm getting exceptions like this:
java.lang.IllegalStateException: Could not register object [org.springframework.integration.transformer.MessageTransformingHandler#872] under bean name 'org.springframework.integration.transformer.MessageTransformingHandler#872': there is already object [org.springframework.integration.transformer.MessageTransformingHandler#872] bound
I even tried setting an id on each of the components used, which is supposed to be used as the bean name, like this:
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))
But, even though bean name problems with components like .filter were resolved, I still get the same exception about a MessageTransformingHandler.
UPDATE
I didn't mention the fact that once each IntegrationFlow is done with its work, it is removed using the IntegrationFlowContext like this:
flowContext.remove(flowId);
So what seems to have (kind of) worked is synchronizing both the flow registration block and the flow removing block by using the same object as a lock.
So my class responsible for registering and removing flows looks like this:
...
private final Object lockA = new Object();
...
public void appendNewFlow(String callUUID) {
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);
    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId) {
    synchronized (lockA) {
        flowContext.remove(flowId);
    }
}
...
My problem now is that this kind of lock is kinda heavy for the application, since I'm getting quite a lot of these log messages:
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.
which doesn't happen as fast as I'd like.
But I guess that is expected since each time a thread acquires the lock, it will take some time to either register the flow and all its components or deregister the flow and all its components.
You also have this one there:
.transform(Transformers.toJson(jsonObjectMapper))
How does it work if you add an .id() there as well?
On the other hand, since you say that this happens concurrently, I wonder if you can make some piece of your code synchronized, e.g. wrap that flowContext.registration(integrationFlow).id(callUUID).register(); call.
The bean definition and registration process is really not thread-safe and is intended to be used only from the single initializing thread at the beginning of the application lifecycle.
We probably really need to make IntegrationFlowContext thread-safe in its register(IntegrationFlowRegistrationBuilder builder) function or, at least, its registerBean(Object bean, String beanName, String parentName), since this is exactly the place where we generate the bean name and register it.
Feel free to raise a JIRA on the matter.
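To illustrate why the "generate a bean name, then register it" step needs a lock, here is a minimal, self-contained sketch. NameRegistry and NameRegistryDemo are hypothetical stand-ins written for this example, not Spring classes, and the counter-based naming is only an assumption modelled on names like MessageTransformingHandler#872. Without the shared lock, two threads could read the same counter value and try to bind the same name; with it, each name is generated and bound as one atomic step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bean registry; NOT the Spring implementation.
class NameRegistry {
    private final Map<String, Object> beans = new HashMap<>();
    private final Object lock = new Object();
    private int counter = 0;

    // Name generation and binding happen under one lock, so no two
    // callers can ever observe the same counter value.
    String register(String prefix, Object bean) {
        synchronized (lock) {
            String name = prefix + "#" + counter++;
            if (beans.containsKey(name)) {
                throw new IllegalStateException(
                        "there is already an object bound under '" + name + "'");
            }
            beans.put(name, bean);
            return name;
        }
    }

    // Removal takes the same lock as registration.
    void remove(String name) {
        synchronized (lock) {
            beans.remove(name);
        }
    }

    int size() {
        synchronized (lock) {
            return beans.size();
        }
    }
}

class NameRegistryDemo {
    // Registers perThread objects from each of `threads` threads and
    // returns how many names ended up bound in the registry.
    static int registerConcurrently(int threads, int perThread) {
        NameRegistry registry = new NameRegistry();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    registry.register("handler", new Object());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println(registerConcurrently(8, 500));
    }
}
```

With the lock in place, every one of the 8 x 500 registrations gets its own name, so the registry ends up holding 4000 entries. Removing the synchronized blocks from this sketch would reintroduce the kind of duplicate-name failure shown in the question.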
Unfortunately, the Spring Integration Java DSL extension project is already out of support and we can add a fix only to the current 5.x generation. Nevertheless, I believe that the synchronized workaround should work here, so there is no need to back-port it into the Spring Integration Java DSL extension.
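If holding the lock on the calling threads turns out to be too heavy, one possible variation is to push every register and remove call through a single worker thread, so callers only enqueue the work and get a future back. The sketch below is an assumption-laden illustration, not a tested Spring setup: SerializedFlowRegistrar and SerializedFlowRegistrarDemo are hypothetical names, and a plain map stands in for IntegrationFlowContext. In a real application the lambda bodies would be where flowContext.registration(...).register() and flowContext.remove(...) are called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: all registrations and removals run on one worker
// thread, which serializes them without blocking the calling threads.
class SerializedFlowRegistrar {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Stand-in for the flows held by the flow context (assumption).
    private final Map<String, Object> flows = new ConcurrentHashMap<>();

    CompletableFuture<Void> appendNewFlow(String flowId, Object flow) {
        return CompletableFuture.runAsync(() -> {
            // Real code would register the flow with the flow context here.
            if (flows.putIfAbsent(flowId, flow) != null) {
                throw new IllegalStateException("flow already registered: " + flowId);
            }
        }, worker);
    }

    CompletableFuture<Void> removeFlow(String flowId) {
        return CompletableFuture.runAsync(() -> {
            // Real code would remove the flow from the flow context here.
            flows.remove(flowId);
        }, worker);
    }

    int activeFlows() {
        return flows.size();
    }

    void shutdown() {
        worker.shutdown();
    }
}

class SerializedFlowRegistrarDemo {
    // Registers n flows, removes every second one, waits for all tasks
    // to complete and returns the number of flows still registered.
    static int run(int n) {
        SerializedFlowRegistrar registrar = new SerializedFlowRegistrar();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            pending.add(registrar.appendNewFlow("flow-" + i, new Object()));
        }
        for (int i = 0; i < n; i += 2) {
            pending.add(registrar.removeFlow("flow-" + i));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        registrar.shutdown();
        return registrar.activeFlows();
    }

    public static void main(String[] args) {
        System.out.println(run(100));
    }
}
```

This does not make registration or removal itself any faster, since the work is still done one task at a time. It only moves the waiting off the caller, and any caller that needs the flow to be live before continuing still has to wait on the returned future.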