How to notify when a Camel seda queue is complete?


The use case is that I have a zip file containing many CSV files. Each line of each file is sent to a seda queue for processing. I'd like to know when every line has been processed by the seda queue so that I can perform some follow-up work, but I'm not sure how to approach this. Currently, I'm investigating polling to test whether the seda queue is empty, but this could give a false positive if lines are processed faster than they arrive: the queue would be momentarily empty while the producer is still reading the file.

Code

I have a class that unzips the zip file and reads each file as an InputStream. Each line in the file is passed to a ProducerTemplate, which in turn sends it to a seda queue.

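import java.io.IOException;
import java.io.InputStream;

import javax.annotation.Resource;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.springframework.stereotype.Component;

@Component
public class CsvProcessor {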
    @Resource(name = "csvLineProducer")
    ProducerTemplate producer;

    public void process(InputStream flatFileStream) throws IOException {
        if (flatFileStream == null) return;

        try {
            LineIterator it = IOUtils.lineIterator(flatFileStream, "UTF-8");
            while (it.hasNext()) {
                final String recordLine = it.nextLine();
                this.producer.send(new Processor() {
                    public void process(Exchange outExchange) {
                        outExchange.getIn().setBody(recordLine);
                    }
                });
            }
        } finally {
            IOUtils.closeQuietly(flatFileStream);
        }
    }
}

Here is the Camel config.

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
    <template id="csvLineProducer" defaultEndpoint="seda:flatRecordStream"/>

    ...

    <route>
        <from uri="seda:flatRecordStream" />
        <bean ref="myProcessor" method="processLine" />
    </route>

    ...

</camelContext>
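
One direction that might avoid the polling race described above is to count lines in flight rather than inspect the queue. The following is only a minimal sketch in plain Java, not a Camel API: the class name LineCompletionTracker and all of its methods are hypothetical. It assumes that lineSubmitted() would be called in CsvProcessor just before each producer.send(...), that lineProcessed() would be called after processLine finishes for a line, and that producerFinished() would be called once the last file has been read. The counter starts at 1 so that it cannot reach zero while the producer is still reading, even if the consumer is faster than the producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Camel): tracks lines that have been
// submitted to the queue but not yet processed.
final class LineCompletionTracker {
    // Starts at 1: this extra token belongs to the producer and is only
    // released by producerFinished(), so the count cannot hit zero early.
    private final AtomicLong pending = new AtomicLong(1);
    private final CountDownLatch done = new CountDownLatch(1);

    // Call BEFORE sending a line to the queue.
    void lineSubmitted() {
        pending.incrementAndGet();
    }

    // Call after a line has been fully processed by the consumer.
    void lineProcessed() {
        release();
    }

    // Call once, after the last line of the last file has been submitted.
    void producerFinished() {
        release();
    }

    private void release() {
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
    }

    // True once the producer has finished and every submitted line is processed.
    boolean isComplete() {
        return done.getCount() == 0;
    }

    // Blocks until completion or until the timeout elapses.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }
}
```

With something like this, the follow-up work could wait on await(...) instead of polling the queue size. How the tracker would be shared between CsvProcessor and the myProcessor bean (for example as a Spring bean) is left open here, and lines that fail in the route would also need to call lineProcessed() or the count would never reach zero.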