How to specify a default leader with Apache Curator


I am trying to create a client for an Apache ZooKeeper server with Apache Curator.

Idea: create a client for a worker (job) that runs at some interval. For now I have 3 clients running this worker. Each one has an isDefaultLeader field; if it is true, this client must be the leader whenever it is connected to the ZooKeeper server.

If no connected client has the flag set, the leader must be selected automatically.

So if 2 clients are running with isDefaultLeader=false, the leader must be selected automatically; if a third client then joins with isDefaultLeader=true, that one must become the leader.
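To make the rule I have in mind concrete, here is a minimal sketch in plain Java, without any Curator calls. The Candidate class and the pickLeader method are hypothetical names made up for illustration, and the "automatic" fallback is assumed to be simple join order.

```java
import java.util.List;
import java.util.Optional;

public class PreferredLeaderRule {

    // Hypothetical description of a connected client: its id and its isDefaultLeader flag.
    public static final class Candidate {
        public final String id;
        public final boolean isDefaultLeader;

        public Candidate(String id, boolean isDefaultLeader) {
            this.id = id;
            this.isDefaultLeader = isDefaultLeader;
        }
    }

    // Returns the client that should lead: the first connected client flagged as
    // default leader if there is one, otherwise the first client in join order.
    public static Optional<Candidate> pickLeader(List<Candidate> connectedInJoinOrder) {
        for (Candidate c : connectedInJoinOrder) {
            if (c.isDefaultLeader) {
                return Optional.of(c);
            }
        }
        return connectedInJoinOrder.stream().findFirst();
    }

    public static void main(String[] args) {
        List<Candidate> two = List.of(
                new Candidate("a", false), new Candidate("b", false));
        System.out.println(pickLeader(two).map(c -> c.id).orElse("none")); // prints "a"

        List<Candidate> three = List.of(
                new Candidate("a", false), new Candidate("b", false), new Candidate("c", true));
        System.out.println(pickLeader(three).map(c -> c.id).orElse("none")); // prints "c"
    }
}
```

This only describes the outcome I want; it says nothing about how to get Curator's LeaderSelector to produce it.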

I am going to launch several such clients, one per task, and I do not want all the tasks to run on one node, so I need some way to distribute them. If I first launch a set of different tasks on the first server, each of them immediately becomes the leader for its task. When I then launch the same tasks on another machine, every task already has a leader, so everything keeps running on the first machine.
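One possible way to set the flag per task without configuring it by hand is sketched below. This is an assumption on my side, not something I have working: the preferredNode helper, the node names and the task names are all hypothetical, and the node list is assumed to be non-empty and known on every machine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TaskPlacement {

    // Hypothetical helper: picks the node that should set isDefaultLeader=true for a task.
    // Sorting first makes every machine compute the same answer from the same node names.
    // Assumes nodeNames is not empty.
    public static String preferredNode(String taskName, List<String> nodeNames) {
        List<String> sorted = new ArrayList<>(nodeNames);
        Collections.sort(sorted);
        int index = Math.floorMod(taskName.hashCode(), sorted.size());
        return sorted.get(index);
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("server-1", "server-2");
        for (String task : List.of("task-a", "task-b", "task-c", "task-d")) {
            // On server-1, the flag would be true only for the tasks mapped to server-1.
            boolean isDefaultLeader = preferredNode(task, nodes).equals("server-1");
            System.out.println(task + " on server-1: isDefaultLeader=" + isDefaultLeader);
        }
    }
}
```

A hash like this does not guarantee an even split for a small number of tasks, so it is only a starting point.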

This is the only approach that came to mind; maybe you have a better idea.

How can I implement this?

Here is my test code:

package org.example;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;

public class DistributedProgram {

    private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183";
    private static final String LEADER_PATH = "/leader-election";
    private static final String PROGRAM_INSTANCE_NAME = "my-program-instance";
    private static boolean isDefaultLeader = true;


    public static void main(String[] args) {
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZOOKEEPER_CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3));
        client.start();

        final CountDownLatch shutdownLatch = new CountDownLatch(1);


        try {
            final LeaderSelector leaderSelector  = new LeaderSelector(client, LEADER_PATH, new LeaderSelectorListener() {
                @Override
                public void takeLeadership(CuratorFramework curatorFramework) {
                    System.out.println("I am the leader. Performing the work.");

                    try {
                        while (!Thread.currentThread().isInterrupted()) {
                            executeJob();
                        }
                    } finally {
                        shutdownLatch.countDown();
                    }

                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    if (connectionState == ConnectionState.SUSPENDED
                            || connectionState == ConnectionState.LOST) {
                        System.out.println("Connection suspended or lost. Giving up leadership.");
                        // Makes the LeaderSelector interrupt the thread running takeLeadership().
                        throw new org.apache.curator.framework.recipes.leader.CancelLeadershipException();
                    } else if (connectionState == ConnectionState.CONNECTED) {
                        System.out.println("NEW CONNECTION!!!");
                    }
                }

            });


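            leaderSelector.setId(PROGRAM_INSTANCE_NAME);
            leaderSelector.autoRequeue();

            // requeue() fails with IllegalStateException when called before start(),
            // and it only re-enters the election queue without giving any priority,
            // so it is not called here. isDefaultLeader is not honored yet.
            leaderSelector.start();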


            shutdownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }

    private static void executeJob() {
        try {
            Thread.sleep(5000);
            System.out.println("Executing the custom job as a leader.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the loop in takeLeadership() can exit.
            Thread.currentThread().interrupt();
        }
    }
}