The use case is that I have a zip
file containing many csv
files. Each line in each file is then sent to a seda
queue for processing. The problem I'm having is that I'd like to know when every line has been processed by the seda
queue to perform some other work. I'm not sure how to approach this. Currently, I'm investigating using polling to test when the seda
queue is empty, but this could produce false results if the lines get processed faster than they arrive.
Code
I have a class that unzips the zip
file and reads each file as an InputStream
. Each line in the file is then sent to a producer which in turn is sent to a seda
queue.
@Component
public class CsvProcessor {
@Resource(name = "csvLineProducer")
ProducerTemplate producer;
public void process(InputStream flatFileStream) throws IOException {
if (flatFileStream==null) return;
try {
LineIterator it = IOUtils.lineIterator(flatFileStream, "UTF-8");
while (it.hasNext()) {
final String recordLine = it.nextLine();
this.producer.send(new Processor() {
public void process(Exchange outExchange) {
outExchange.getIn().setBody(recordLine);
}
});
}
} finally {
IOUtils.closeQuietly(flatFileStream);
}
}
}
Here is the camel config.
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
<template id="csvLineProducer" defaultEndpoint="seda:flatRecordStream"/>
...
<route>
<from uri="seda:flatRecordStream" />
<bean ref="myProcessor" method="processLine" />
</route>
...
</camelContext>