Spring Integration: rotating server advice with SFTP outbound gateway


I'm polling files with an SFTP inbound adapter and using a RotatingServerAdvice on its poller, which works flawlessly. Can I also use a rotating server advice with the SFTP outbound gateway? I want to rename the files once the flow is done with them.

I'm using an SFTP outbound gateway for the rename, and it does not need a poller: I just want to rename a file whenever a message arrives on my output channel. How can I use the advice, or otherwise make sure that the right file is renamed on the right server? I have multiple servers and multiple directories.

A similar question, for a different use case: is there a way to just list the files on my multiple servers and directories? I don't want to download the files or get a stream to them; I only want each file's name and path. For this I'm also using the SFTP outbound gateway, with the list command, but it does not seem to rotate over my servers and directories either.

Artem Bilan (best answer)

No, you cannot use a RotatingServerAdvice on the outbound gateway: it is fully tied to the MessageSource logic of something like the SFTP inbound adapter. However, you can still use a DelegatingSessionFactory on the SFTP outbound gateway, together with a ContextHolderRequestHandlerAdvice that sets a thread-local key for the target session factory. The SftpRemoteFileTemplate then selects the proper target session factory for that specific gateway operation.
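To make the thread-local key idea concrete, here is a minimal plain-Java sketch of the pattern. It is not Spring's DelegatingSessionFactory or ContextHolderRequestHandlerAdvice; the class KeyedDelegate and its methods are hypothetical names used only for illustration. The point is the sequence: bind a key to the thread, resolve the target by that key, and always clear the key afterwards.

```java
import java.util.Map;
import java.util.function.Function;

/** Illustrative stand-in only; this is NOT Spring's DelegatingSessionFactory. */
final class KeyedDelegate<T> {
    private final Map<String, T> targets;
    private final T defaultTarget;
    private final ThreadLocal<String> threadKey = new ThreadLocal<>();

    KeyedDelegate(Map<String, T> targets, T defaultTarget) {
        this.targets = targets;
        this.defaultTarget = defaultTarget;
    }

    /** Resolves the target for the key bound to the current thread, else the default. */
    T current() {
        String key = threadKey.get();
        return key == null ? defaultTarget : targets.getOrDefault(key, defaultTarget);
    }

    /** Binds the key, runs the operation against the resolved target, then clears the key. */
    <R> R withKey(String key, Function<T, R> operation) {
        threadKey.set(key);
        try {
            return operation.apply(current());
        } finally {
            threadKey.remove();
        }
    }
}
```

In the real flow, the advice plays the role of withKey (set the key before the gateway call, clear it after), and the delegating session factory plays the role of current().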

To list a directory on each server, you can put a splitter before your SFTP outbound gateway that iterates over those keys, so that each split message targets one specific server. An aggregator after the gateway can then collect the results into a single message.
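The split-then-aggregate shape can be sketched without any Spring classes. In this rough illustration, ListAcrossServers, listAll and the lister function are all hypothetical; lister stands in for "list the directory on the server identified by this key", which in the real flow is the outbound gateway call.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper showing the split/aggregate shape only. */
final class ListAcrossServers {

    /** Calls the lister once per key ("split") and gathers the results in key order ("aggregate"). */
    static Map<String, List<String>> listAll(List<String> keys,
                                             Function<String, List<String>> lister) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, lister.apply(key));
        }
        return result;
    }
}
```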

The SFTP inbound adapter provides some specific info into the message it emits:

        String remoteFileUri = this.synchronizer.getRemoteFileMetadata(messageBuilder.getPayload());
        if (remoteFileUri != null) {
            URI uri = URI.create(remoteFileUri);
            messageBuilder.setHeader(FileHeaders.REMOTE_HOST_PORT, uri.getHost() + ':' + uri.getPort())
                    .setHeader(FileHeaders.REMOTE_DIRECTORY, uri.getPath())
                    .setHeader(FileHeaders.REMOTE_FILE, uri.getFragment());
        }

So you can use that FileHeaders.REMOTE_HOST_PORT header to determine the target server for the file renaming at the end of your flow.
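As a sketch of how those three values could drive the rename step, the helper below splits a remote-file URI the same way the snippet above does and maps the host:port to a session factory key. RemoteFileLocation, sessionFactoryKey and the sample URI layout (scheme://host:port/directory#file) are assumptions for illustration; check what your adapter actually stores.

```java
import java.net.URI;
import java.util.Map;

/** Hypothetical helper mirroring the header extraction shown above. */
final class RemoteFileLocation {
    final String hostPort;
    final String directory;
    final String fileName;

    private RemoteFileLocation(String hostPort, String directory, String fileName) {
        this.hostPort = hostPort;
        this.directory = directory;
        this.fileName = fileName;
    }

    /** Splits the URI into host:port, path (directory) and fragment (file name). */
    static RemoteFileLocation parse(String remoteFileUri) {
        URI uri = URI.create(remoteFileUri);
        return new RemoteFileLocation(uri.getHost() + ":" + uri.getPort(),
                uri.getPath(), uri.getFragment());
    }

    /** Looks up the key registered for this host:port; fails fast if none is configured. */
    String sessionFactoryKey(Map<String, String> keysByHostPort) {
        String key = keysByHostPort.get(hostPort);
        if (key == null) {
            throw new IllegalArgumentException("No session factory key for " + hostPort);
        }
        return key;
    }
}
```

In the flow itself you would read the values from the message headers rather than re-parsing a URI; the lookup from host:port to key is the part that carries over.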

See more info in docs:

https://docs.spring.io/spring-integration/reference/sftp/remote-file-info.html

https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html

Ricardo Gellman

Solution 1

If you want to rename files after processing, I would use a channel interceptor on the output channel. It reads information from the message to decide which server and directory the file is on, and then handles the renaming. To list files on multiple servers and directories without downloading or streaming them, you can follow a similar approach: instead of renaming, the interceptor lists the files through the SFTP outbound gateway, so no file content is transferred.

import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

// ChannelInterceptorAdapter is deprecated; implement ChannelInterceptor directly.
public class FileRenameInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // Assumes the message still carries the headers set by the SFTP inbound adapter.
        String server = message.getHeaders().get(FileHeaders.REMOTE_HOST_PORT, String.class);
        String directory = message.getHeaders().get(FileHeaders.REMOTE_DIRECTORY, String.class);
        String fileName = message.getHeaders().get(FileHeaders.REMOTE_FILE, String.class);
        renameFile(server, directory, fileName);

        return message;
    }

    private void renameFile(String server, String directory, String fileName) {
        // file renaming logic
    }
}

Add the configuration to applicationContext.xml:

<int:channel id="outputChannel">
    <int:interceptors>
        <bean class="com.example.FileRenameInterceptor"/>
    </int:interceptors>
</int:channel>

Solution 2

Using a DelegatingSessionFactory:

import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

// ChannelInterceptorAdapter is deprecated; implement ChannelInterceptor directly.
public class SessionFactoryPassingInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // extract session factory information from message headers
        SessionFactory<?> sessionFactory = (SessionFactory<?>) message.getHeaders().get("sessionFactory");

        // pass session factory to thread local for later renaming
        return message;
    }
}

Add the configuration to applicationContext.xml:

<int:channel id="outputChannel">
    <int:interceptors>
        <bean class="com.example.SessionFactoryPassingInterceptor"/>
    </int:interceptors>
</int:channel>

When publishing a message to the output channel after the Spring Batch job finishes, make sure you include the session factory information in the message headers:

@Autowired
@Qualifier("outputChannel")
private MessageChannel outputChannel;

@Autowired
private DelegatingSessionFactory sessionFactory;

// in your Spring Batch job completion
Message<?> message = MessageBuilder
                        .withPayload(payload)
                        .setHeader("sessionFactory", sessionFactory)
                        .setHeader("filePath", filePath)
                        .build();
outputChannel.send(message);

Then read the session factory details back from the message headers:

import org.springframework.integration.annotation.ServiceActivator;

// A transformer is expected to return a value; a void handler fits a @ServiceActivator instead.
@ServiceActivator(inputChannel = "outputChannel")
public void renameFile(Message<?> message) {
    SessionFactory<?> sessionFactory = (SessionFactory<?>) message.getHeaders().get("sessionFactory");
    String filePath = (String) message.getHeaders().get("filePath");

    // use sessionFactory to rename the file at filePath
}