Spring integration inboundChannelAdapter stops polling unexpectedly

304 Views Asked by At

In our project we need to retrieve prices from a remote ftp server. During the office hours this works fine, prices are retrieved and successfully processed. After office hours there are no new prices published on the ftp server, so as expected we don't find anything new.

Our problem is that after a few hours of not finding new prices, the poller just stops polling. No error in the logfiles (even when running on org.springframework.integration on debug level) and no exceptions. We are now using a separate TaskExecutor to isolate the issue, but still the poller just stops. In the mean time we adjusted the cron expression to match these hours, to limited the resource use, but still the poller just stops when it is supposed to run.

Any help to troubleshoot this issue is very much appreciated!

We use an @InboudChannelAdapter on a FtpStreamingMessageSource which is configured like this:

@Bean
    @InboundChannelAdapter(
        value = FTP_PRICES_INBOUND,
        poller = [Poller(
            maxMessagesPerPoll = "\${ftp.fetch.size}",
            cron = "\${ftp.poll.cron}",
            taskExecutor = "ftpTaskExecutor"
        )],
        autoStartup = "\${ftp.fetch.enabled:false}"
    )
    fun ftpInboundFlow(
        @Value("\${ftp.remote.prices.dir}") pricesDir: String,
        @Value("\${ftp.remote.prices.file.pattern}") remoteFilePattern: String,
        @Value("\${ftp.fetch.size}") fetchSize: Int,
        @Value("\${ftp.fetch.enabled:false}") fetchEnabled: Boolean,
        clock: Clock,
        remoteFileTemplate: RemoteFileTemplate<FTPFile>,
        priceParseService: PriceParseService,
        ftpFilterOnlyFilesFromMaxDurationAgo: FtpFilterOnlyFilesFromMaxDurationAgo
    ): FtpStreamingMessageSource {
        val messageSource = FtpStreamingMessageSource(remoteFileTemplate, null)

        messageSource.setRemoteDirectory(pricesDir)
        messageSource.maxFetchSize = fetchSize
        messageSource.setFilter(
            inboundFilters(
                remoteFilePattern,
                ftpFilterOnlyFilesFromMaxDurationAgo
            )
        )
        return messageSource;
    }

The property values are:

poll.cron: "*/30 * 4-20 * * MON-FRI"
fetch.size: 10
fetch.enabled: true

We limit the poll.cron we used the retrieve every minute.

In the related DefaultFtpSessionFactory, the timeouts are set to 60 seconds to override the default value of -1 (which means no timeout at all):

sessionFactory.setDataTimeout(timeOut)
sessionFactory.setConnectTimeout(timeOut)
sessionFactory.setDefaultTimeout(timeOut)
2

There are 2 best solutions below

1
On

Maybe my answer seems a bit too easy, bit is it because your cron expression states that it should schedule the job between 4 and 20 hour. After 8:00 PM it will not schedule the job anymore and it will start polling again at 4:00 AM.

0
On

It turned out that the processing took longer than the scheduled interval, so during processing a new task was already executed. So eventually multiple task were trying to accomplish the same thing.

We solved this by using a fixedDelay on the poller instead of a fixedRate. The difference is that a fixedRate schedules on a regular interval independent if the task was finished and the fixedDelay schedules a delay after the task is finished.