Implementing Leader Election with Zookeeper

Update: An alternative implementation can be found in my next blog post: Single file leader election with retry logic.

In this post I will demonstrate how to create a distributed application in which, with Apache Zookeeper's help, one of the running processes is elected as the master process. The application is very straightforward: a single process writes a unique identifier to a file. Multiple instances of the application can be started, but only one is allowed to write to the output file. If that leader fails, a new leader is elected.

(For a brief explanation of how Apache Zookeeper works, see my previous post.)

Recipe

To implement this I followed a recipe from the Apache Zookeeper website, which suggests the following algorithm for leader election:

Let ELECTION be a path of choice of the application. To volunteer to be a leader:

  1. Create znode z with path “ELECTION/n_” with both SEQUENCE and EPHEMERAL flags;
  2. Let C be the children of “ELECTION”, and i be the sequence number of z;
  3. Watch for changes on “ELECTION/n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:

  1. Let C be the new set of children of ELECTION;
  2. If z is the smallest node in C, then execute leader procedure;
  3. Otherwise, watch for changes on “ELECTION/n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;
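Step 3 in both lists boils down to a pure function of the child names. Here is a minimal sketch of it in plain Java, so it runs without a Zookeeper server. ElectionRecipe and its methods are hypothetical names, and child names are assumed to look like n_0000000007 (ZooKeeper appends a 10-digit, zero-padded counter to sequential znodes). It picks the immediate predecessor, the largest sequence number below i, as the current recipe page words it: when a node goes away, only its successor is woken up instead of every waiting process.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper illustrating the recipe's "which node do I watch?" step.
public class ElectionRecipe {

    // "n_0000000007" -> 7 (the counter ZooKeeper appended to the requested name "n_").
    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(childName.lastIndexOf('_') + 1));
    }

    // Returns the child to watch, or Optional.empty() if `self` holds the
    // smallest sequence number and should execute the leader procedure.
    static Optional<String> predecessorToWatch(List<String> children, String self) {
        long i = sequenceOf(self);
        String best = null;
        long bestSeq = Long.MIN_VALUE;
        for (String child : children) {
            long j = sequenceOf(child);
            // Keep the largest j that is still smaller than our own i.
            if (j < i && j > bestSeq) {
                bestSeq = j;
                best = child;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> children = List.of("n_0000000005", "n_0000000002", "n_0000000009");
        System.out.println(predecessorToWatch(children, "n_0000000009")); // Optional[n_0000000005]
        System.out.println(predecessorToWatch(children, "n_0000000002")); // Optional.empty
    }
}
```

In the real application the returned name would be turned into a path under /ELECTION and watched (for example with exists()); an empty result means the instance is the leader.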

Prerequisites

  • Object representing the Zookeeper server (or the connection to it)
  • Object implementing the Watcher interface: Something the Zookeeper server can send messages to
  • A running program: main() {while(true) doTask()}
  • A running instance of Zookeeper
The whole thing will run on localhost.

Querying Zookeeper

Before we start, here is a quick overview of how to query Zookeeper so you can see what’s going on:
The Zookeeper distribution contains a command-line client, zkCli.sh (in its bin directory), which you can use to connect to a Zookeeper server and query it, much like the mysql client connecting to mysqld. To start the client and connect to a Zookeeper server running on localhost:
./zkCli.sh -server localhost:2181
Here is a list of useful commands for querying the data tree (the client has tab completion to make life easier):
  • create [path] [data] – creates a znode:
    create /test test_data
  • ls [path] – lists the children of a znode:
    ls /test
  • stat [path] – prints the metadata of a specific znode:
    stat /test
  • delete [path] – deletes a znode:
    delete /test

Running the Code

The whole application is available on GitHub.
It’s a Maven project and can be built with the following command:
mvn clean install
It can be run from the command line or from within your favourite IDE.
Once you’ve started Zookeeper and the SpeakerServer, you can see what’s happening in Zookeeper by issuing the following command in zkCli:

ls /ELECTION

It lists the znodes under /ELECTION, one for each instance with a live session.

Design

The application consists of three classes. The first, the Speaker, does the actual work: it writes its process id and an incrementing counter to a file (out.txt), but only while its canSpeak flag is true (the flag defaults to false).
The second class, NodeMonitor, is the monitoring class. It talks asynchronously to the Zookeeper server and, depending on the responses and the state of the connection, decides whether the Speaker has to start or stop writing to the output file.
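The post doesn't show how the NodeMonitor reacts when the connection is lost. One conservative policy, sketched here under stated assumptions: stop speaking as soon as the client reports Disconnected or Expired, and only resume after a fresh check of the children. The nested KeeperState enum is a local stand-in for the ZooKeeper client's Watcher.Event.KeeperState (three of its values only, so the sketch compiles without the ZooKeeper jar), and ConnectionGuard is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of connection-aware speaking rules; not the author's NodeMonitor.
public class ConnectionGuard {

    // Local stand-in for org.apache.zookeeper.Watcher.Event.KeeperState (subset).
    enum KeeperState { SyncConnected, Disconnected, Expired }

    // Plays the role of the Speaker's canSpeak flag; shared between threads.
    private final AtomicBoolean canSpeak = new AtomicBoolean(false);

    // Connection-state events: losing the connection silences the Speaker at once.
    void onConnectionState(KeeperState state) {
        switch (state) {
            case SyncConnected:
                // (Re)connected: do not resume yet, wait for a fresh children check.
                break;
            case Disconnected:
            case Expired:
                // We can no longer be sure our ephemeral znode is the lowest one.
                canSpeak.set(false);
                break;
        }
    }

    // Result of a children check (as in processResult()): speak only if lowest.
    void onElectionResult(boolean ownNodeIsLowest) {
        canSpeak.set(ownNodeIsLowest);
    }

    boolean isSpeaking() {
        return canSpeak.get();
    }
}
```

The reason for being conservative: while disconnected, an instance can't tell whether its session (and therefore its ephemeral znode) is still alive on the server, so another instance may already have taken over; after Expired the znode is definitely gone and a new ZooKeeper handle has to be created before rejoining the election.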
The third class, SpeakerServer, is the server. It instantiates the Speaker and the NodeMonitor, registers the Speaker as a listener on the NodeMonitor, and starts the Speaker on a separate thread using a fixed-delay schedule. By default the Speaker runs every second; the delay can be specified on the command line.
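The Speaker's source isn't listed in the post, so here is a minimal sketch of what the description implies: a Runnable that also implements the monitor's listener interface (re-declared so the sketch compiles on its own) and writes only while a volatile canSpeak flag is set. The class name SpeakerSketch, the line format, the output path parameter and the use of a caller-supplied name instead of the OS process id are all assumptions, not the author's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Re-declared from NodeMonitor so this sketch compiles on its own.
interface NodeMonitorListener {
    void startSpeaking();
    void stopSpeaking();
    String getProcessName();
}

// Hypothetical stand-in for the post's Speaker class.
public class SpeakerSketch implements Runnable, NodeMonitorListener {

    private final String processName;
    private final Path output;                 // e.g. Path.of("out.txt")
    private volatile boolean canSpeak = false; // flipped by the monitor's callback thread
    private long counter = 0;                  // only touched by the scheduler thread

    public SpeakerSketch(String processName, Path output) {
        this.processName = processName;
        this.output = output;
    }

    @Override public void startSpeaking() { canSpeak = true; }
    @Override public void stopSpeaking() { canSpeak = false; }
    @Override public String getProcessName() { return processName; }

    @Override
    public void run() {
        if (!canSpeak) {
            return; // not the leader: write nothing
        }
        String line = processName + ": " + counter++ + System.lineSeparator();
        try {
            Files.writeString(output, line, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            // Log and carry on: if run() threw, the scheduler would suppress all later runs.
            e.printStackTrace();
        }
    }
}
```

The flag is volatile because startSpeaking() and stopSpeaking() are called from the ZooKeeper callback thread while run() executes on the scheduler thread; without it, the scheduler thread is not guaranteed to see the change.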

How it works

Starting the thread:

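(SpeakerServer.java)
// monitor (a NodeMonitor), scheduler (a ScheduledExecutorService) and delay
// (in milliseconds) are declared elsewhere in SpeakerServer.
Speaker speaker = null;
try {
    speaker = new Speaker(args[0]);
    monitor = new NodeMonitor();    // connects to Zookeeper on localhost:2181
    monitor.setListener(speaker);   // the monitor calls back into the Speaker
} catch (Exception e) {
    e.printStackTrace();
    System.exit(1);
}

// Run the Speaker immediately, then again `delay` ms after each run completes.
scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);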
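The scheduling step can be tried in isolation. This sketch assumes the argument layout used in the mvn exec:java command quoted in the comments below (args[0] a name, args[1] an optional delay in milliseconds); delayFrom() and the 1000 ms fallback are hypothetical helpers written to match the description, and a println stands in for the Speaker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of SpeakerServer's scheduling, without ZooKeeper.
public class SchedulerSketch {

    static final long DEFAULT_DELAY_MS = 1000; // "every second" by default

    // Assumed layout: args[0] is the instance name, args[1] an optional delay in ms.
    static long delayFrom(String[] args) {
        return args.length > 1 ? Long.parseLong(args[1]) : DEFAULT_DELAY_MS;
    }

    public static void main(String[] args) throws InterruptedException {
        long delay = delayFrom(args);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);

        // Stand-in for the Speaker: any Runnable can be scheduled the same way.
        Runnable task = () -> {
            System.out.println("tick at " + System.currentTimeMillis());
            threeRuns.countDown();
        };

        // First run immediately, then `delay` ms after each previous run has finished.
        scheduler.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
        threeRuns.await(10, TimeUnit.SECONDS);
        scheduler.shutdownNow();
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow write pushes later runs back instead of letting them pile up; scheduleAtFixedRate would try to keep a fixed cadence instead.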

First, the Speaker (which implements the Runnable interface) is registered as a listener on the monitor. This gives the monitor, which talks to Zookeeper, a callback point.
The NodeMonitor defines an interface that a listener (in this case the Speaker) has to implement, so the monitor knows how to talk back to the listener:
(NodeMonitor.java)
public interface NodeMonitorListener {
    public void startSpeaking();
    public void stopSpeaking();
    public String getProcessName();
}
In the NodeMonitor constructor we create a Zookeeper instance:
(NodeMonitor.java)
public NodeMonitor() throws IOException, InterruptedException, KeeperException {
    // 3000 ms session timeout; `this` is the default Watcher that receives connection events
    this.zooKeeper = new ZooKeeper("localhost:2181", 3000, this);
}
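The ZooKeeper constructor returns immediately, usually before the session is established. Once the connection to the Zookeeper server is up, a SyncConnected event is sent to the Watcher (NodeMonitor implements the Watcher interface) through its process() method; connection-state events like this one have event type None and end up in processNoneEvent():
(NodeMonitor.java)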
public void processNoneEvent(WatchedEvent event) {
    switch (event.getState()) {
        case SyncConnected:
            try {
                createRootIfNotExists();
                zooKeeper.getChildren(ROOT, true, this, null);
                sequenceNumber = createZnode();
...
If the root znode (/ELECTION) doesn’t exist yet, it is created. Next, zooKeeper.getChildren() both reads the children of the root node and, because its second argument is true, sets a watch on it using the default Watcher passed to the constructor. We pass ourselves as the callback so that ZooKeeper can deliver the result asynchronously via the processResult() method. Finally, we create the znode underneath /ELECTION that represents this running instance.
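A detail worth knowing about createZnode(), assuming it creates its node with CreateMode.EPHEMERAL_SEQUENTIAL as the recipe requires: ZooKeeper appends a 10-digit, zero-padded counter to the requested name and create() returns the full path (for example /ELECTION/n_0000000004), whereas getChildren() reports bare child names (n_0000000004). This hypothetical helper reduces either form to the same number so the two can be compared.

```java
// Hypothetical helper; not part of the ZooKeeper API or the author's code.
public class SequenceNumbers {

    // Extracts the counter ZooKeeper appended to a sequential znode, e.g.
    // "/ELECTION/n_0000000004" -> 4. Works for full paths and bare child names.
    static int sequenceNumberOf(String pathOrName) {
        String name = pathOrName.substring(pathOrName.lastIndexOf('/') + 1);
        return Integer.parseInt(name.substring(name.lastIndexOf('_') + 1));
    }

    public static void main(String[] args) {
        // In the monitor, the full path would be the return value of zooKeeper.create(...).
        System.out.println(sequenceNumberOf("/ELECTION/n_0000000004")); // 4
        System.out.println(sequenceNumberOf("n_0000000012"));           // 12
    }
}
```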
The essence of the asynchronous callback through which ZooKeeper delivers the children of the watched node:
(NodeMonitor.java)
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
....
    if (getLowestNumber(children) == sequenceNumber) {
        listener.startSpeaking();
    } else {
        listener.stopSpeaking();
    }
....
}
If the lowest sequence number among the children equals the instance's own sequence number, this instance is the leader and starts speaking; otherwise it stops speaking.
The NodeMonitor implements the AsyncCallback.ChildrenCallback interface because the children are requested with the asynchronous getChildren() method. Each asynchronous method has its own callback interface; for example, if you use exists() to set the watch, you need an AsyncCallback.StatCallback instead.
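getLowestNumber() itself is not shown in the post. A minimal sketch, assuming bare child names like n_0000000003 and an int sequence number; LeaderCheck and shouldSpeak() are hypothetical names. getChildren() makes no promise about the order of the names it returns, so the helper scans the whole list instead of taking the first entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the leader check done in processResult().
public class LeaderCheck {

    // Smallest sequence number among bare child names such as "n_0000000003".
    static int getLowestNumber(List<String> children) {
        int lowest = Integer.MAX_VALUE;
        for (String child : children) {
            int seq = Integer.parseInt(child.substring(child.lastIndexOf('_') + 1));
            lowest = Math.min(lowest, seq);
        }
        return lowest;
    }

    // Speak only while our own znode carries the lowest number.
    static boolean shouldSpeak(List<String> children, int ownSequenceNumber) {
        return !children.isEmpty() && getLowestNumber(children) == ownSequenceNumber;
    }

    public static void main(String[] args) {
        List<String> children = new ArrayList<>(List.of("n_0000000007", "n_0000000003", "n_0000000005"));
        System.out.println(shouldSpeak(children, 3)); // true
        System.out.println(shouldSpeak(children, 5)); // false
        children.remove("n_0000000003"); // the leader's ephemeral znode disappears
        System.out.println(shouldSpeak(children, 5)); // true
    }
}
```

Two things about the surrounding monitor are worth keeping in mind. ZooKeeper watches fire only once, so after each NodeChildrenChanged notification the watch has to be set again (for instance by calling getChildren(ROOT, true, this, null) once more) for processResult() to see later changes. And because every instance watches all of /ELECTION rather than just its predecessor as in the recipe, every instance is notified on every change, which is harmless for a handful of processes.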
It’s a very straightforward application, and I hope it clarifies the workings of Zookeeper and how to use it in your Java application. If you have any questions, ping me and I’ll be more than happy to answer them.

4 thoughts on “Implementing Leader Election with Zookeeper”

  1. Pingback: Zookeeper Single File Leader Election with Retry Logic « Cyberroadie’s Weblog

  2. Nodir

    Hi,

    I get ClassNotFoundException when I run Speaker Server as mvn exec:java -Dexec.mainClass="SpeakerServer". How do I run it correctly?

    Thanks,
    Nodir.

  3. Nodir

    I figured this out by myself. Solution is:
    $ mvn exec:java -Dexec.mainClass="net.spike.zookeeper.SpeakerServer" -Dexec.args="argument1_message 3000"
