How to read file only once when app deployed on two nodes

722 Views Asked by At

I'm going to read files from SFTP location line by line:

@Override
public void configure() {
    from(sftpLocationUrl)
            .routeId("route-name")
            .split(body().tokenize("\n"))
            .streaming()
            .bean(service, "build")
            .to(String.format("activemq:%s", queueName));
}

But this application will be deployed on two nodes, and I think that in this case, I can get an unstable and unpredictable application work because the same lines of the file can be read twice. Is there a way to avoid such duplicates in this case?

2

There are 2 best solutions below

2
On BEST ANSWER

Camel has some (experimental) clustering capabilities - see here.

In your particular case, you could model a route which is taking the leadership when starting the directory polling, preventing thereby other nodes from picking the (same or other) files.

1
On

soluion is active passive mode . “In active/passive mode, you have a single master instance polling for files, while all the other instances (slaves) are passive. For this strategy to work, some kind of locking mechanism must be in use to ensure that only the node holding the lock is the master and all other nodes are on standby.”

it can implement with hazelcast, consul or zookeper

public class FileConsumerRoute extends RouteBuilder {

private int delay;
private String name;

public FileConsumerRoute(String name, int delay) {
    this.name = name;
    this.delay = delay;
}

@Override
public void configure() throws Exception {
    // read files from the shared directory
    from("file:target/inbox" +
            "?delete=true")
        // setup route policy to be used
        .routePolicyRef("myPolicy")
        .log(name + " - Received file: ${file:name}")
        .delay(delay)
        .log(name + " - Done file:     ${file:name}")
        .to("file:target/outbox");
}}

ServerBar

public class ServerBar {

private Main main;

public static void main(String[] args) throws Exception {
    ServerBar bar = new ServerBar();
    bar.boot();
}

public void boot() throws Exception {
    // setup the hazelcast route policy
    ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
    // the service names must be same in the foo and bar server
    routePolicy.setServiceName("myLock");
    routePolicy.setTtl(5);

    main = new Main();
    // bind the hazelcast route policy to the name myPolicy which we refer to from the route
    main.bind("myPolicy", routePolicy);
    // add the route and and let the route be named Bar and use a little delay when processing the files
    main.addRouteBuilder(new FileConsumerRoute("Bar", 100));
    main.run();
}

}

Server Foo

public class ServerFoo {

private Main main;

public static void main(String[] args) throws Exception {
    ServerFoo foo = new ServerFoo();
    foo.boot();
}

public void boot() throws Exception {
    // setup the hazelcast route policy
    ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
    // the service names must be same in the foo and bar server
    routePolicy.setServiceName("myLock");
    routePolicy.setTtl(5);

    main = new Main();
    // bind the hazelcast route policy to the name myPolicy which we refer to from the route
    main.bind("myPolicy", routePolicy);
    // add the route and and let the route be named Bar and use a little delay when processing the files
    main.addRouteBuilder(new FileConsumerRoute("Foo", 100));
    main.run();
}}

Source : Camel In Action 2nd Edition