I am trying to create a client for an Apache ZooKeeper server using Apache Curator.
Idea: create a client for a worker (job) that runs at some interval. For now I have 3 clients running this worker. Each client has an isDefaultLeader field; when it is true, that client must be the leader whenever it is connected to the ZooKeeper server.
If no such client is connected, the leader must be elected automatically.
So if 2 clients are running with isDefaultLeader=false, the leader must be elected automatically between them; if a third client then joins with isDefaultLeader=true, that one must become the leader.
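To make the rule concrete, here is a minimal sketch in plain Java with no ZooKeeper involved. Candidate, joinOrder and chooseLeader are hypothetical names used only for illustration, and falling back to the client that joined first is an assumption about what "elected automatically" should mean:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class LeaderRule {
    // Hypothetical model of one connected client; not a Curator type.
    static final class Candidate {
        final String id;
        final boolean isDefaultLeader;
        final long joinOrder; // assumed: lower value means the client joined earlier

        Candidate(String id, boolean isDefaultLeader, long joinOrder) {
            this.id = id;
            this.isDefaultLeader = isDefaultLeader;
            this.joinOrder = joinOrder;
        }
    }

    // Prefer a connected default leader; otherwise pick the earliest joiner.
    static Optional<Candidate> chooseLeader(List<Candidate> connected) {
        return connected.stream()
                .min(Comparator
                        .comparing((Candidate c) -> !c.isDefaultLeader) // default leaders sort first
                        .thenComparingLong(c -> c.joinOrder));
    }

    public static void main(String[] args) {
        List<Candidate> two = List.of(
                new Candidate("a", false, 1),
                new Candidate("b", false, 2));
        System.out.println(chooseLeader(two).get().id);   // prints "a"

        List<Candidate> three = List.of(
                new Candidate("a", false, 1),
                new Candidate("b", false, 2),
                new Candidate("c", true, 3));
        System.out.println(chooseLeader(three).get().id); // prints "c"
    }
}
```

This only models the outcome I want; it says nothing about how the clients would agree on it through ZooKeeper.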
I am going to launch several such clients, one per specific task, and I do not want all the tasks to run on one node, so I need some kind of task distribution mechanism. I mean: if I first launch a set of different tasks on the first server, they immediately become leaders, and when I launch the same tasks on another machine, each task already has a leader, so everything is executed on one machine. So I need to distribute the work somehow.
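One possible way to spread the leaders, sketched in plain Java under the assumption that every machine knows the same list of task names and node names: each task gets a preferred node, and a client would be started with isDefaultLeader=true only on that node. PreferredLeaders, assign and the task and node names are hypothetical, and round-robin is just one simple assignment policy:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PreferredLeaders {
    // Maps each task to a preferred node, round-robin over sorted copies of
    // both lists, so every machine computes the same mapping independently.
    static Map<String, String> assign(List<String> tasks, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        List<String> sortedTasks = new ArrayList<>(tasks);
        List<String> sortedNodes = new ArrayList<>(nodes);
        Collections.sort(sortedTasks);
        Collections.sort(sortedNodes);

        Map<String, String> preferred = new LinkedHashMap<>();
        for (int i = 0; i < sortedTasks.size(); i++) {
            preferred.put(sortedTasks.get(i), sortedNodes.get(i % sortedNodes.size()));
        }
        return preferred;
    }

    // Would the client for this task on this node be started as the default leader?
    static boolean isDefaultLeader(Map<String, String> preferred, String task, String thisNode) {
        return thisNode.equals(preferred.get(task));
    }

    public static void main(String[] args) {
        Map<String, String> preferred = assign(
                List.of("report", "cleanup", "import", "export"),
                List.of("node-2", "node-1"));
        // prints {cleanup=node-1, export=node-2, import=node-1, report=node-2}
        System.out.println(preferred);
        System.out.println(isDefaultLeader(preferred, "import", "node-1")); // prints true
    }
}
```

A mapping like this only says which node should lead each task. It still needs an election mechanism that honors the preference.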
This is the only approach that came to mind; maybe you have a better idea.
How can I implement this?
Here is my test code:
package org.example;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;

public class DistributedProgram {
    private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183";
    private static final String LEADER_PATH = "/leader-election";
    private static final String PROGRAM_INSTANCE_NAME = "my-program-instance";
    private static boolean isDefaultLeader = true;

    public static void main(String[] args) {
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZOOKEEPER_CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3));
        client.start();
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        try {
            final LeaderSelector leaderSelector = new LeaderSelector(client, LEADER_PATH, new LeaderSelectorListener() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) {
                    // Leadership is held only until this method returns.
                    System.out.println("I am the leader. Performing the work.");
                    try {
                        while (!Thread.currentThread().isInterrupted()) {
                            executeJob();
                        }
                    } finally {
                        // Lets main() stop waiting once this leadership term ends.
                        shutdownLatch.countDown();
                    }
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    if (connectionState == ConnectionState.SUSPENDED || connectionState == ConnectionState.LOST) {
                        System.out.println("Connection " + connectionState + ": leadership (if held) is cancelled.");
                        // Leadership cannot be assumed while the connection is suspended or lost;
                        // this exception makes LeaderSelector interrupt takeLeadership().
                        throw new org.apache.curator.framework.recipes.leader.CancelLeadershipException();
                    } else if (connectionState == ConnectionState.CONNECTED) {
                        System.out.println("NEW CONNECTION!!!");
                    }
                }
            });
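            leaderSelector.setId(PROGRAM_INSTANCE_NAME);
            // Re-enter the election automatically whenever takeLeadership() returns.
            leaderSelector.autoRequeue();
            leaderSelector.start();
            if (isDefaultLeader) {
                // requeue() is only valid after start(); it re-enters the election
                // but does not give this client priority over the others.
                leaderSelector.requeue();
            }
            shutdownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }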

    private static void executeJob() {
        try {
            Thread.sleep(5000);
            System.out.println("Executing the custom job as a leader.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the loop in takeLeadership() can exit.
            Thread.currentThread().interrupt();
        }
    }
}
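The loop in takeLeadership() depends on the thread's interrupt flag, and Thread.sleep() clears that flag when it throws InterruptedException. The stand-alone sketch below shows the pattern of restoring the flag so that such a loop ends after an interrupt. It is plain Java with no Curator involved; InterruptibleJob, leadershipLoop and the short sleep times are hypothetical stand-ins for the test code above:

```java
import java.util.concurrent.atomic.AtomicInteger;

class InterruptibleJob {
    static final AtomicInteger completedRuns = new AtomicInteger();

    // Stand-in for executeJob(): counts a run after the sleep completes,
    // or restores the interrupt flag if interrupted while sleeping.
    static void executeJob(long millis) {
        try {
            Thread.sleep(millis);
            completedRuns.incrementAndGet();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stand-in for the body of takeLeadership().
    static void leadershipLoop(long millis) {
        while (!Thread.currentThread().isInterrupted()) {
            executeJob(millis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread leader = new Thread(() -> leadershipLoop(50));
        leader.start();
        Thread.sleep(200);
        leader.interrupt(); // simulates leadership being cancelled
        leader.join(1000);
        System.out.println("loop exited: " + !leader.isAlive()); // prints "loop exited: true"
    }
}
```

If the catch block only printed the stack trace, the flag would stay cleared and the loop would keep running after the interrupt.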