Code Gem: Retry Logic in HBase

Found this retry logic while reading the HBase codebase. It encapsulates retry logic in what I think is an elegant manner: it keeps track of the number of retries, and the time between retries grows exponentially.

This is how you can use it in your code:

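public String doAndRetryThis() throws InterruptedException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
        try {
            return doThis();                     // success: leave the loop
        } catch (Exception e) {
            if (!retryCounter.shouldRetry()) {
                // Retries used up: give up. This compiles as long as doThis()
                // throws no checked exceptions besides InterruptedException.
                throw e;
            }
        }
        retryCounter.sleepUntilNextRetry();      // back off exponentially
        retryCounter.useRetry();                 // consume one retry
    }
}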

And this is how it is implemented:


import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RetryCounter {
  private static final Log LOG = LogFactory.getLog(RetryCounter.class);
  private final int maxRetries;
  private int retriesRemaining;
  private final int retryIntervalMillis;
  private final TimeUnit timeUnit;

  public RetryCounter(int maxRetries,
      int retryIntervalMillis, TimeUnit timeUnit) {
    this.maxRetries = maxRetries;
    this.retriesRemaining = maxRetries;
    this.retryIntervalMillis = retryIntervalMillis;
    this.timeUnit = timeUnit;
  }

  public int getMaxRetries() {
    return maxRetries;
  }

  /**
   * Sleep for an exponentially backed-off time.
   * @throws InterruptedException
   */
  public void sleepUntilNextRetry() throws InterruptedException {
    int attempts = getAttemptTimes();
    // interval * 2^attempts: the wait doubles with every retry
    long sleepTime = (long) (retryIntervalMillis * Math.pow(2, attempts));
    LOG.info("The " + attempts + " times to retry after sleeping " + sleepTime
        + " ms");
    timeUnit.sleep(sleepTime);
  }

  public boolean shouldRetry() {
    return retriesRemaining > 0;
  }

  public void useRetry() {
    retriesRemaining--;
  }

  // 1 before the first retry, 2 before the second, and so on
  public int getAttemptTimes() {
    return maxRetries - retriesRemaining + 1;
  }
}

And it comes with its little Factory:

import java.util.concurrent.TimeUnit;

public class RetryCounterFactory {
  private final int maxRetries;
  private final int retryIntervalMillis;

  public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
    this.maxRetries = maxRetries;
    this.retryIntervalMillis = retryIntervalMillis;
  }

  // Every call hands out a fresh counter with the full retry budget
  public RetryCounter create() {
    return new RetryCounter(
        maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS);
  }
}

Code can be found here and here
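To see what schedule this actually produces, here is a small standalone sketch that replays RetryCounter's arithmetic without sleeping or logging. The class BackoffSchedule and its method sleepTimes are hypothetical names, not part of HBase. Assuming a counter created by the factory with maxRetries = 3 and a 100 ms interval, the operation runs at most four times (the first try plus three retries), and because getAttemptTimes() already returns 1 before the first retry, even the first sleep is twice the base interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: mirrors RetryCounter's bookkeeping so the backoff
// schedule can be inspected without actually sleeping.
public class BackoffSchedule {

    /** Sleep times (ms) a RetryCounter would use before each retry. */
    static List<Long> sleepTimes(int maxRetries, int retryIntervalMillis) {
        List<Long> times = new ArrayList<>();
        int retriesRemaining = maxRetries;
        while (retriesRemaining > 0) {                          // shouldRetry()
            int attempts = maxRetries - retriesRemaining + 1;   // getAttemptTimes()
            times.add((long) (retryIntervalMillis * Math.pow(2, attempts)));
            retriesRemaining--;                                 // useRetry()
        }
        return times;
    }

    public static void main(String[] args) {
        // 3 retries with a 100 ms base interval
        System.out.println(sleepTimes(3, 100)); // prints [200, 400, 800]
    }
}
```

Because the wait doubles each time, the total time spent sleeping (1400 ms here) is dominated by the last retry, which is worth keeping in mind when picking maxRetries.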

Implementing Leader Election with Zookeeper

Update: An alternative implementation can be found in my next blog post: Single file leader election with retry logic.

In the following post I will demonstrate how to create a distributed application where one of the running processes gets elected as the master process with the help of Apache Zookeeper. The application is very straightforward: a single process writes a unique identifier to a file. Multiple instances of the application can be started, but only one is allowed to write to the output file. In case of failure a new leader will be elected.

(For a brief explanation on how Apache Zookeeper works see my previous post)

Recipe

To implement this I followed a recipe from the Apache Zookeeper website, which suggests the following algorithm for leader election:

Let ELECTION be a path of choice of the application. To volunteer to be a leader:

  1. Create znode z with path “ELECTION/n_” with both SEQUENCE and EPHEMERAL flags;
  2. Let C be the children of “ELECTION”, and i be the sequence number of z;
  3. Watch for changes on “ELECTION/n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:

  1. Let C be the new set of children of ELECTION;
  2. If z is the smallest node in C, then execute leader procedure;
  3. Otherwise, watch for changes on “ELECTION/n_j”, where j is the smallest sequence number such that j < i and n_j is a znode in C;

Prerequisites

  • Object representing the Zookeeper server (or the connection to it)
  • Object implementing the Watcher interface: Something the Zookeeper server can send messages to
  • A running program: main() {while(true) doTask()}
  • A running instance of Zookeeper
The whole thing will run on localhost.

Querying Zookeeper

Before we start, here is a quick overview of how to query Zookeeper so you can see what’s going on:
The Zookeeper stack contains a command line tool which you can use to connect to a Zookeeper server and query it. It’s similar to the mysql client connecting to mysqld. To start the client and connect to a Zookeeper server running on localhost:
./zkCli.sh -server localhost:2181
Here is a list of useful commands to query the data tree (the client has tab completion to make life easier):
  • create [path] [data] – creates a znode:
    create /test test_data
  • ls [path] – lists the children of a znode:
    ls /test
  • stat [path] – gets the metadata of a specific znode:
    stat /test
  • delete [path] – deletes a znode:
    delete /test

Running the Code

The whole application is available on GitHub.
It’s a Maven project and can be built with the following command:
mvn clean install
It can be run from the command line or within your favourite IDE.
Once you’ve started Zookeeper and the Speaker Server, you can see what’s happening in Zookeeper by issuing the following command in zkCli:

ls /ELECTION

It lists the znodes created by every running instance.

Design

The whole application consists of three classes. The first class, the Speaker, does the work: it writes its process id and an incrementing counter to a file (out.txt). It only writes to the file if the canSpeak flag is set to true (it defaults to false).
The second class is a monitoring class (NodeMonitor). It talks asynchronously to the Zookeeper server and, depending on the response and the state of the connection, decides whether the Speaker has to start or stop writing to the output file.
The third class is the server (SpeakerServer). It instantiates the Speaker and the NodeMonitor, adds the Speaker as a listener to the NodeMonitor and starts the whole thing up in a separate thread scheduled with a fixed delay. The scheduled thread runs the Speaker every second (1 second is the default, but it can be specified on the command line).
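To make the Speaker’s role concrete, here is a minimal sketch of a Runnable that only writes while its canSpeak flag is set, and that exposes the start/stop switches the NodeMonitor calls. It is not the project’s actual Speaker: the class name SpeakerSketch, the constructor taking a Writer (instead of writing to out.txt), and the output format are assumptions. The flag is volatile because it is flipped from the ZooKeeper callback thread while run() executes on the scheduler thread.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Same shape as the listener interface defined in NodeMonitor.java.
interface NodeMonitorListener {
    void startSpeaking();
    void stopSpeaking();
    String getProcessName();
}

// Hypothetical sketch of the Speaker: writes only while canSpeak is true.
public class SpeakerSketch implements Runnable, NodeMonitorListener {
    private final String processName;
    private final Writer out;
    private volatile boolean canSpeak = false; // set by the ZooKeeper callback thread
    private long counter = 0;                  // only touched by the scheduler thread

    public SpeakerSketch(String processName, Writer out) {
        this.processName = processName;
        this.out = out;
    }

    @Override public void startSpeaking() { canSpeak = true; }
    @Override public void stopSpeaking() { canSpeak = false; }
    @Override public String getProcessName() { return processName; }

    @Override
    public void run() {
        if (!canSpeak) {
            return;                            // not the leader: stay silent
        }
        try {
            out.write(processName + ": " + (++counter) + "\n");
            out.flush();
        } catch (IOException e) {
            // An exception escaping run() would suppress all further runs
            // scheduled with scheduleWithFixedDelay, so report and carry on.
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        StringWriter out = new StringWriter();
        SpeakerSketch speaker = new SpeakerSketch("pid-42", out);
        speaker.run();                         // ignored: not the leader yet
        speaker.startSpeaking();
        speaker.run();
        speaker.run();
        speaker.stopSpeaking();
        speaker.run();                         // ignored again
        System.out.print(out);                 // prints "pid-42: 1" and "pid-42: 2"
    }
}
```

In the real application the scheduler, not main(), calls run() once per delay period; the flag simply decides whether that tick produces output.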

How it works

Starting the thread:

(SpeakerServer.java)
Speaker speaker = null;
try {
    speaker = new Speaker(args[0]);
    monitor = new NodeMonitor();
    monitor.setListener(speaker);
} catch (Exception e) {
    e.printStackTrace();
    System.exit(1);
}

scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);

First the Speaker (which implements the Runnable interface) gets set as a listener on the monitor. This gives the monitor, which talks with Zookeeper, a callback point.
The monitor defines an interface which a listener (in this case the Speaker) has to implement, so the monitor knows how to talk back to the listener:
(NodeMonitor.java)
public interface NodeMonitorListener {
    public void startSpeaking();
    public void stopSpeaking();
    public String getProcessName();
}
In the NodeMonitor constructor we create a Zookeeper instance:
(NodeMonitor.java):
public NodeMonitor() throws IOException, InterruptedException, KeeperException {
        this.zooKeeper = new ZooKeeper("localhost:2181", 3000, this);
}
After connecting to the Zookeeper server, a SyncConnected event gets sent back to the Watcher (the NodeMonitor implements the Watcher interface) via the process() method. This is how the NodeMonitor handles it:
(NodeMonitor.java)
 

public void processNoneEvent(WatchedEvent event) {
    switch (event.getState()) {
        case SyncConnected:
            try {
                createRootIfNotExists();
                zooKeeper.getChildren(ROOT, true, this, null);
                sequenceNumber = createZnode();
...
If the root znode doesn’t exist it gets created, and after that we set a watch on the root node (/ELECTION) via the zooKeeper.getChildren() method. We pass ourselves to this method so Zookeeper can make an asynchronous callback via the processResult() method. After that we create the znode underneath /ELECTION that represents this running instance.
The essence of the asynchronous callback method used by Zookeeper to signal changes to the children of the watched node:
(NodeMonitor.java)
@Override
public void processResult(int i, String s, Object o, List<String> children) {
....
    if(getLowestNumber(children) == sequenceNumber)
        listener.startSpeaking();
    else
        listener.stopSpeaking();
....
}
If the lowest sequence number equals its own sequence number, the server becomes the leader and starts talking; otherwise it stops.
The NodeMonitor implements the AsyncCallback.ChildrenCallback interface. This is because the watch has been set via the getChildren() method. Every asynchronous method that sets a watch has its own callback interface; e.g. if you use exists() to set the watch, you need AsyncCallback.StatCallback.
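The snippet above calls getLowestNumber() without showing it. Here is one possible version, together with a small failover simulation; it is a sketch, not the project’s code. It assumes the children are names like "n_0000000005" (getChildren() returns names relative to /ELECTION, in no particular order), that the list is non-empty, and that the instance’s own sequence number is held as a primitive long. The class name LeaderCheck is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper plus a failover simulation; no ZooKeeper involved.
public class LeaderCheck {

    // Assumes names end in "_" plus the numeric suffix and the list is non-empty.
    static long getLowestNumber(List<String> children) {
        long lowest = Long.MAX_VALUE;
        for (String child : children) {
            long seq = Long.parseLong(child.substring(child.lastIndexOf('_') + 1));
            lowest = Math.min(lowest, seq);
        }
        return lowest;
    }

    public static void main(String[] args) {
        List<String> children = new ArrayList<>(
                Arrays.asList("n_0000000005", "n_0000000002", "n_0000000009"));
        long mySequenceNumber = 5;

        // Instance 2 is alive, so instance 5 keeps quiet.
        System.out.println(getLowestNumber(children) == mySequenceNumber); // false

        // Instance 2 dies: its ephemeral znode disappears and the watch fires.
        children.remove("n_0000000002");
        System.out.println(getLowestNumber(children) == mySequenceNumber); // true
    }
}
```

Comparing primitive longs with == is safe; if the sequence number were stored as a boxed Long, equals() would be needed instead.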
It’s a very straightforward application and I hope it clarifies the workings of Zookeeper and how to use it in your Java application. If you have any questions, ping me and I’ll be more than happy to answer them.

Quick Overview of Apache Zookeeper

ZooKeeper is a highly available and reliable coordination system. Distributed applications use ZooKeeper to store and mediate updates to key configuration information. ZooKeeper can be used for leader election, group membership, and configuration maintenance. In addition ZooKeeper can be used for event notification, locking, and as a priority queue mechanism. (source: ZooKeeper homepage)

Data storage in Zookeeper

To make Zookeeper a highly available and reliable coordination system, clients can store data in a hierarchical data structure, built in the same way as a file system tree. The tree is kept in memory (backed by transaction logs and snapshots on disk) and can handle up to 50,000 requests per second. Zookeeper itself runs in a distributed fashion and the data is replicated to every running instance. Write requests are forwarded to the leader of the ensemble, which uses an atomic broadcast protocol to propagate them to all the other Zookeeper servers, so the replicas are guaranteed never to diverge.

Znodes: normal, ephemeral and sequence

Nodes in a Zookeeper tree are called znodes. A znode consists of a stat structure with statistical data and a byte array to store user data. Zookeeper is not meant to be a database, so there are checks to ensure znodes hold less than 1 MB of data. Here is an overview of the stats:
  • czxid – The zxid of the change that caused this znode to be created.
  • mzxid – The zxid of the change that last modified this znode.
  • ctime – The time in milliseconds from epoch when this znode was created.
  • mtime – The time in milliseconds from epoch when this znode was last modified.
  • version – The number of changes to the data of this znode.
  • cversion – The number of changes to the children of this znode.
  • aversion – The number of changes to the ACL of this znode.
  • ephemeralOwner – The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
  • dataLength – The length of the data field of this znode.
  • numChildren – The number of children of this znode.
Besides the normal znode there is another type of znode called an ephemeral node. This node only exists as long as the session of the client that created it is alive; it disappears as soon as that session is closed or expires. Ephemeral nodes cannot have child nodes.
Every type of node can be created as a sequence node. A sequence node gets a number appended to its name by Zookeeper, and each new child of the same parent gets a higher number than the children created before it (the numbers are not necessarily consecutive). With this you can keep track of which child nodes under a parent were created first.
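ZooKeeper formats the appended sequence number as a fixed-width, zero-padded 10-digit number. A quick illustration of why that matters: with zero padding, sorting child names as plain strings gives the same order as sorting by creation. The class SequenceNames and its sequentialName() helper are hypothetical and only imitate the name format; they do not talk to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: imitates the "prefix + 10-digit zero-padded counter"
// names that sequence znodes get.
public class SequenceNames {

    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList(
                sequentialName("n_", 10),
                sequentialName("n_", 2),
                sequentialName("n_", 7)));
        Collections.sort(names); // plain string sort
        System.out.println(names); // prints [n_0000000002, n_0000000007, n_0000000010]
    }
}
```

Without the padding, "n_10" would sort before "n_2", so a client that sorts the (unordered) result of getChildren() relies on this fixed width.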

Operations on znodes

Clients can create, modify, delete and ask for information about nodes in the Zookeeper tree; the available operations are very similar to the ones you can do on a file system.
Here is a list:
  • create – creates a node at a location in the tree
  • delete – deletes a node
  • exists – tests if a node exists at a location
  • get data – reads the data from a node
  • set data – writes data to a node
  • get children – retrieves a list of children of a node
  • sync – waits for data to be propagated

Watching znodes

Once the client is connected and nodes are created, the client can set a watch on specific nodes in the tree. The moment these nodes change, the Zookeeper server sends an event to the subscribed clients. Watches can be set with the following operations: getData(), getChildren() and exists(). A watch fires only once, so to keep receiving events the client has to set it again. One thing to note: events always arrive at the client in order.

Events

There are five event types, all self-explanatory except the last one:
  • Node children changed
  • Node created
  • Node data changed
  • Node deleted
  • None
The first four all relate to events happening on a watched node. The last one is special: it indicates a change in the state of the connection. When you get a None event, check the keeper state to see what’s going on. Possible values are:
  • AuthFailed – Auth failed state
  • Disconnected – The client is in the disconnected state – it is not connected to any server in the ensemble
  • Expired – The serving cluster has expired this session
  • SyncConnected – The client is in the connected state – it is connected to a server in the ensemble (one of the servers specified in the host connection parameter during ZooKeeper client creation)
As you can guess, the first thing your watcher gets once it connects is a None event with a SyncConnected state.
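For the leader election application, these states translate into decisions about whether the Speaker may keep writing. Below is one conservative policy, sketched without a ZooKeeper dependency: KeeperStateSketch is a hypothetical stand-in for the real org.apache.zookeeper.Watcher.Event.KeeperState enum, and both the Action names and the mapping are assumptions, not the post’s code. The idea is never to keep speaking while the client cannot confirm it is still the leader.

```java
// Hypothetical stand-in for the four keeper states listed above.
enum KeeperStateSketch { AuthFailed, Disconnected, Expired, SyncConnected }

public class NoneEventPolicy {

    enum Action { RECHECK_LEADERSHIP, STOP_SPEAKING, STOP_AND_START_NEW_SESSION }

    // One conservative mapping from connection state to leader behaviour.
    static Action onNoneEvent(KeeperStateSketch state) {
        switch (state) {
            case SyncConnected:
                // (Re)connected: read /ELECTION's children and compare numbers.
                return Action.RECHECK_LEADERSHIP;
            case Disconnected:
                // The session may still be alive; the client library keeps trying
                // to reconnect, and SyncConnected follows if it succeeds.
                return Action.STOP_SPEAKING;
            case Expired:
                // Session gone, and with it our ephemeral znode: a new ZooKeeper
                // handle (and a new znode) is needed to take part again.
                return Action.STOP_AND_START_NEW_SESSION;
            default:
                // AuthFailed, or anything unexpected.
                return Action.STOP_SPEAKING;
        }
    }

    public static void main(String[] args) {
        for (KeeperStateSketch state : KeeperStateSketch.values()) {
            System.out.println(state + " -> " + onNoneEvent(state));
        }
    }
}
```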

Conclusion

That concludes this overview of the main workings of Zookeeper. There is a lot more to it; I haven’t covered ACLs, master election or two-phase commits, but I hope this overview will help you get started.

Why I’m looking at Go!

I watched a Stanford University lecture with Rob Pike as guest speaker. He is a Principal Engineer at Google and part of the team that developed the Go programming language.

He made some interesting comments on why specific languages exist. First, have a look at these statistics (source: TIOBE index, November 2011):
Type checking at compile time (static) or at run time (dynamic):

Category                      Rating Nov 2011   Change vs. Nov 2010
Statically typed languages    63.4%             +0.5%
Dynamically typed languages   36.6%             -0.5%

Statically typed languages are on top: clear winners. There is some discussion going around which suggests the TIOBE index is biased towards languages that are hard to program in, since people ask more questions about them (the number of results returned by a search engine is one of the criteria of the index). This of course assumes that a statically typed language is de facto difficult.

An interesting point Rob Pike makes is that most languages built as a reaction to these hard-to-program statically typed languages happen to be dynamically typed. He says: ‘Perhaps as a result, non-expert programmers have confused “ease of use” with interpretation and dynamic typing.’

‘Statically typed’ and ‘hard to program’ are not necessarily linked. They are two separate issues.

If you start using Go, the first thing you notice is the super-fast compile time. The type checking is clearly not an issue: even big programs compile almost instantly.

The variable declaration syntax, with type inference, is easy too:
var x, y, z int
var c, python, java = true, false, "no!"
or, within functions, with the short declaration operator:
lala := "i am a string"

There are lots of other reasons why Go is an interesting language, but not having to deal with the tiresome static vs. dynamic typing discussion is (for me) a very good reason to have a look at this new language.