Read file locations from table and copy to specific folder using pollEnrich()


I am trying to write a Camel route that reads a database table to get a list of absolute file paths and then copies those files to another folder. However, each file that gets created contains only the file path as its content instead of the original file's content.

    from("timer://testDataGen?repeatCount=1")
    .to("sql:" + positionSql + "?dataSource=dataSource")
    .split(body())
    .to("file://" + positionlistDir )
    .log("Finished copying the list of Files.");

Please let me know what I am missing here to turn an absolute file path into the actual file.

Update #1.

The snippet below invokes pollEnrich(). However, pollEnrich() copies as many files as there are rows returned by the SQL query, and it does not pick them according to the file name from the previous exchange.

        String positionListSqlOptions = "?dataSource=dataSource";
//      String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${header.positionFileToBeCopied}";
        String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${body}";
        String positionStagingDirOptionsForWriting = "?doneFileName=${file:name}.DONE";

        from("timer://testDataGen?repeatCount=1")
        .to("sql:" + positionListSql + positionListSqlOptions)
        .split(body())
        // Get the column value from the result set row (a LinkedCaseInsensitiveMap) and store it in the body
        .process(new positionFeederProcessor()) 
        .setHeader("positionFileToBeCopied", body())
        .pollEnrich("file://" + positionSrcDir + positionSrcDirOptions)
//      .pollEnrich().simple("file://" + positionSrcDir + positionSrcDirOptions)
        .to("file://" + positionStagingDir + positionStagingDirOptionsForWriting)
        .log("Finished copying the list of Files.");

I am still unable to get the actual file name passed to the pollEnrich() endpoint. I tried extracting it from the body as well as from a header. What could have gone wrong?


Well, I was finally able to do this without using pollEnrich() at all.

    String positionListSqlOptions = "?dataSource=dataSource";
    String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${header.CamelFileName}";
    String positionStagingDirOptionsForWriting = "?fileName=${header.position.file.name}&doneFileName=${file:name}.DONE";

    from("timer://testDataGen?repeatCount=1")
    .to("sql:" + positionListSql + positionListSqlOptions)
    .routeId("Copier:")
    .setHeader("positionFileList", body())
    .log("Creating the list of position Files ...")
    .split(body())
    .process(new PositionListProcessor())
    .setHeader("position.file.name", body())
    .setHeader("position.dir.name", constant(positionSrcDir))
    .process(new PositionFileProcessor())
    .choice()
        .when(body().isNull())
            .log("Position File not found. ${header.position.file.name}")
        .otherwise()
            .to("file://" + positionStagingDir + positionStagingDirOptionsForWriting)
            .log("Position File Copied from Src to : " + "${header.CamelFileNameProduced} ... ${headers} ...");

And here are the processors.

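    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.springframework.util.LinkedCaseInsensitiveMap;

    public class PositionListProcessor implements Processor {
        public void process(Exchange exchange) throws Exception {
            // After split(), the body is one SQL row keyed by column name.
            LinkedCaseInsensitiveMap positionFilesResultSet = (LinkedCaseInsensitiveMap) exchange.getIn().getBody();
            try {
                String positionFileStr = positionFilesResultSet.get("PF_LOCATION_NEW").toString();
                // Replace the row with the trimmed file location.
                exchange.getOut().setBody(positionFileStr.trim());
            } catch (Exception e) {
                // Ignored: if the column is missing, the body stays as the row map.
            }
        }
    }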




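    import java.io.File;
    import java.net.URI;
    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PositionFileProcessor implements Processor {
        // Assumption: SLF4J is the logging API; the snippet does not show how logger is declared.
        private static final Logger logger = LoggerFactory.getLogger(PositionFileProcessor.class);

        public void process(Exchange exchange) throws Exception {
            String filename = exchange.getIn().getBody(String.class);
            String filePath = exchange.getIn().getHeader("position.dir.name", String.class);
            // Plain concatenation: filePath must end with a separator.
            URI uri = new URI("file:///".concat(filePath.concat(filename)));
            File file = new File(uri);
            if (!file.exists()) {
                logger.debug(String.format("File %s not found on %s", filename, filePath));
                exchange.getIn().setBody(null);
            } else {
                exchange.getIn().setBody(file);
            }
        }
    }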
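
The lookup in PositionFileProcessor builds a file: URI by concatenating strings, so it depends on the directory ending with a separator and on the name needing no URI escaping. As a rough alternative sketch, assuming plain java.nio is acceptable, the same existence check can be written with Path.resolve. The class name `PositionFileLookup` and the method `find` are hypothetical and only serve this illustration. Path.resolve returns its argument unchanged when that argument is already absolute, which suits a table that stores absolute paths.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the name and shape are assumptions for illustration only.
class PositionFileLookup {

    // Returns the resolved path if it is an existing regular file, otherwise null.
    static Path find(String directory, String fileName) {
        // resolve() inserts the separator itself and keeps absolute names as they are.
        Path candidate = Paths.get(directory).resolve(fileName.trim());
        return Files.isRegularFile(candidate) ? candidate : null;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("positions");
        Files.createFile(dir.resolve("a.csv"));
        System.out.println(find(dir.toString(), " a.csv ") != null);      // true
        System.out.println(find(dir.toString(), "missing.csv") != null);  // false
    }
}
```

Inside a processor, the returned Path could be converted with toFile() before being set as the body, mirroring the File that the processor above sets.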

The file component, when used in a to() definition, produces a file with the content of the exchange; it does not read a file. To read one, you can use, for example, a pollEnrich processor:

    from("timer://testDataGen?repeatCount=1")
        .to("sql:" + positionSql + "?dataSource=dataSource")
        .split(body())
        .pollEnrich().simple("file:folder?fileName=${body}")
        .to("file://" + positionlistDir )
        .log("Finished copying the list of Files.");
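
The difference between writing the exchange body and reading a file can be illustrated outside Camel. The sketch below uses only java.nio; the class `PathVersusContent` and its two methods are hypothetical and exist only for this illustration. `writeBody` is a loose analogy for a producer that writes whatever the body holds, here a path string, while `copyFile` treats the same string as a location and copies the file it points to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration class; not part of Camel or of the original route.
class PathVersusContent {

    // Writes the body itself to the target, so a path string ends up as the file content.
    static void writeBody(String body, Path target) {
        try {
            Files.write(target, body.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Treats the string as a location and copies the file found there.
    static void copyFile(String sourcePath, Path target) {
        try {
            Files.copy(Paths.get(sourcePath), target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("demo");
        Path source = dir.resolve("position.csv");
        Files.write(source, "real content".getBytes(StandardCharsets.UTF_8));

        Path written = dir.resolve("written.out");
        Path copied = dir.resolve("copied.out");
        writeBody(source.toString(), written);
        copyFile(source.toString(), copied);

        // The first file holds the path text, the second holds "real content".
        System.out.println(new String(Files.readAllBytes(written), StandardCharsets.UTF_8));
        System.out.println(new String(Files.readAllBytes(copied), StandardCharsets.UTF_8));
    }
}
```

The first output file reproduces the symptom from the question, and the second is the behaviour the route is meant to have.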