Delete File after successful persist to MongoDB in Spring Integration


I have a Spring Integration flow that reads a CSV file from a directory, splits the lines, then processes each line and extracts 2 objects from it. These two objects are then sent to two separate int-mongodb:outbound-channel-adapters. I want to delete the incoming file after all of the lines have been processed and persisted. I have seen examples of using a transaction manager to do this with the inbound adapter, but nothing with the outbound adapter. Is there a way to do this?

My config looks something like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
                http://www.springframework.org/schema/integration/mongodb http://www.springframework.org/schema/integration/mongodb/spring-integration-mongodb.xsd
                http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:int-mongodb="http://www.springframework.org/schema/integration/mongodb"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo">

    <int:poller default="true" fixed-delay="50"/>

    <int-file:inbound-channel-adapter id="filesInChannel"
                                      directory="file:${file.ingest.directory}"
                                      auto-create-directory="true">
        <int:poller id="poller" fixed-rate="100">
        </int:poller>
    </int-file:inbound-channel-adapter>

    <task:executor id="executor" pool-size="10" queue-capacity="50" />
    <int:channel id="executorChannel">
        <int:queue capacity="50"/>
    </int:channel>

    <int:splitter input-channel="filesInChannel" output-channel="executorChannel"
                  expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>

    <int:service-activator id="lineParserActivator"  ref="lineParser" method="parseLine"
                           input-channel="executorChannel" output-channel="lineChannel">
        <int:poller task-executor="executor" fixed-delay="500">
        </int:poller>
    </int:service-activator>

    <bean name="lineParser" class="com.xxx.LineParser"/>

    <int:channel id="lineChannel">
        <int:queue/>
    </int:channel>
    <int:channel id="lineMongoOutput">
        <int:queue/>
    </int:channel>
    <int:channel id="actionMongoOutput">
        <int:queue/>
    </int:channel>

    <int:transformer input-channel="lineChannel" output-channel="lineMongoOutput">
        <bean id="lineTransformer" class="com.xxx.transformer.LineTransformer"></bean>  
    </int:transformer>
    <int:transformer input-channel="lineChannel" output-channel="actionMongoOutput">
        <bean id="actionTransformer" class="com.xxx.transformer.ActionTransformer"></bean>  
    </int:transformer>

    <mongo:db-factory id="mongoDbFactory" dbname="${mongo.db.name}" password="${mongo.db.pass}" username="${mongo.db.user}" port="${mongo.db.port}" host="${mongo.db.host}"/>

    <int-mongodb:outbound-channel-adapter id="lineMongoOutput"
                                          collection-name="full"
                                          mongodb-factory="mongoDbFactory" />
    <int-mongodb:outbound-channel-adapter id="actionMongoOutput"
                                          collection-name="action"
                                          mongodb-factory="mongoDbFactory" />
</beans>
Accepted answer:

You can't really do it on the outbound adapter because you don't know when you're "done". Given you are asynchronously handing off to the downstream flow (via executors and queue channels), you can't do it on the inbound adapter either, because the poller thread will return to the adapter as soon as all the splits are sent.

Aside from that, I see some issues in your flow:

You seem to have an excessive number of thread hand-offs. You really don't need queue channels in the downstream flow, because the concurrency is already controlled by the executor channel.

It is quite unusual to make every channel a QueueChannel.

Finally, you have 2 transformers subscribed to the same channel. Do you realize that messages sent to lineChannel will alternate between them, round-robin style? Perhaps that is your intent, given your description, but it seems a little brittle to me; I would prefer to see the different data types going to different channels.

If you avoid queue channels, and use gateways within your service activator to send the data to the mongo adapters, the service activator will know when processing is complete and can remove the file at that time.

EDIT:

Here is one solution (it writes to logs rather than mongo, but you should get the idea)...

<int-file:inbound-channel-adapter directory="/tmp/foo" channel="toSplitter">
    <int:poller fixed-delay="1000">
        <int:transactional synchronization-factory="sf" transaction-manager="ptxMgr" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="sf">
    <int:after-commit expression="payload.delete()" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/tmp/bad/' + payload.name))" />
</int:transaction-synchronization-factory>

<bean id="ptxMgr" class="org.springframework.integration.transaction.PseudoTransactionManager" />

<int:splitter input-channel="toSplitter" output-channel="processChannel">
    <bean class="org.springframework.integration.file.splitter.FileSplitter" />
</int:splitter>

<int:service-activator input-channel="processChannel">
    <bean class="foo.Foo">
        <constructor-arg ref="gate" />
    </bean>
</int:service-activator>

<int:gateway id="gate" service-interface="foo.Foo$Gate">
    <int:method name="toLine" request-channel="toLine" />
    <int:method name="toAction" request-channel="toAction" />
</int:gateway>

<int:channel id="toLine" />

<int:logging-channel-adapter channel="toLine" expression="'LINE:' + payload" level="WARN"/>

<int:channel id="toAction" />

<int:logging-channel-adapter channel="toAction" expression="'ACTION:' + payload" level="WARN"/>

The POJO invoked by the service activator:

package foo;

public class Foo {

    private final Gate gateway;

    public Foo(Gate gateway) {
        this.gateway = gateway;
    }

    // Called by the service activator for each line from the splitter; an exception
    // here propagates back to the poller and triggers the after-rollback expression.
    public void parse(String payload) {
        String[] split = payload.split(",");
        if (split.length != 2) {
            throw new RuntimeException("Bad row size: " + split.length);
        }
        this.gateway.toLine(split[0]);
        this.gateway.toAction(split[1]);
    }

    public interface Gate {

        void toLine(String line);

        void toAction(String action);
    }

}

And a test case covering both the success and failure paths:

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class FooTests {

    @Test
    public void testGood() throws Exception {
        File file = new File("/tmp/foo/x.txt");
        FileOutputStream fos = new FileOutputStream(file);
        fos.write("foo,bar".getBytes());
        fos.close();
        int n = 0;
        while(n++ < 100 && file.exists()) {
            Thread.sleep(100);
        }
        assertFalse(file.exists());
    }

    @Test
    public void testBad() throws Exception {
        File file = new File("/tmp/foo/y.txt");
        FileOutputStream fos = new FileOutputStream(file);
        fos.write("foo".getBytes());
        fos.close();
        int n = 0;
        while(n++ < 100 && file.exists()) {
            Thread.sleep(100);
        }
        assertFalse(file.exists());
        file = new File("/tmp/bad/y.txt");
        assertTrue(file.exists());
        file.delete();
    }

}

Add a task executor to the <poller/> to process multiple files concurrently. Add a router as needed.
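
For example, a minimal sketch of the poller with a task executor (assuming an executor bean named exec):

<task:executor id="exec" pool-size="4" />

<int-file:inbound-channel-adapter directory="/tmp/foo" channel="toSplitter">
    <int:poller fixed-delay="1000" task-executor="exec">
        <int:transactional synchronization-factory="sf" transaction-manager="ptxMgr" />
    </int:poller>
</int-file:inbound-channel-adapter>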