How can I read all the files from an AWS S3 directory in a single poll using Apache Camel?


I'm trying to achieve the following:

  1. Read all the files from the S3 directory.
  2. Copy all the files to a backup directory on S3.
  3. Aggregate all the file contents into a single file and copy it to another directory on S3.

But I'm stuck on the first point: reading all the files in a single poll.

My route's from endpoint is: aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s

For example, say some_path_on_s3 has two files, first.txt and second.txt.

According to the Camel documentation, it should read both files in a single poll, but it reads only one file per poll.

I also tried the parameter maxMessagesPerPoll=2, but with no luck: it still reads one file per poll.

Is there a way to fetch all the files from an S3 directory in a single poll?


Accepted answer:

In truth, the consumer does send one file at a time to the route, but it processes the files as one batch per poll.

maxMessagesPerPoll only limits the number of files read per batch. I think the information you are looking for is in the batch exchange properties that Camel sets on every exchange:

CamelBatchComplete: A boolean marking the last Exchange in the batch. It is true only for the last entry.

CamelBatchIndex: The index of the current Exchange within the batch. Starts from 0.

CamelBatchSize: The total number of Exchanges that were polled in this batch.
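To make the three properties concrete, below is a minimal plain-Java model, not Camel code, of what a single poll over first.txt and second.txt could produce: two separate messages, each tagged with its position in the batch. The class BatchPoll and its poll method are hypothetical names for illustration, and the maxMessagesPerPoll cap is simplified (0 or less means "no limit" in this model only).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (not Camel's implementation) of how a batch
// consumer tags the messages produced by one poll. The property names mirror
// Camel's CamelBatchIndex / CamelBatchSize / CamelBatchComplete.
class BatchPoll {

    // Returns one "message" (a property map) per file fetched in this poll.
    // In this model, maxMessagesPerPoll <= 0 means no limit.
    static List<Map<String, Object>> poll(List<String> filesInDir, int maxMessagesPerPoll) {
        int size = maxMessagesPerPoll > 0
                ? Math.min(filesInDir.size(), maxMessagesPerPoll)
                : filesInDir.size();
        List<Map<String, Object>> batch = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            Map<String, Object> message = new LinkedHashMap<>();
            message.put("fileName", filesInDir.get(i));
            message.put("CamelBatchIndex", i);                 // starts from 0
            message.put("CamelBatchSize", size);               // messages in this poll
            message.put("CamelBatchComplete", i == size - 1);  // true only for the last one
            batch.add(message);
        }
        return batch;
    }
}
```

In this model, poll(List.of("first.txt", "second.txt"), 0) returns two messages and only the second carries CamelBatchComplete=true; that flag is what an aggregator can wait for.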

With this information, you can multicast each message: on one branch, back up the file; on the other, use an aggregator that joins the file contents and completes once CamelBatchComplete=true.
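The multicast-plus-aggregator idea can also be sketched in plain Java. This is a simplified model under stated assumptions, not Camel's Multicast or Aggregator API: BatchMerger and onMessage are hypothetical names, the backup and merged lists stand in for the two S3 directories, and messages are assumed to arrive one at a time in batch order, as the batch consumer delivers them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (not Camel code) of the multicast + aggregator
// flow: every incoming file goes to a "backup" branch and to an "aggregate"
// branch; the aggregate branch buffers bodies and emits one merged body when
// it sees the message flagged as the last one of its batch.
class BatchMerger {

    final List<String> backup = new ArrayList<>();  // stands in for the backup directory
    final List<String> merged = new ArrayList<>();  // stands in for the merged-file directory
    private final StringBuilder buffer = new StringBuilder();

    // Called once per message, in the order the consumer delivers them.
    void onMessage(String fileName, String body, boolean batchComplete) {
        // Branch 1: back up the file as-is
        backup.add(fileName);
        // Branch 2: accumulate content until the batch is complete
        buffer.append(body);
        if (batchComplete) {
            merged.add(buffer.toString());
            buffer.setLength(0); // start a fresh group for the next poll
        }
    }
}
```

In a real route, the two branches would be a multicast to a backup endpoint and to an aggregator whose completion predicate checks CamelBatchComplete; the buffer reset here mirrors the aggregator starting a new group after it completes.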

Find more info here:

Batch consumer

Multicast

Another answer:
  • I got it working with the following routes:

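    import java.util.List;

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;

    public class MergeFilesRoute extends RouteBuilder {

        @Override
        public void configure() throws Exception {
            // Consume the files, back each one up, then hand it to the aggregator
            from("file://<some_path_to_dir>")
                .routeId("some_route_id")
                .to("backup_dir") // placeholder: replace with the backup endpoint URI
                .to("direct:aggregate");

            // Group all exchanges of one poll; complete when the newest
            // exchange is flagged as the last one of its batch
            from("direct:aggregate")
                .routeId("aggregate_router")
                .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
                    .completionPredicate(exchange -> {
                        List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                        Exchange latestExchange = list.get(list.size() - 1);
                        // default to false so a missing property cannot cause a NullPointerException
                        return latestExchange.getProperty(Exchange.BATCH_COMPLETE, false, Boolean.class);
                    })
                .to("direct:merge");

            // Concatenate the bodies of all grouped exchanges into one body
            from("direct:merge")
                .routeId("merge_router")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        // Depending on the Camel version, the grouped list is kept in the
                        // GROUPED_EXCHANGE property or moved to the body on completion
                        List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                        if (list == null) {
                            list = exchange.getIn().getBody(List.class);
                        }
                        StringBuilder builder = new StringBuilder();
                        for (Exchange ex : list) {
                            builder.append(ex.getIn().getBody(String.class));
                        }
                        exchange.getIn().setBody(builder.toString());
                        // set any other necessary header if required here
                        // example, if aws s3 is the endpoint, set the S3Constants.KEY header here
                    }
                })
                .to("some_final_endpoint");
        }
    }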