I have an FTP Streaming Inbound Channel Adapter from Spring Integration. It produces messages with payloads of type InputStream, so files can be fetched without being written to the local file system.
@Bean
@InboundChannelAdapter(channel = Constants.CHANNEL_INBOUND_FTP_ADAPTER,
        poller = @Poller(fixedDelay = Constants.FIXED_POLLING_FROM_INBOUND_FTP_ADAPTER_DELAY))
public MessageSource<InputStream> ftpMessageSource() {
    FtpStreamingMessageSource ftpStreamingMessageSource = new FtpStreamingMessageSource(ftpRemoteFileTemplate());
    ftpStreamingMessageSource.setRemoteDirectory(ftpConnectionParameters.getRootDir());
    ftpStreamingMessageSource.setFilter(chainFileListFilter());
    ftpStreamingMessageSource.setMaxFetchSize(Constants.INBOUND_ADAPTER_MAX_FETCH_SIZE);
    return ftpStreamingMessageSource;
}
Next, I transform the InputStream payload with a StreamTransformer:
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = Constants.CHANNEL_INBOUND_FTP_ADAPTER,
        outputChannel = Constants.CHANNEL_STREAMED_DATA)
public Transformer transformer() {
    return new StreamTransformer(Charset.defaultCharset().name());
}
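As far as I understand, a StreamTransformer constructed with a charset reads the whole stream and emits a String payload. A rough plain-Java sketch of that idea follows. It is not the Spring implementation, and the class and method names (StreamToStringSketch, readAll) are made up for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, only to illustrate what the transformer step does conceptually.
final class StreamToStringSketch {

    // Reads the entire stream into memory, decodes it with the given charset,
    // and closes the stream afterwards.
    static String readAll(InputStream in, Charset charset) {
        try (InputStream stream = in) {
            return new String(stream.readAllBytes(), charset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the remote file content delivered by the FTP adapter.
        InputStream in = new ByteArrayInputStream(
                "id;name\n1;foo\n".getBytes(StandardCharsets.UTF_8));
        String content = readAll(in, StandardCharsets.UTF_8);
        System.out.println(content);
    }
}
```

So after this step the payload is the full file content held in memory, not a File on disk.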
Then I handle the data in a service activator, both to check that the flow works and to leave room for custom interceptors in the future:
@ServiceActivator(inputChannel = Constants.CHANNEL_STREAMED_DATA, outputChannel = "BATCH_ALARM_CHANNEL")
public Message<?> alarmHandler(Message<?> message) {
    System.out.println(Constants.CHANNEL_ALARM);
    System.out.println(message.getHeaders());
    return message;
}
After this, following the official Spring Batch Integration documentation, I have one more Transformer that builds a JobLaunchRequest:
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addDate("dummy", new Date());
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
Here the message arrives from BATCH_ALARM_CHANNEL, and its payload is what the Spring Batch job needs. However, JobParametersBuilder does not accept complex objects, only simple values. So how can I pass the message payload when launching the job, so that the job can do the rest of the work: read, parse, and save to the DB?
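One idea I am considering, and I am not sure it is the right way, is to park the payload somewhere and pass only a String key as a job parameter. Below is a minimal plain-Java sketch of that idea. PayloadStore, put, take, and the "payloadKey" parameter name are all hypothetical names of mine, not Spring APIs:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory holder: the job would receive only the key, not the payload.
final class PayloadStore {

    private final Map<String, String> payloads = new ConcurrentHashMap<>();

    // Stores the payload and returns a generated key that is a plain String.
    String put(String payload) {
        String key = UUID.randomUUID().toString();
        payloads.put(key, payload);
        return key;
    }

    // Removes and returns the payload, or null if the key is unknown.
    String take(String key) {
        return payloads.remove(key);
    }
}

final class PayloadStoreDemo {
    public static void main(String[] args) {
        PayloadStore store = new PayloadStore();

        // In the transformer: store the payload and keep the key, e.g.
        // jobParametersBuilder.addString("payloadKey", key);
        String key = store.put("id;name\n1;foo\n");

        // In the job (for example in the reader): look the payload up by the key.
        String payload = store.take(key);
        System.out.println(payload);
    }
}
```

The obvious drawback is that the store lives only in memory, so the payload is lost on restart and the job could not be re-run with the same parameters. Is there a more idiomatic way to do this?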