I have an integration flow configured with the Java DSL. It pulls files from an FTP server using Ftp.inboundAdapter, transforms each file into a JobRequest, and then a .handle() method triggers my batch job. Everything works as required, but the files in the FTP folder are processed sequentially, one after another.
I logged the current thread name in my transformer endpoint, and it printed the same thread name for every file.
Here is what I have tried so far:
1. Task executor bean
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("Integration");
    }
2. Integration flow
    @Bean
    public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) throws IOException {
        return IntegrationFlows.from(Ftp.inboundAdapter(myFtpSessionFactory)
                        .remoteDirectory("/bar")
                        .localDirectory(localDir.getFile()),
                    c -> c.poller(Pollers.fixedRate(1000).taskExecutor(taskExecutor()).maxMessagesPerPoll(20)))
                .transform(fileMessageToJobRequest(importUserJob(step1())))
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .route(JobExecution.class, j -> j.getStatus().isUnsuccessful() ? "jobFailedChannel" : "jobSuccessfulChannel")
                .get();
    }
3. I also read in another SO thread that I need an ExecutorChannel, so I configured one, but I don't know how to inject this channel into my Ftp.inboundAdapter. From the logs I see that the channel is always integrationFlow.channel#0, which I guess is a DirectChannel.
    @Bean
    public MessageChannel inputChannel() {
        return new ExecutorChannel(taskExecutor());
    }
I don't know what I'm missing here, or I may not have properly understood the Spring messaging system, as I'm very new to Spring and Spring Integration.
Any help is appreciated
Thanks
You can simply inject the ExecutorChannel into the flow, and the framework will apply it to the SourcePollingChannelAdapter. So, with that inputChannel defined as a bean, you just add .channel(inputChannel()) before your .transform(fileMessageToJobRequest(importUserJob(step1()))). See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
On the other hand, to process your files in parallel according to your .taskExecutor(taskExecutor()) configuration, you just need to set .maxMessagesPerPoll(20) to 1. The logic in the AbstractPollingEndpoint is like this: each poll cycle submits a single task to the task executor, and that task then polls for messages in a loop, one after another on the same thread, until maxMessagesPerPoll is reached or no more messages are available. So, we do have tasks in parallel, but only one task is started per poll cycle, and each task handles up to maxMessagesPerPoll messages by itself, which is 20 in your current case. There is also some explanation in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer
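To make the effect of maxMessagesPerPoll easier to see, here is a minimal plain-Java sketch of the behaviour described above. It is not the Spring Integration source: PollerSketch, run and the in-memory queue are hypothetical names made up for this illustration, the executor only imitates the thread-per-task style of SimpleAsyncTaskExecutor, and the poll cycles are fired back to back instead of on a fixed-rate trigger.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling endpoint; illustration only.
public class PollerSketch {

    // Returns the names of the threads that handled at least one message.
    // Assumes maxMessagesPerPoll >= 1 (the "unlimited" case is not modelled).
    static Set<String> run(int totalMessages, int maxMessagesPerPoll) throws InterruptedException {
        // In-memory stand-in for the files waiting in the FTP folder.
        Queue<Integer> source = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < totalMessages; i++) {
            source.add(i);
        }

        // One new thread per submitted task, in the spirit of SimpleAsyncTaskExecutor.
        AtomicInteger threadCounter = new AtomicInteger();
        Executor executor = task ->
                new Thread(task, "Integration" + threadCounter.incrementAndGet()).start();

        Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
        int pollCycles = (totalMessages + maxMessagesPerPoll - 1) / maxMessagesPerPoll;
        CountDownLatch done = new CountDownLatch(pollCycles);

        for (int cycle = 0; cycle < pollCycles; cycle++) {
            // Each poll cycle hands exactly ONE task to the executor...
            executor.execute(() -> {
                int count = 0;
                // ...and that task handles up to maxMessagesPerPoll messages itself,
                // sequentially, on its own thread.
                while (count < maxMessagesPerPoll) {
                    Integer message = source.poll();
                    if (message == null) {
                        break; // nothing left to process
                    }
                    handlerThreads.add(Thread.currentThread().getName());
                    count++;
                }
                done.countDown();
            });
        }
        done.await();
        return handlerThreads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("maxMessagesPerPoll=20: " + run(20, 20).size() + " handler thread(s)");
        System.out.println("maxMessagesPerPoll=1:  " + run(20, 1).size() + " handler thread(s)");
    }
}
```

With 20 messages, the first call uses a single handler thread, which matches the same thread name being printed for every file, while the second call uses 20 handler threads, one per message.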