We are writing a batch job that takes a file as input from an FTP server, generates some new files, and writes them to an S3 bucket. We are using Spring Integration for this.
The file on the FTP server is an extraction from a DB and is updated each night.
The problem is that when we start the app for the first time, it connects to the FTP server, downloads the file, and uploads the generated result to S3. We then delete the downloaded file locally and wait for the next generation of the file on the FTP server to restart the process. But it never downloads the file again.
Any idea?
@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(ftpReader(),
                    spec -> spec.id("ftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedDelay(period)))
            .enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
                    .releaseStrategy(group -> group.getMessages().size() == 2))
            .transform(stockUnmarshaller)
            .transform(stockTransformer)
            .transform(stockMarshaller)
            .transform(picturesDownloader)
            .transform(picturesZipper)
            .transform(stockIndexer)
            .handle(directoryCleaner)
            .nullChannel();
}

@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
    return Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .remoteDirectory(rootFolder)
            .autoCreateLocalDirectory(true)
            .localDirectory(new File(localDirectory));
}

@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
    DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
    sessionFactory.setHost(host);
    sessionFactory.setUsername(userName);
    sessionFactory.setPassword(password);
    sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    return sessionFactory;
}
Thanks in advance.
EDIT:
I use enrichHeaders together with the aggregator to ensure that the pipeline is triggered only when we have exactly 2 files. Maybe the headers are not removed, so on later runs the group size is always greater than 2 and the condition never matches? Maybe this is the wrong way to proceed?
Thanks again.
The inbound channel adapter has two filters, .filter and .localFilter. The first filters the remote files before downloading; the second filters files on the local file system.
By default, the filter is an FtpPersistentAcceptOnceFileListFilter, which will only fetch new or changed files. By default, the localFilter is a FileSystemPersistentAcceptOnceFileListFilter, which, again, will only pass a file a second time if its timestamp has changed. So the file will only be reprocessed if its timestamp changes.
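To make the "accept once unless the timestamp changes" rule concrete, here is a minimal plain-Java sketch of that behaviour. It is a simplified model only, not the Spring Integration implementation: the class name AcceptOnceModel, its accept method, and the file name stock.xml are all hypothetical and chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of a persistent "accept once" filter keyed by
 * file name and last-modified time. This is NOT the Spring Integration code;
 * it only illustrates the rule described above.
 */
public class AcceptOnceModel {

    // Remembers the last-modified timestamp seen for each file name.
    private final Map<String, Long> seen = new HashMap<>();

    /** Returns true if the file is new or its timestamp differs from the recorded one. */
    public boolean accept(String name, long lastModified) {
        Long previous = seen.put(name, lastModified);
        return previous == null || previous != lastModified;
    }

    public static void main(String[] args) {
        AcceptOnceModel filter = new AcceptOnceModel();
        System.out.println(filter.accept("stock.xml", 1000L)); // first sighting
        System.out.println(filter.accept("stock.xml", 1000L)); // unchanged timestamp
        System.out.println(filter.accept("stock.xml", 2000L)); // timestamp changed
    }
}
```

Running main prints true, false, true: the file passes the first time, is rejected while its timestamp is unchanged, and passes again once the timestamp differs. In this model, a file that keeps the same name and timestamp is never accepted a second time.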
I suggest you run in a debugger to see why it is not passing the filter.