Spring Integration how to get file from many ftp server and dirictory?

1.1k Views Asked by At

I'm trying to set up fetching new files from multiple ftp servers and directories for post-processing. At the moment, only the first server from the collection is connected. Tell me what I'm doing wrong.

With my implementation, I see that there is no transition to the second server and errors appear: 1.LoggingHandler - org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving

FtpConfiguration.java

@Log4j2
@Configuration
@PropertySource("classpath:/ftp.properties")
public class FtpConfiguration {

@Value("${sftp.name}")
private String[] names;
@Value("${sftp.host}")
private String[] hosts;
@Value("${sftp.user}")
private String[] users;
@Value("${sftp.pwd}")
private String[] pwds;

private final MessageService messageService;

@Autowired
public FtpConfiguration(MessageService messageService) {
    this.messageService = messageService;
}


@Bean
public DelegatingSessionFactory<FTPFile> delegatingSessionFactory() {
    Map<Object, SessionFactory<FTPFile>> factories = new LinkedHashMap<>();
    for (int i = 0; i < this.names.length; i++) {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost(this.hosts[i]);
        factory.setUsername(this.users[i]);
        factory.setPassword(this.pwds[i]);
        factory.setClientMode(2);
        factories.put(this.names[i], factory);
    }
    return new DelegatingSessionFactory<>(factories, factories.values().iterator().next());
}

@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory(this.names[0], "/test"));
    keyDirectories.add(new RotationPolicy.KeyDirectory(this.names[1], "/prod"));
    return new RotatingServerAdvice(delegatingSessionFactory(), keyDirectories, true);
}

@Bean
public IntegrationFlow ftpIntegrationFlow() {
    return IntegrationFlows.from(
            Ftp.inboundStreamingAdapter(template())
                    .remoteDirectory("."),
            e -> e.poller(Pollers.fixedDelay(500).advice(advice())))
            .transform(new StreamTransformer("UTF-8"))
            .handle(message -> {
                log.info("Read file: {}", message.getHeaders()
                        .get("file_remoteDirectory" + "/" + message.getHeaders()
                                .get("file_remoteFile")));
                messageService.unmarshall(message);
            })
            .get();
}

@Bean
public FtpRemoteFileTemplate template() {
    return new FtpRemoteFileTemplate(delegatingSessionFactory());
}

}

ftp.properties

sftp.name=test,prod
sftp.host=10.10.10.11,10.10.10.12
sftp.user=user1,user2
sftp.pwd=pass1,pass2

The first server is polled, gets the file, goes to the second, and an error occurs. If I change the server places, then it also receives one file from the server, it goes to another error, it goes to the first one, it gets it successfully. I would like to take all the files from one server, when I ended up moving to another, so can I do it?

Stack trace

[DEBUG] 2021-03-17 23:07:23.352 [task-scheduler-1] DefaultFtpSessionFactory - Connected to server [10.10.10.12:21]
[DEBUG] 2021-03-17 23:07:42.592 [HikariPool-1 housekeeper] HikariPool - HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
[DEBUG] 2021-03-17 23:07:42.592 [HikariPool-1 housekeeper] HikariPool - HikariPool-1 - Fill pool skipped, pool is at sufficient level.
[DEBUG] 2021-03-17 23:08:10.543 [task-scheduler-1] FtpSession - failed to disconnect FTPClient
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:186) ~[?:?]
    at java.net.SocketInputStream.read(SocketInputStream.java:140) ~[?:?]
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[?:?]
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[?:?]
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:?]
    at java.io.InputStreamReader.read(InputStreamReader.java:185) ~[?:?]
    at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:?]
    at java.io.BufferedReader.read(BufferedReader.java:182) ~[?:?]
    at org.apache.commons.net.io.CRLFLineReader.readLine(CRLFLineReader.java:58) ~[commons-net-3.7.jar:3.7]
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:320) ~[commons-net-3.7.jar:3.7]
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:299) ~[commons-net-3.7.jar:3.7]
    at org.apache.commons.net.ftp.FTP.getReply(FTP.java:731) ~[commons-net-3.7.jar:3.7]
    at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1861) ~[commons-net-3.7.jar:3.7]
    at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:114) ~[spring-integration-ftp-5.4.2.jar:5.4.2]
    at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:155) ~[spring-integration-ftp-5.4.2.jar:5.4.2]
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:230) ~[spring-integration-file-5.4.2.jar:5.4.2]
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:210) ~[spring-integration-file-5.4.2.jar:5.4.2]
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.2.jar:5.3.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.2.jar:5.3.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.2.jar:5.3.2]
    at org.springframework.integration.aop.ReceiveMessageAdvice.invoke(ReceiveMessageAdvice.java:56) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.2.jar:5.3.2]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.2.jar:5.3.2]
    at com.sun.proxy.$Proxy125.receive(Unknown Source) ~[?:?]
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:408) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.2.jar:5.3.2]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-5.3.2.jar:5.3.2]
    at com.sun.proxy.$Proxy124.call(Unknown Source) ~[?:?]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:377) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:324) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.2.jar:5.3.2]
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:321) ~[spring-integration-core-5.4.2.jar:5.4.2]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.2.jar:5.3.2]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95) [spring-context-5.3.2.jar:5.3.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
[INFO ] 2021-03-17 23:08:10.554 [task-scheduler-1] FtpStreamingMessageSource - Removing the remote file 'FileInfo [isDirectory=false, isLink=false, Size=4792, ModifiedTime=Tue Mar 09 00:24:00 MSK 2021, Filename=XX.xml, RemoteDirectory=/test, Permissions=----------]' from the filter for a subsequent transfer attempt
[DEBUG] 2021-03-17 23:08:10.555 [task-scheduler-1] PublishSubscribeChannel - preSend on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[DEBUG] 2021-03-17 23:08:10.555 [task-scheduler-1] LoggingHandler - bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[ERROR] 2021-03-17 23:08:10.556 [task-scheduler-1] LoggingHandler - org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$3(AbstractPollingEndpoint.java:324)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:321)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.UncheckedIOException: IOException when retrieving /test/XX.xml
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:231)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:210)
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.integration.aop.ReceiveMessageAdvice.invoke(ReceiveMessageAdvice.java:56)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
    at com.sun.proxy.$Proxy125.receive(Unknown Source)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:408)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
    at com.sun.proxy.$Proxy124.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:377)
    ... 13 more
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/XX.xml: 550
    at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:104)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:221)
    ... 37 more

[DEBUG] 2021-03-17 23:08:10.557 [task-scheduler-1] PublishSubscribeChannel - postSend (sent=true) on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: nested exception is java.io.UncheckedIOException: IOException when retrieving /test/XX.xml, headers={id=37cd10d7-41fd-0ac8-2630-14fd8c3a3cd5, timestamp=1616011690555}]
[DEBUG] 2021-03-17 23:08:11.075 [task-scheduler-2] DefaultFtpSessionFactory - Connected to server [10.10.10.11:21]
[DEBUG] 2021-03-17 23:08:11.201 [task-scheduler-2] SourcePollingChannelAdapter - Poll resulted in Message: GenericMessage [payload=org.apache.commons.net.io.SocketInputStream@77de7f0a,
0

There are 0 best solutions below