I have a Spring Integration flow that reads a CSV file from a directory, splits it into lines, then processes each line and extracts two objects from it. These two objects are then sent to two separate int-mongodb:outbound-channel-adapter instances. I want to delete the incoming file after all of the lines have been processed and persisted. I have seen examples of using a transaction manager to do this with the inbound adapter, but nothing with the outbound adapter. Is there a way to do this?
My config looks something like this:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
           http://www.springframework.org/schema/integration/mongodb http://www.springframework.org/schema/integration/mongodb/spring-integration-mongodb.xsd
           http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:int-mongodb="http://www.springframework.org/schema/integration/mongodb"
       xmlns:mongo="http://www.springframework.org/schema/data/mongo">

    <int:poller default="true" fixed-delay="50"/>

    <int-file:inbound-channel-adapter id="filesInChannel"
                                      directory="file:${file.ingest.directory}"
                                      auto-create-directory="true">
        <int:poller id="poller" fixed-rate="100">
        </int:poller>
    </int-file:inbound-channel-adapter>

    <task:executor id="executor" pool-size="10" queue-capacity="50" />

    <int:channel id="executorChannel">
        <int:queue capacity="50"/>
    </int:channel>

    <int:splitter input-channel="filesInChannel" output-channel="executorChannel"
                  expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>

    <int:service-activator id="lineParserActivator" ref="lineParser" method="parseLine"
                           input-channel="executorChannel" output-channel="lineChannel">
        <int:poller task-executor="executor" fixed-delay="500">
        </int:poller>
    </int:service-activator>

    <bean name="lineParser" class="com.xxx.LineParser"/>

    <int:channel id="lineChannel">
        <int:queue/>
    </int:channel>

    <int:channel id="lineMongoOutput">
        <int:queue/>
    </int:channel>

    <int:channel id="actionMongoOutput">
        <int:queue/>
    </int:channel>

    <int:transformer input-channel="lineChannel" output-channel="lineMongoOutput">
        <bean id="lineTransformer" class="com.xxx.transformer.LineTransformer"></bean>
    </int:transformer>

    <int:transformer input-channel="lineChannel" output-channel="actionMongoOutput">
        <bean id="actionTransformer" class="com.xxx.transformer.ActionTransformer"></bean>
    </int:transformer>

    <mongo:db-factory id="mongoDbFactory" dbname="${mongo.db.name}" password="${mongo.db.pass}" username="${mongo.db.user}" port="${mongo.db.port}" host="${mongo.db.host}"/>

    <int-mongodb:outbound-channel-adapter id="lineMongoOutput"
                                          collection-name="full"
                                          mongodb-factory="mongoDbFactory" />

    <int-mongodb:outbound-channel-adapter id="actionMongoOutput"
                                          collection-name="action"
                                          mongodb-factory="mongoDbFactory" />
</beans>
You can't really do it on the outbound adapter because you don't know when you're "done". Given you are asynchronously handing off to the downstream flow (via executors and queue channels), you can't do it on the inbound adapter either, because the poller thread will return to the adapter as soon as all the splits are sent.
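For reference, the inbound-adapter approach mentioned in the question usually looks roughly like the sketch below. It assumes a PseudoTransactionManager and a synchronization factory with the illustrative id syncFactory, neither of which is in the original config. The after-commit expression runs when the poller thread returns, so with the queue channels and executor in the current flow it would delete the file while lines are still waiting to be processed.

```xml
<!-- Sketch only: the bean ids transactionManager and syncFactory are illustrative -->
<bean id="transactionManager"
      class="org.springframework.integration.transaction.PseudoTransactionManager"/>

<int:transaction-synchronization-factory id="syncFactory">
    <!-- Evaluated when the poller thread completes without an exception;
         the expression result is discarded via the built-in nullChannel -->
    <int:after-commit expression="payload.delete()" channel="nullChannel"/>
</int:transaction-synchronization-factory>

<int-file:inbound-channel-adapter id="filesInChannel"
                                  directory="file:${file.ingest.directory}"
                                  auto-create-directory="true">
    <int:poller fixed-rate="100">
        <int:transactional transaction-manager="transactionManager"
                           synchronization-factory="syncFactory"/>
    </int:poller>
</int-file:inbound-channel-adapter>
```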
Aside from that, I see some issues in your flow:
You seem to have an excessive number of thread handoffs; you really don't need queue channels in the downstream flow because execution is already controlled by the executor channel.
It is quite unusual to make every channel a QueueChannel.
Finally, you have two transformers subscribed to the same channel. Do you realize that messages sent to lineChannel will alternate between them round-robin style, so each message reaches only one transformer? Perhaps that is your intent, given your description, but it seems a little brittle to me; I would prefer to see the different data types going to different channels.
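If the intent is for both transformers to see every parsed line, one possible alternative (my assumption about the intent, not something stated in the question) is to make lineChannel a publish-subscribe channel. A minimal sketch, replacing the queue-based lineChannel definition:

```xml
<!-- Sketch: every message sent to lineChannel is delivered to both transformers.
     This replaces the <int:channel id="lineChannel"> queue definition. -->
<int:publish-subscribe-channel id="lineChannel"/>

<int:transformer input-channel="lineChannel" output-channel="lineMongoOutput">
    <bean class="com.xxx.transformer.LineTransformer"/>
</int:transformer>

<int:transformer input-channel="lineChannel" output-channel="actionMongoOutput">
    <bean class="com.xxx.transformer.ActionTransformer"/>
</int:transformer>
```

Because a publish-subscribe channel is subscribable, the transformers no longer need pollers.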
If you avoid using queue channels, and use gateways within your service activator to send out the data to the mongo adapters, your service activator would know when it is complete and be able to remove the file at that time.
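A rough sketch of that wiring follows. The interface com.xxx.MongoGateway, its methods storeLine and storeAction, and the class com.xxx.FileProcessor are hypothetical names introduced here for illustration; they are not part of the original config.

```xml
<!-- Hypothetical interface com.xxx.MongoGateway with two void methods,
     storeLine(Object) and storeAction(Object). Void methods make the
     gateway one-way, so no reply channel is needed. -->
<int:gateway id="mongoGateway" service-interface="com.xxx.MongoGateway">
    <int:method name="storeLine" request-channel="lineMongoOutput"/>
    <int:method name="storeAction" request-channel="actionMongoOutput"/>
</int:gateway>

<!-- With no explicit channel definitions, each adapter id becomes a direct
     channel, so a gateway call returns only after the adapter has handled
     the message on the calling thread. -->
<int-mongodb:outbound-channel-adapter id="lineMongoOutput"
                                      collection-name="full"
                                      mongodb-factory="mongoDbFactory"/>

<int-mongodb:outbound-channel-adapter id="actionMongoOutput"
                                      collection-name="action"
                                      mongodb-factory="mongoDbFactory"/>

<!-- Hypothetical com.xxx.FileProcessor: process(File) reads each line, calls
     the gateway for both objects, and deletes the file after the last line. -->
<bean id="fileProcessor" class="com.xxx.FileProcessor">
    <constructor-arg ref="mongoGateway"/>
</bean>

<int:service-activator input-channel="filesInChannel"
                       ref="fileProcessor" method="process"/>
```

Since everything runs on one thread, an exception from a Mongo write propagates back to the service activator, which can then leave the file in place.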
EDIT:
Here is one solution (it writes to logs rather than mongo, but you should get the idea)...
.
.
Add a task executor to the <poller/> to process multiple files concurrently. Add a router as needed.
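As a sketch of those two suggestions: the poller below hands each file to the executor from the question's config, and the router sends payloads to the Mongo adapters by type. The channel name toMongo and the types com.xxx.Line and com.xxx.Action are assumptions made for illustration.

```xml
<task:executor id="executor" pool-size="10" queue-capacity="50"/>

<int-file:inbound-channel-adapter id="filesInChannel"
                                  directory="file:${file.ingest.directory}"
                                  auto-create-directory="true">
    <!-- Each polled file is processed on an executor thread,
         so several files can be in flight at once -->
    <int:poller fixed-delay="1000" task-executor="executor"/>
</int-file:inbound-channel-adapter>

<!-- Optional: route by payload type (hypothetical types and input channel) -->
<int:payload-type-router input-channel="toMongo">
    <int:mapping type="com.xxx.Line" channel="lineMongoOutput"/>
    <int:mapping type="com.xxx.Action" channel="actionMongoOutput"/>
</int:payload-type-router>
```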