The examples in this article use the Java API of ZooKeeper version 3.4.6. They are kept simple so that they are easy to follow, and easy to discuss if anything is still unclear after working through them.
The ZooKeeper Java binding for developing applications is mainly composed of two Java packages:
org.apache.zookeeper
org.apache.zookeeper.data
The org.apache.zookeeper package contains the interface definitions for ZooKeeper watches and the various callback handlers of ZooKeeper. It defines the main classes of the ZooKeeper client library, as well as static definitions of the many ZooKeeper event types and states. The org.apache.zookeeper.data package defines the characteristics associated with data registers (also known as znodes), such as access control lists (ACLs), IDs, stats, and so on.
The org.apache.zookeeper.server, org.apache.zookeeper.server.quorum and org.apache.zookeeper.server.upgrade packages in the ZooKeeper Java API are part of the server implementation. The org.apache.zookeeper.client package is used to query the status of the ZooKeeper server.
Preparing the development environment
Apache ZooKeeper is a complex piece of software that depends on a number of other libraries. These dependencies are shipped as jar files in the lib directory of the ZooKeeper distribution. The core ZooKeeper jar, zookeeper-3.4.6.jar, is located in the top-level (home) directory of the distribution.
To develop a Java ZooKeeper application, the classpath must include the ZooKeeper jar and all the third-party libraries that ZooKeeper depends on. The bin directory contains a zkEnv.sh script that sets CLASSPATH for us.
To use it, run the following commands from the command line:
$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ZOOBINDIR}/zkEnv.sh
The shell variable ZK_HOME is set to the path where ZooKeeper is installed; in my setup it is /usr/share/zookeeper. After sourcing the script, the CLASSPATH variable is set correctly. On my system it looks like this:
$ echo $CLASSPATH
/usr/share/zookeeper-3.4.6/bin/../build/classes:/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar:/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/usr/share/zookeeper-3.4.6/bin/../conf:
On Windows, run the zkEnv.cmd script instead. The CLASSPATH variable can now be used to compile and run Java programs written against the ZooKeeper API. On Unix/Linux, you can source zkEnv.sh from the .bashrc file in your home directory so that you don't have to source it manually every time you start a shell session.
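If compilation later fails because org.apache.zookeeper classes cannot be found, it helps to confirm that the ZooKeeper jar really is on the classpath. The following is a minimal sketch of such a check. ClasspathCheck and its methods are hypothetical helpers, not part of ZooKeeper, and the rule that the core jar's file name looks like zookeeper-VERSION.jar is an assumption based on the jar name above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: finds ZooKeeper jars in a classpath string. */
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Splits a classpath on the given separator, dropping empty entries. */
    static List<String> entries(String classpath, String separator) {
        List<String> result = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    /** Returns entries whose file name looks like zookeeper-VERSION.jar (assumed naming rule). */
    static List<String> zookeeperJars(String classpath, String separator) {
        List<String> jars = new ArrayList<>();
        for (String entry : entries(classpath, separator)) {
            String name = new File(entry).getName();
            if (name.startsWith("zookeeper-") && name.endsWith(".jar")) {
                jars.add(entry);
            }
        }
        return jars;
    }

    public static void main(String[] args) {
        // Inspect the classpath this JVM was actually started with.
        String cp = System.getProperty("java.class.path");
        List<String> jars = zookeeperJars(cp, File.pathSeparator);
        System.out.println(jars.isEmpty() ? "No ZooKeeper jar on the classpath" : "Found: " + jars);
    }
}
```

Compiled and run with the same -cp $CLASSPATH options used for the examples in this article, it should report the zookeeper-3.4.6.jar entry; File.pathSeparator makes the same check work with the ';'-separated classpaths used on Windows.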
Our first ZooKeeper program
To introduce the ZooKeeper Java API, let's start with a very simple program that connects to the ZooKeeper instance on localhost and, if the connection succeeds, prints the list of znodes under the root path of the ZooKeeper namespace.
The code for this program is as follows:
/* Our first ZooKeeper program */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class HelloZooKeeper {
    public static void main(String[] args) throws IOException, InterruptedException {
        String hostPort = "localhost:2181";
        String zpath = "/";
        List<String> zooChildren = new ArrayList<String>();
        ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
        try {
            zooChildren = zk.getChildren(zpath, false);
            System.out.println("Znodes of '/': ");
            for (String child : zooChildren) {
                // print the children
                System.out.println(child);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            zk.close(); // release the session when we are done
        }
    }
}

Before building and running this code, let's look at what it does. The code starts with import statements, which bring in the packages needed by each part of the program. As mentioned earlier, the org.apache.zookeeper package contains all the classes and interfaces a client needs to interact with the ZooKeeper server. After the imports, a class named HelloZooKeeper is defined. Since we are connecting to a ZooKeeper instance running on the same machine, the main method defines the host and port string as localhost:2181. The line zk = new ZooKeeper(hostPort, 2000, null) calls the ZooKeeper constructor, which initiates a connection to the ZooKeeper server and returns a handle. Session establishment is asynchronous: the constructor returns immediately, usually before the session is fully established. A client program that connects to a ZooKeeper server instance and maintains that connection needs a live session; in this example, the zk object returned by the constructor represents that session. The ZooKeeper API is built around this handle, and every method call is made through it. The finally block closes the handle so that the session is released when the program is done.
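Because the ZooKeeper constructor returns before the session is established (its Javadoc states that session establishment is asynchronous), clients often block until their watcher sees the SyncConnected state. The sketch below shows that idiom with a CountDownLatch. To keep it runnable without a server, a plain thread stands in for ZooKeeper's event thread. The class name ConnectionGate is hypothetical, and the commented lines show where a real Watcher callback would hook in, assuming Java 8 lambdas.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets a client wait until its session is connected. */
final class ConnectionGate {
    private final CountDownLatch connected = new CountDownLatch(1);

    /** To be called by the connection watcher when the state becomes SyncConnected. */
    void onConnected() {
        connected.countDown();
    }

    /** Waits up to the timeout; returns true only if onConnected() was called in time. */
    boolean awaitConnected(long timeout, TimeUnit unit) {
        try {
            return connected.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate();
        // With the real client the wiring would look like this (not compiled here):
        //   ZooKeeper zk = new ZooKeeper("localhost:2181", 2000, event -> {
        //       if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        //           gate.onConnected();
        //       }
        //   });
        // Here a plain thread simulates the connection completing after 100 ms.
        Thread simulatedEventThread = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            gate.onConnected();
        });
        simulatedEventThread.start();
        System.out.println("connected = " + gate.awaitConnected(5, TimeUnit.SECONDS));
    }
}
```

Bounding the wait with a timeout, rather than calling await() with no limit, lets the program report an unreachable server instead of hanging.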
The basic constructor of the ZooKeeper class has the following signature:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
The parameters used are as follows:
connectString: A comma-separated list of host:port pairs, each corresponding to a ZooKeeper server. For example, 10.0.0.1:2001,10.0.0.2:2002,10.0.0.3:2003 is a valid connect string for a three-node ZooKeeper ensemble.
sessionTimeout: The session timeout in milliseconds. This is the amount of time ZooKeeper waits without receiving a heartbeat from the client before declaring the session expired.
watcher: A Watcher object which, if provided, is notified of state changes and node events. This object is created from a user-defined class that implements the Watcher interface, and the instance is passed to the ZooKeeper constructor. Through it, the client application receives notifications of various kinds of events, such as a lost connection or an expired session.
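To make the connectString format concrete, here is a small, hypothetical parser that splits such a string into host/port pairs. It only illustrates the format; ZooKeeper parses the string itself. The sketch assumes every entry carries an explicit port and does not handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits a "host1:port1,host2:port2,..." string. */
final class ConnectString {
    /** One ensemble member. */
    static final class Server {
        final String host;
        final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    private ConnectString() {}

    /** Parses the connect string; whitespace around entries is ignored. */
    static List<Server> parse(String connectString) {
        List<Server> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + hostPort + "'");
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            servers.add(new Server(host, port));
        }
        return servers;
    }
}
```

For instance, ConnectString.parse("10.0.0.1:2001,10.0.0.2:2002,10.0.0.3:2003") yields three Server entries, one per ensemble member in the example above.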
The ZooKeeper Java API defines an additional constructor with a fourth parameter for more advanced use. Its signature is as follows:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
In this constructor, setting the boolean canBeReadOnly parameter to true allows the client to enter read-only mode in the case of a network partition. Read-only mode covers the situation where the client cannot reach a majority of the servers but can reach a partitioned server; the client then connects to that server in read-only mode, in which read requests are served and write requests are rejected. The client keeps trying in the background to connect to a majority of the servers while remaining in read-only mode. A partitioned server is simply a member of a subset of the ZooKeeper ensemble that has been cut off from the rest by a network partition. A majority of the servers forms a quorum in the ensemble.
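The majority rule behind read-only mode can be written down as a small model. The sketch below is not ZooKeeper's client code: QuorumModel and its Mode enum are hypothetical, the model assumes the servers in the client's partition accept read-only connections, and it captures only the decision described above. A partition holding a strict majority of the ensemble serves reads and writes, while a smaller partition can serve reads only to clients that set canBeReadOnly.

```java
/** Simplified model of the mode a client can end up in (not ZooKeeper's implementation). */
final class QuorumModel {
    enum Mode { READ_WRITE, READ_ONLY, DISCONNECTED }

    private QuorumModel() {}

    /** Size of a strict majority of an ensemble of n servers: floor(n / 2) + 1. */
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** True if the servers in the client's partition form a quorum. */
    static boolean hasQuorum(int reachable, int ensembleSize) {
        return reachable >= majority(ensembleSize);
    }

    /** Mode for a client whose partition contains `reachable` of `ensembleSize` servers. */
    static Mode mode(int reachable, int ensembleSize, boolean canBeReadOnly) {
        if (hasQuorum(reachable, ensembleSize)) {
            return Mode.READ_WRITE;
        }
        if (canBeReadOnly && reachable > 0) {
            return Mode.READ_ONLY; // a partitioned server answers reads, rejects writes
        }
        return Mode.DISCONNECTED;
    }
}
```

For a three-node ensemble, majority(3) is 2, so a client that can reach only one server ends up READ_ONLY if canBeReadOnly is true and DISCONNECTED otherwise.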
The following constructor takes two additional parameters:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
These two parameters allow a ZooKeeper client object to reattach to an existing session:
sessionId: When the client reconnects to the ZooKeeper server, this ID identifies the previously established session to reconnect to.
sessionPasswd: The password associated with that session. A client can obtain the ID and password of its current session with the getSessionId() and getSessionPasswd() methods of the ZooKeeper class.
Finally, the following constructor combines both of the previous variants:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
It allows reconnecting to a specified session with read-only mode enabled.
Note
The detailed Java API documentation for the ZooKeeper class is available at http://zookeeper.apache.org/doc/r3.4.6/api/index.html.
Now, back to our ZooKeeper program. After the constructor returns, we have a handle to the session, and we use it to call the getChildren method:
zooChildren = zk.getChildren(zpath, false);
The getChildren(String path, boolean watch) method of the ZooKeeper class returns the list of children of the znode at the given path. Because watch is false, no watch is left on the znode. We simply iterate over the returned list and print each string to the console.
Save the program as HelloZooKeeper.java and compile it as follows:
$ javac -cp $CLASSPATH HelloZooKeeper.java
Before we run the program, we need to start the ZooKeeper server instance using the following command:
$ ${ZK_HOME}/bin/zkServer.sh start
Run the program as follows:
$ java -cp $CLASSPATH HelloZooKeeper
The program prints log messages on the console that show the ZooKeeper version, Java version, Java classpath, server architecture, and so on. Here are some of these log messages:
Log messages generated by the ZooKeeper Java API are very useful for debugging. They tell us how the client connects to the ZooKeeper server, how it establishes a session, and what else happens in the background. The last three log messages shown above tell us how the client initiates the connection using the parameters specified in the program, and how the server assigns a session ID to the client after a successful connection.
Finally, the program prints the following output to the console:
We can use the ZooKeeper shell to verify the correctness of the program:
$ $ZK_HOME/bin/zkCli.sh -server localhost
Congratulations! We just successfully wrote our first ZooKeeper client program.
Implementing the Watcher interface
ZooKeeper watches enable clients to receive notifications from ZooKeeper servers and to handle these events as they occur. The ZooKeeper Java API provides a public interface called Watcher, which a client's event handler class must implement in order to receive notifications about events from the ZooKeeper server. Programmatically, an application that uses such a client handles these events by registering callback objects with the client.
We will implement the Watcher interface to handle the events that ZooKeeper generates when the data associated with a znode changes.
The Watcher interface is declared as follows in the org.apache.zookeeper package:
public interface Watcher {
    void process(WatchedEvent event);
}
This is a simplified view: the actual interface also contains the nested Event interface, which defines the KeeperState and EventType enumerations used below.
To demonstrate a znode data watcher, we use two Java classes: DataWatcher and DataUpdater. DataWatcher runs continuously and listens for NodeDataChanged events from the ZooKeeper server on the znode path /MyConfig. DataUpdater periodically updates the data field of this znode, which generates events; on receiving these events, DataWatcher prints the changed data to the console.
The following is the code of the DataWatcher.java class:
import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DataWatcher implements Watcher, Runnable {
    private static String hostPort = "localhost:2181";
    private static String zooDataPath = "/MyConfig";
    byte zoo_data[] = null;
    ZooKeeper zk;

    public DataWatcher() {
        try {
            zk = new ZooKeeper(hostPort, 2000, this);
            if (zk != null) {
                try {
                    // Create the znode if it doesn't exist
                    if (zk.exists(zooDataPath, this) == null) {
                        zk.create(zooDataPath, "".getBytes(),
                                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void printData() throws InterruptedException, KeeperException {
        // Read the data and set a new watch (second argument) in the same call
        zoo_data = zk.getData(zooDataPath, this, null);
        String zString = new String(zoo_data);
        // Print the current content of the znode to the console
        System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.printf("\nEvent Received: %s", event.toString());
        // We process only events of type NodeDataChanged
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                printData();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, KeeperException {
        DataWatcher dataWatcher = new DataWatcher();
        dataWatcher.printData();
        dataWatcher.run();
    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

Let's walk through the DataWatcher class to understand how a ZooKeeper watcher is implemented.
The public DataWatcher class implements both the Watcher interface and the Runnable interface, with the intention of running the watcher as a thread (in this example, main simply calls run() directly, which blocks the main thread). The main method creates an instance of the DataWatcher class. The DataWatcher constructor attempts to connect to a ZooKeeper instance running on localhost. If the handle is created successfully, we check whether the znode path /MyConfig exists with the following code:
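if (zk.exists(zooDataPath, this) == null) {
Passing this as the second argument also sets a watch on the path, even though the znode may not exist yet. If the znode does not exist in the ZooKeeper namespace, the exists call returns null, and we try to create it as a persistent znode with the following code: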
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Next is the process method, which is declared in the Watcher interface of the org.apache.zookeeper package and implemented by the DataWatcher class as follows:
public void process(WatchedEvent event) {
For simplicity, the process method prints every event received from the ZooKeeper instance and processes further only the events of type NodeDataChanged:
if (event.getType() == Event.EventType.NodeDataChanged)
When any update or change occurs in the data field of the znode path /MyConfig and an event of type NodeDataChanged is received, the printData method is called to print the current content of the znode. When we call getData on the znode, we set a watch again by passing this as the second parameter, as shown in the following code:
zoo_data = zk.getData(zooDataPath, this, null);
Watch events are one-time triggers sent to the client that set the watch. To keep receiving event notifications, the client must set the watch again after each event.
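The consequence of one-time triggers is easy to see in a toy model. The following in-memory sketch is not ZooKeeper's implementation, and OneShotWatches and CountingWatcher are hypothetical names. A watch registered through getData fires at most once and is then discarded, so only a watcher that calls getData again from its callback, as DataWatcher.printData does, keeps receiving notifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of one-time watches (illustration only, not ZooKeeper code). */
final class OneShotWatches {
    private final Map<String, byte[]> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Returns the data at path and leaves a one-time watch, like getData(path, watcher, stat). */
    byte[] getData(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
        return data.get(path);
    }

    /** Stores new data and fires every watch registered on the path exactly once. */
    void setData(String path, byte[] value) {
        data.put(path, value);
        // Remove the registrations before notifying: this event consumes them.
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> watcher : fired) {
                watcher.accept(path); // only the path is delivered, not the new data
            }
        }
    }

    /** Counts notifications; if rearm is true it re-reads the data, which sets a new watch. */
    static final class CountingWatcher implements Consumer<String> {
        private final OneShotWatches store;
        private final boolean rearm;
        int events;

        CountingWatcher(OneShotWatches store, boolean rearm) {
            this.store = store;
            this.rearm = rearm;
        }

        @Override
        public void accept(String path) {
            events++;
            if (rearm) {
                store.getData(path, this);
            }
        }
    }

    public static void main(String[] args) {
        OneShotWatches store = new OneShotWatches();
        CountingWatcher once = new CountingWatcher(store, false);
        CountingWatcher rearming = new CountingWatcher(store, true);
        store.getData("/MyConfig", once);
        store.getData("/MyConfig", rearming);
        for (int i = 0; i < 3; i++) {
            store.setData("/MyConfig", new byte[] {(byte) i});
        }
        System.out.println("once=" + once.events + ", rearming=" + rearming.events);
    }
}
```

Running main prints once=1, rearming=3: after three updates, the watcher that never re-registers has heard about only the first one.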
DataUpdater.java is a simple class that connects to the ZooKeeper instance running on localhost and updates the data field of the znode path /MyConfig with a random string. Here we update the znode with a universally unique identifier (UUID) string, because successive calls to the UUID generator produce distinct strings (for random UUIDs a collision is astronomically unlikely rather than strictly impossible).
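Since znode data is just a byte array, the updater and the watcher must agree on how strings are encoded. The listings in this article call getBytes() and new String(byte[]), which use the platform default charset. The following hypothetical helper (the name UuidPayload is an assumption for illustration) is a small sketch that makes the encoding explicit with UTF-8 and shows the UUID payload round trip.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper: encodes and decodes the UUID payload with an explicit charset. */
final class UuidPayload {
    private UuidPayload() {}

    /** A fresh random UUID rendered as bytes, suitable for passing to setData(). */
    static byte[] newPayload() {
        return encode(UUID.randomUUID().toString());
    }

    static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Inverse conversion for the result of getData(); treats null defensively as empty. */
    static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }
}
```

A UUID string consists of 36 ASCII characters, so each payload is 36 bytes in UTF-8, and decoding it gives back a string that UUID.fromString accepts.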
The DataUpdater.java class code is as follows:
import java.io.IOException;
import java.util.UUID;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DataUpdater implements Watcher {
    private static String hostPort = "localhost:2181";
    private static String zooDataPath = "/MyConfig";
    ZooKeeper zk;

    public DataUpdater() throws IOException {
        zk = new ZooKeeper(hostPort, 2000, this);
    }

    // Updates the znode path /MyConfig every 5 seconds with a new UUID string.
    public void run() throws InterruptedException, KeeperException {
        while (true) {
            String uuid = UUID.randomUUID().toString();
            byte zoo_data[] = uuid.getBytes();
            // A version of -1 matches any version of the znode
            zk.setData(zooDataPath, zoo_data, -1);
            try {
                Thread.sleep(5000); // Sleep for 5 seconds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // stop updating once interrupted
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, KeeperException {
        DataUpdater dataUpdater = new DataUpdater();
        dataUpdater.run();
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.printf("\nEvent Received: %s", event.toString());
    }
}

Each setData call causes the ZooKeeper server to trigger a NodeDataChanged event. Since DataWatcher has set a watch on this znode path, it receives a notification of each data change. It then retrieves the updated data, resets the watch, and prints the data on the console.
Use the following command to compile the DataWatcher and DataUpdater classes:
$ javac -cp $CLASSPATH DataWatcher.java
$ javac -cp $CLASSPATH DataUpdater.java
To run the watcher and the updater, open two terminal windows. Run the watcher first, because it creates the /MyConfig znode if it does not already exist in the ZooKeeper namespace. Before running the watcher, make sure that the ZooKeeper server is running on localhost.
In one of the terminal windows, execute the watcher class by running the following command:
$ java -cp $CLASSPATH DataWatcher
It prints messages similar to those shown in the following screenshot:
As shown in the previous screenshot, the znode path /MyConfig is created by the DataWatcher class. The class also prints the contents of the znode, but nothing appears after the path because we did not set any data when creating the znode. When the znode is created, the watcher in the class receives an event notification of type NodeCreated, which is printed on the console. The DataWatcher class keeps running and listens for events on the /MyConfig node from the ZooKeeper server.
Let's run the DataUpdater class in another terminal window:
$ java -cp $CLASSPATH DataUpdater
After printing the initial ZooKeeper-specific log messages to the console, the DataUpdater class runs silently. It sets a new UUID string into the data field of the ZooKeeper path /MyConfig every 5 seconds, so output like that in the following screenshot is printed every 5 seconds in the terminal window running DataWatcher:
DataWatcher can also be tested with the ZooKeeper shell. Keep the DataWatcher class running in its terminal as before, start the ZooKeeper shell in another terminal, and run the command shown in the following screenshot:
In the terminal where DataWatcher is running, the following message is printed:
Example: a cluster monitor
Popular Internet services, such as email, file-hosting platforms, and online games, are served by hundreds or thousands of highly available servers spread across multiple data centers, which are often geographically separated. In such a cluster, some dedicated server nodes are set up to monitor the health of the servers that host the services or applications in the production network. In a cloud computing environment, such monitoring nodes, which are also used to manage the cloud environment, are called cloud controllers. An important job of these controller nodes is to detect failures of production servers in real time, notify administrators accordingly, and take necessary actions, such as failing over the applications on a failed server to another server, to ensure fault tolerance and high availability.
In this section, we will use the ZooKeeper Java client API to develop a minimalist distributed cluster monitor model. Using ZooKeeper's ephemeral znode concept to build this monitoring model is quite simple and elegant, as described in the following steps:
1. Each production server runs a ZooKeeper client as a daemon process. This process connects to the ZooKeeper server and creates an ephemeral znode with a name (preferably its network name or hostname) under a predefined path in the ZooKeeper namespace, such as /Members.
2. The cloud controller node runs a ZooKeeper watcher process, which watches the path /Members and listens for events of type NodeChildrenChanged. This watcher process runs as a service or daemon, sets and resets watches on the path, and implements the logic that calls the appropriate modules to take the necessary actions in response to watch events.
3. If a production server shuts down because of a hardware failure or a software crash, its ZooKeeper client process terminates, and the session between that server and the ZooKeeper service ends (for a crashed process, once the session timeout expires). Because this is the defining property of ephemeral znodes, the ZooKeeper service automatically deletes the server's znode under /Members when its session ends.
4. The deletion of the znode raises a NodeChildrenChanged event, so the watcher process on the cloud controller is notified. By calling the getChildren method on the path /Members, it can determine which server node has gone down.
5. The controller node can then take appropriate measures, such as running recovery logic to restart the failed service on another server.
This logic can be built to work in real time, ensuring close to zero downtime and highly available services.
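Working out which server went away amounts to comparing two getChildren results. The following hypothetical helper (MembershipDiff is not part of ZooKeeper) is a minimal sketch of that comparison. It keeps the names in a TreeSet because getChildren does not return children in any guaranteed order, and sorted output is easier to read.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: compares two snapshots of the children of /Members. */
final class MembershipDiff {
    final Set<String> joined;
    final Set<String> left;

    private MembershipDiff(Set<String> joined, Set<String> left) {
        this.joined = joined;
        this.left = left;
    }

    /** Members present only in `after` joined; members present only in `before` left. */
    static MembershipDiff between(Collection<String> before, Collection<String> after) {
        Set<String> joined = new TreeSet<>(after);
        joined.removeAll(before);
        Set<String> left = new TreeSet<>(before);
        left.removeAll(after);
        return new MembershipDiff(joined, left);
    }

    public static void main(String[] args) {
        MembershipDiff d = between(Arrays.asList("4028", "4084"), Arrays.asList("4028", "4099"));
        System.out.println("joined=" + d.joined + " left=" + d.left);
    }
}
```

Comparing snapshots rather than counting events is deliberate. Because a watch must be reset after each notification, several membership changes can surface through a single NodeChildrenChanged event, and only the difference between the old and new lists reveals all of them.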
To implement this cluster monitoring model, we will develop two Java classes. The ClusterMonitor class runs continuously and watches the path /Members in the ZooKeeper tree. After handling a raised event, it prints the list of znodes to the console and resets the watch. The other class, ClusterClient, connects to the ZooKeeper server and creates an ephemeral znode under /Members.
To simulate a cluster with multiple nodes, we start multiple clients on the same computer, and each creates an ephemeral znode named after the process ID of the client process. By looking at the process IDs, the ClusterMonitor class can determine which client processes have exited and which are still running. In real deployments, client processes usually name the ephemeral znode after the hostname of the server they run on. The source code of the two classes is shown below.
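The ClusterClient class in this section obtains its process ID by parsing the string returned by RuntimeMXBean.getName(). The following sketch isolates that parsing and, assuming Java 9 or later, shows ProcessHandle.current().pid() as an alternative. The "pid@hostname" format is what HotSpot-based JVMs commonly return, but the RuntimeMXBean documentation does not guarantee it. ProcessIds and memberPath are hypothetical helpers.

```java
import java.lang.management.ManagementFactory;

/** Hypothetical helpers for naming a member's ephemeral znode after its process ID. */
final class ProcessIds {
    private ProcessIds() {}

    /** Parses the leading pid of a "pid@hostname" runtime name. */
    static long parseRuntimeName(String runtimeName) {
        int at = runtimeName.indexOf('@');
        if (at <= 0) {
            throw new IllegalArgumentException("unexpected runtime name: " + runtimeName);
        }
        return Long.parseLong(runtimeName.substring(0, at));
    }

    /** The approach used by ClusterClient: relies on the JVM's naming convention. */
    static long fromRuntimeMXBean() {
        return parseRuntimeName(ManagementFactory.getRuntimeMXBean().getName());
    }

    /** Java 9+ alternative with a documented contract. */
    static long fromProcessHandle() {
        return ProcessHandle.current().pid();
    }

    /** Builds the ephemeral znode path, e.g. "/Members/4028". */
    static String memberPath(String root, long pid) {
        return root + "/" + pid;
    }

    public static void main(String[] args) {
        System.out.println(memberPath("/Members", fromProcessHandle()));
    }
}
```

On Java 9 or later, fromProcessHandle() avoids depending on the format of the runtime name; on older JVMs the parsing approach used by ClusterClient remains the practical option.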
The ClusterMonitor.java class is defined as follows:
import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterMonitor implements Runnable {
    private static String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    volatile boolean alive = true;

    public ClusterMonitor(String hostPort) throws IOException, InterruptedException, KeeperException {
        connectionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Watcher.Event.EventType.None
                        && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.printf("\nEvent Received: %s", event.toString());
                }
            }
        };

        childrenWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.printf("\nEvent Received: %s", event.toString());
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        // Get the current list of child znodes and reset the watch
                        List<String> children = zk.getChildren(membershipRoot, this);
                        wall("!!!Cluster Membership Change!!!");
                        wall("Members: " + children);
                    } catch (KeeperException e) {
                        throw new RuntimeException(e);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        stop();
                        throw new RuntimeException(e);
                    }
                }
            }
        };

        zk = new ZooKeeper(hostPort, 2000, connectionWatcher);

        // Ensure the parent znode exists
        if (zk.exists(membershipRoot, false) == null) {
            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Set a watch on the parent znode
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.err.println("Members: " + children);
    }

    // Ends the wait loop in run(); notifyAll() wakes the waiting thread
    private synchronized void stop() {
        alive = false;
        notifyAll();
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void wall(String message) {
        System.out.printf("\nMESSAGE: %s", message);
    }

    public void run() {
        try {
            synchronized (this) {
                while (alive) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        if (args.length != 1) {
            System.err.println("Usage: ClusterMonitor <Host:Port>");
            System.exit(1);
        }
        String hostPort = args[0];
        new ClusterMonitor(hostPort).run();
    }
}

The ClusterClient.java class is defined as follows:
import java.io.IOException;
import java.lang.management.ManagementFactory;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterClient implements Watcher, Runnable {
    private static String membershipRoot = "/Members";
    ZooKeeper zk;

    public ClusterClient(String hostPort, Long pid) {
        String processId = pid.toString();
        try {
            zk = new ZooKeeper(hostPort, 2000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (zk != null) {
            try {
                // Ephemeral znode: removed automatically when this client's session ends
                zk.create(membershipRoot + '/' + processId, processId.getBytes(),
                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.printf("\nEvent Received: %s", event.toString());
    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: ClusterClient <Host:Port>");
            System.exit(1);
        }
        String hostPort = args[0];
        // Get the process ID (the runtime name typically looks like "pid@hostname")
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int index = name.indexOf('@');
        Long processId = Long.parseLong(name.substring(0, index));
        new ClusterClient(hostPort, processId).run();
    }
}

Use the following commands to compile these two classes:
$ javac -cp $CLASSPATH ClusterMonitor.java
$ javac -cp $CLASSPATH ClusterClient.java
To run the cluster monitoring model, open two terminals. In one terminal, run the ClusterMonitor class. In the other terminal, run several instances of the ClusterClient class in the background. Start ClusterMonitor first, because it creates the /Members parent znode under which the clients create their ephemeral znodes.
In the first terminal, execute the ClusterMonitor class:
$ java -cp $CLASSPATH ClusterMonitor localhost:2181
As in the previous examples, you will see debug log messages from the client API. Finally, the ClusterMonitor class starts watching for events and prints the following:
Now run five instances of the ClusterClient class to simulate five nodes of a cluster. Each ClusterClient instance creates an ephemeral znode named after its own process ID under the /Members path of the ZooKeeper tree:
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[1] 4028
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[2] 4045
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[3] 4057
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[4] 4072
$ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
[5] 4084
In response, the ClusterMonitor class detects these new ClusterClient instances, because it is watching for events on the /Members path of the ZooKeeper tree. This simulates nodes joining a real cluster. The output in the terminal of the ClusterMonitor class is similar to that shown in the following screenshot:
Now, if a ClusterClient process is killed, the session it maintains with the ZooKeeper server ends. A killed process cannot close its session cleanly, so the server expires the session once the session timeout elapses, and the ephemeral znode created by the client is then deleted. The deletion triggers a NodeChildrenChanged event, which is caught by the ClusterMonitor class. This simulates a node leaving the cluster.
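How long a killed client's ephemeral znode lingers depends on the session timeout the server actually grants, which is not necessarily the 2000 ms requested in the examples. By default, a ZooKeeper server clamps the requested timeout to the range from twice tickTime to twenty times tickTime; the minSessionTimeout and maxSessionTimeout server settings can change these bounds. The following hypothetical helper is a sketch of that clamping under the default bounds only.

```java
/** Hypothetical model of the default server-side bounds on a requested session timeout. */
final class SessionTimeouts {
    private SessionTimeouts() {}

    /**
     * Clamps the requested timeout to [2 * tickTime, 20 * tickTime], assuming
     * minSessionTimeout and maxSessionTimeout are not set in the server config.
     */
    static int negotiated(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }
}
```

With tickTime=2000, the value in the sample configuration shipped with ZooKeeper, negotiated(2000, 2000) returns 4000, so a killed ClusterClient's znode should disappear about four seconds after the process dies rather than two.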
Let's terminate the ClusterClient process with ID 4084:
$ kill -9 4084
The following screenshot shows the output in the terminal of the ClusterMonitor class. It lists the process IDs of the remaining client processes, which emulate live servers:
The simple and elegant cluster monitoring model implemented above demonstrates the real power of ZooKeeper. Without ZooKeeper, developing a model that can monitor node liveness in real time would be a truly daunting task.