I have an integration flow configured using the Java DSL which pulls files from an FTP server using Ftp.inboundAdapter, transforms each file to a JobRequest, and then has a .handle() method which triggers my batch job. Everything works as required, but the files inside the FTP folder are processed sequentially.
I added currentThreadName to my transformer endpoint, and it printed the same thread name for every file.
Here is what I have tried so far:
1. Task executor bean

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("Integration");
    }
2. Integration flow

    @Bean
    public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
        return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                        .remoteDirectory("/bar")
                        .localDirectory(localDir.getFile()),
                c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
                .transform(fileMessageToJobRequest(importUserJob(step1())))
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .route(JobExecution.class, j -> j.getStatus().isUnsuccessful() ? "jobFailedChannel" : "jobSuccessfulChannel")
                .get();
    }
3. I also read in another SO thread that I need an ExecutorChannel, so I configured one, but I don't know how to inject this channel into my Ftp.inboundAdapter. From the logs I see that the channel is always integrationFlow.channel#0, which I guess is a DirectChannel.

    @Bean
    public MessageChannel inputChannel() {
        return new ExecutorChannel(taskExecutor());
    }
I don't know what I'm missing here, or I might not have properly understood the Spring messaging system, as I'm very new to Spring and Spring Integration.
Any help is appreciated.
Thanks
You can simply inject the ExecutorChannel into the flow, and the framework will apply it to the SourcePollingChannelAdapter. So, with that inputChannel defined as a bean, you just add .channel(inputChannel()) before your .transform(fileMessageToJobRequest(importUserJob(step1()))). See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels

On the other hand, to process your files in parallel according to your .taskExecutor(taskExecutor()) configuration, you just need to set .maxMessagesPerPoll() to 1 instead of 20. The logic in AbstractPollingEndpoint is like this: each polling cycle hands a single task to the executor, and that task keeps receiving messages in a loop until it has received maxMessagesPerPoll of them or the source has nothing more to give. So we do have tasks in parallel, but only once each of them has reached that maxMessagesPerPoll, which is 20 in your current case. There is also some explanation in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
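
To make the effect of maxMessagesPerPoll concrete, here is a minimal sketch in plain Java of the polling behaviour described above. It is a simplified stand-in, not the framework's source: the class PollerSketch, the methods pollOnce and threadsUsedFor, and the file names are all hypothetical, and the sketch waits for each cycle to finish only so that the result is deterministic (a real poller with a task executor would not wait).

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class PollerSketch {

    // Simplified stand-in for one polling cycle: a single task is started on a
    // fresh thread, and that task receives messages in a loop until the source
    // is empty or maxMessagesPerPoll is reached (<= 0 means "no limit").
    static int pollOnce(Queue<String> source, int maxMessagesPerPoll,
                        String threadName, Set<String> threadsUsed) {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            int count = 0;
            while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
                String file = source.poll();
                if (file == null) {
                    break; // nothing more to receive in this cycle
                }
                threadsUsed.add(Thread.currentThread().getName());
                count++;
            }
            return count;
        });
        new Thread(task, threadName).start();
        try {
            return task.get(); // wait so the example stays deterministic
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs polling cycles until the source is drained and returns how many
    // distinct threads handled at least one file.
    static int threadsUsedFor(int files, int maxMessagesPerPoll) {
        Queue<String> source = new ArrayDeque<>();
        for (int i = 1; i <= files; i++) {
            source.add("file" + i + ".csv"); // hypothetical file names
        }
        Set<String> threadsUsed = new LinkedHashSet<>();
        int cycle = 0;
        while (!source.isEmpty()) {
            cycle++;
            pollOnce(source, maxMessagesPerPoll, "Integration" + cycle, threadsUsed);
        }
        return threadsUsed.size();
    }

    public static void main(String[] args) {
        System.out.println("maxMessagesPerPoll=20: " + threadsUsedFor(5, 20) + " thread(s)");
        System.out.println("maxMessagesPerPoll=1: " + threadsUsedFor(5, 1) + " thread(s)");
    }
}
```

With five files, a limit of 20 lets the first task drain the whole folder on one thread, which matches the single thread name seen in the transformer. A limit of 1 gives every file its own task, so long-running jobs can overlap whenever the next polling cycle fires before the previous task has finished.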