1. Introduction to distributed locks
In a distributed environment, distributed locks protect shared resources that are accessed across processes, hosts, and networks. They enforce mutually exclusive access, so that only one client operates on a resource at a time, which keeps the data consistent.
2. Architecture introduction
Before explaining how to implement a distributed lock with Zookeeper, let us first look at the system architecture diagram.
Explanation: The area on the left represents a Zookeeper cluster. locker is a persistent Zookeeper node, and node_1, node_2 and node_3 are ephemeral (temporary) sequential nodes under locker. client_1, client_2, ..., client_n represent multiple clients, and Service represents the shared resource that requires mutually exclusive access.
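To make the roles of the persistent locker node and its ephemeral sequential children concrete, here is a small in-memory model. It is only a sketch: the class FakeLocker and its methods are hypothetical stand-ins, not a ZooKeeper client. It mimics the two properties the design relies on: each sequential child gets an increasing, zero-padded 10-digit suffix (the format ZooKeeper uses), and ephemeral children disappear when the session that created them ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the persistent "locker" node.
// It only imitates ephemeral + sequential semantics; it talks to no server.
class FakeLocker {
    // child name -> id of the session that created it (TreeMap keeps names sorted)
    private final Map<String, Long> children = new TreeMap<>();
    // per-parent counter used for the sequential suffix
    private int sequence = 0;

    // Create an ephemeral sequential child such as "lock-0000000000".
    synchronized String createEphemeralSequential(String prefix, long sessionId) {
        String name = prefix + String.format("%010d", sequence++);
        children.put(name, sessionId);
        return name;
    }

    // Explicit release: the client deletes its own child.
    synchronized void delete(String name) {
        children.remove(name);
    }

    // Session end (crash or expiry): all ephemeral children of that session vanish.
    synchronized void closeSession(long sessionId) {
        children.values().removeIf(owner -> owner == sessionId);
    }

    synchronized List<String> getChildren() {
        return new ArrayList<>(children.keySet());
    }
}
```

The design choice this illustrates: if client 1 crashes while holding the lock, its node disappears together with its session, so the next client becomes the smallest child and can proceed. A persistent child would leave the lock held forever.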
3. Ideas for acquiring distributed locks
1. Overall idea of obtaining distributed locks
To acquire a distributed lock, a client creates an ephemeral sequential node under the locker node; to release the lock, it deletes that node. Concretely, the client calls the createNode method to create an ephemeral sequential node under locker, and then calls getChildren("locker") to get all child nodes of locker. No Watcher needs to be registered at this point. After obtaining all child node paths, if the client finds that the node it created has the smallest sequence number, it has acquired the lock. Otherwise it has not, and it must find the node immediately smaller than its own, call the exists() method on that node, and register an event listener on it. When the watched node is deleted, the client's Watcher receives a notification, and the client checks again whether its own node now has the smallest sequence number among locker's children. If so, it has acquired the lock. If not, it repeats the steps above: it finds the node immediately smaller than its own and registers a listener on it. As this shows, the process involves a fair amount of conditional logic.
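The decision the client makes after each getChildren call can be isolated into one small function. This is a sketch with a hypothetical helper name (nodeToWatch); it assumes all children share the "lock-" prefix followed by ZooKeeper's zero-padded suffix, so plain string sorting matches creation order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating one pass of the acquisition check.
class LockDecision {
    // Returns null if ourNode holds the lock; otherwise the child to watch,
    // i.e. the node immediately before ours in sequence order.
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        // Same prefix + zero-padded suffix, so lexicographic order == creation order.
        Collections.sort(sorted);
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // Our ephemeral node is gone (e.g. the session expired): the caller must recreate it.
            throw new IllegalStateException("Not found: " + ourNode);
        }
        return ourIndex == 0 ? null : sorted.get(ourIndex - 1);
    }
}
```

Watching only the immediate predecessor, rather than the smallest node, means each deletion wakes up exactly one waiting client.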
2. The core algorithm process for acquiring distributed locks
The following flowchart shows the complete algorithm for acquiring a distributed lock:
Explanation: When client A wants to acquire the distributed lock, it first creates an ephemeral sequential node (node_n) under locker, and then immediately fetches all (first-level) child nodes of locker.
Because multiple clients compete for the lock at the same time, locker will usually have more than one child. A sequential node has a number appended to its name, and a node created earlier gets a smaller number than one created later. The children can therefore be sorted in ascending order of this suffix, so that the first child is the node created earliest, i.e. the node of the client that requested the lock first. Client A then checks whether this smallest node is the node_n it created. If so, client A has acquired the lock. If not, another client (say client B) already holds the lock, and client A must wait for client B to release it, that is, for client B to delete the node it created.
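Sorting "in the order of the suffix" can be made robust to differing name prefixes by comparing only the number after the lock name. The sketch below is modeled on the article's getLockNodeNumber, but parses the suffix as a number instead of comparing strings; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch: order lock children by the sequence number ZooKeeper appends,
// ignoring whatever precedes the lock-name prefix.
class SequenceOrder {
    static long sequenceOf(String child, String lockName) {
        int i = child.lastIndexOf(lockName);
        String digits = i >= 0 ? child.substring(i + lockName.length()) : child;
        return Long.parseLong(digits);
    }

    static List<String> sortBySequence(List<String> children, String lockName) {
        List<String> sorted = new ArrayList<>(children);
        // Earliest-created node first, regardless of any text before "lock-".
        sorted.sort(Comparator.comparingLong((String c) -> sequenceOf(c, lockName)));
        return sorted;
    }
}
```

For example, with children "z-lock-0000000002", "lock-0000000010" and "a-lock-0000000007", a plain string sort would put "a-lock-..." first, while sorting by suffix correctly puts the node numbered 2 first.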
Client A learns that the lock may have been released by listening for the deletion event of the sequential node immediately smaller than node_n. When that node is deleted, client A fetches all children of locker again and compares them with its own node_n. It repeats this until node_n has the smallest sequence number among all children of locker, at which point client A has acquired the lock.
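The wait-and-recheck loop can be exercised without a ZooKeeper server. The following simulation is a sketch under stated assumptions: SimLocker and SimLockerDemo are hypothetical, and a CountDownLatch per node stands in for ZooKeeper's deletion watch. Each thread creates a sequential node, waits for its predecessor's deletion, re-checks, and deletes its own node on release.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory locker: each node carries a latch that is
// counted down when the node is deleted (a stand-in for a NodeDeleted watch).
class SimLocker {
    private final TreeMap<String, CountDownLatch> nodes = new TreeMap<>();
    private int seq = 0;

    synchronized String create() {
        String name = String.format("lock-%010d", seq++);
        nodes.put(name, new CountDownLatch(1));
        return name;
    }

    // null if `ours` is the smallest node, else the latch of the node just before it
    synchronized CountDownLatch predecessorLatch(String ours) {
        String prev = nodes.lowerKey(ours);
        return prev == null ? null : nodes.get(prev);
    }

    synchronized void delete(String name) {
        CountDownLatch latch = nodes.remove(name);
        if (latch != null) latch.countDown(); // "deletion event"
    }

    void acquire(String ours) throws InterruptedException {
        CountDownLatch prev;
        while ((prev = predecessorLatch(ours)) != null) {
            prev.await(); // wake up, then loop to re-check our position
        }
    }
}

class SimLockerDemo {
    // Runs n threads through acquire/release; returns the order in which they held the lock.
    static List<String> run(int n) {
        SimLocker locker = new SimLocker();
        List<String> order = new ArrayList<>();
        AtomicInteger holders = new AtomicInteger();
        AtomicInteger maxHolders = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                String ours = locker.create();
                try {
                    locker.acquire(ours);
                    maxHolders.accumulateAndGet(holders.incrementAndGet(), Math::max);
                    synchronized (order) { order.add(ours); }
                    Thread.sleep(1); // critical section
                    holders.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    locker.delete(ours); // release the lock
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        if (maxHolders.get() != 1) throw new IllegalStateException("mutual exclusion violated");
        return order;
    }
}
```

Because the latch of an already-deleted node is already at zero, a deletion that happens between the check and the wait is not lost; with a real watch, the code has to handle that race explicitly.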
4. Code implementation of distributed locks based on Zookeeper
1. Define a distributed lock interface
The defined distributed lock interface is as follows:
```java
import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    /** Acquire the lock, waiting until it is obtained. */
    public void acquire() throws Exception;

    /**
     * Acquire the lock, waiting at most until the timeout expires.
     * @param time maximum time to wait
     * @param unit time unit of the time parameter
     * @return whether the lock was obtained
     * @throws Exception
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Release the lock.
     * @throws Exception
     */
    public void release() throws Exception;
}
```

2. Define a simple mutex
Define a mutex class that implements the lock interface above and extends a base class, BaseDistributedLock. The base class handles the interaction with Zookeeper and provides one method that attempts to acquire the lock and another that releases it.
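Whatever class implements the DistributedLock interface, callers usually pair acquire with release in a finally block, and release only if a timed acquire actually succeeded. The helper below is a hypothetical sketch of that calling pattern (LockTemplate and runLocked are not part of the article). LocalLock is an assumed stand-in backed by a ReentrantLock, used only so the example runs without a Zookeeper server.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Same shape as the DistributedLock interface defined above.
interface DistributedLock {
    void acquire() throws Exception;
    boolean acquire(long time, TimeUnit unit) throws Exception;
    void release() throws Exception;
}

// Hypothetical helper: run an action while holding the lock, always releasing it.
final class LockTemplate {
    static <T> Optional<T> runLocked(DistributedLock lock, long time, TimeUnit unit,
                                     Callable<T> action) throws Exception {
        if (!lock.acquire(time, unit)) {
            return Optional.empty(); // timed out: we never owned the lock, so do not release it
        }
        try {
            return Optional.ofNullable(action.call());
        } finally {
            lock.release(); // runs even if the action throws
        }
    }
}

// Local stand-in used only to exercise the helper.
class LocalLock implements DistributedLock {
    private final ReentrantLock delegate = new ReentrantLock();

    public void acquire() throws Exception { delegate.lockInterruptibly(); }

    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return delegate.tryLock(time, unit);
    }

    public void release() { delegate.unlock(); }

    boolean isHeld() { return delegate.isLocked(); }
}
```

With the article's classes, the same helper would be called with a SimpleDistributedLockMutex instance in place of LocalLock.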
```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Implementation of the lock interface. The actual work is done by the parent class
 * BaseDistributedLock, which implements the Zookeeper-specific details.
 */
public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock {

    /*
     * Path of the Zookeeper node used for the distributed lock, e.g. /locker.
     * It should be a persistent node; ephemeral sequential nodes are created under it.
     */
    private final String basePath;

    /*
     * Lock name prefix. Sequential nodes created under locker start with "lock-",
     * which makes it easy to filter out unrelated nodes. The created nodes look like
     * lock-0000000001, lock-0000000002, ...
     */
    private static final String LOCK_NAME = "lock-";

    /* The sequential node this client created under locker, kept for later operations such as release. */
    private String ourLockPath;

    /**
     * Acquire the lock through the parent class's acquisition method.
     * @param time timeout for acquiring the lock
     * @param unit time unit of time
     * @return whether the lock was obtained
     * @throws Exception
     */
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        // A non-null ourLockPath means the lock was obtained; see attemptLock for details.
        ourLockPath = attemptLock(time, unit);
        return ourLockPath != null;
    }

    /**
     * @param client   Zookeeper client connection object
     * @param basePath path of the persistent lock node
     */
    public SimpleDistributedLockMutex(ZkClientExt client, String basePath) {
        // The parent constructor records basePath and the child-node prefix;
        // the basePath node itself is created on demand if it does not exist yet.
        super(client, basePath, LOCK_NAME);
        this.basePath = basePath;
    }

    /** Acquire the lock, waiting without a timeout; throws if the lock could not be obtained. */
    public void acquire() throws Exception {
        // A null unit means no timeout: wait until the lock is obtained.
        if (!internalLock(-1, null)) {
            throw new IOException("Connection is lost! Cannot obtain the lock under the path: '" + basePath + "'!");
        }
    }

    /** Acquire the lock, waiting at most the given time. */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

    /** Release the lock. */
    public void release() throws Exception {
        releaseLock(ourLockPath);
    }
}
```

3. Implementation details of the distributed lock
The key logic for acquiring the lock lives in BaseDistributedLock, which implements the Zookeeper-specific details of the distributed lock.
```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// ZkClient API (I0Itec zkclient); ZkClientExt, not shown in this article, extends ZkClient.
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

public class BaseDistributedLock {

    private final ZkClientExt client;
    private final String path;      // basePath + "/" + lockName: prefix of our sequential nodes
    private final String basePath;  // persistent lock node, e.g. /locker
    private final String lockName;
    private static final Integer MAX_RETRY_COUNT = 10;

    public BaseDistributedLock(ZkClientExt client, String path, String lockName) {
        this.client = client;
        this.basePath = path;
        this.path = path.concat("/").concat(lockName);
        this.lockName = lockName;
    }

    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    private String createLockNode(ZkClient client, String path) throws Exception {
        try {
            return client.createEphemeralSequential(path, null);
        } catch (ZkNoNodeException e) {
            // The persistent lock node does not exist yet: create it (with parents) and try again
            client.createPersistent(basePath, true);
            return client.createEphemeralSequential(path, null);
        }
    }

    /**
     * Core method for obtaining the lock.
     * @param startMillis  time at which the acquisition started
     * @param millisToWait maximum time to wait in milliseconds, or null to wait without a timeout
     * @param ourPath      full path of the sequential node this client created
     * @return whether the lock was obtained
     * @throws Exception
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            while (!haveTheLock) {
                // All sequential nodes under locker, sorted in ascending order
                List<String> children = getSortedChildren();
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // Position of our node among locker's children; index 0 means we hold the lock
                int ourIndex = children.indexOf(sequenceNodeName);

                /*
                 * If our ephemeral node is missing, it may have been deleted after a brief network
                 * interruption. Throw so that the caller (attemptLock) can retry a limited number of times.
                 */
                if (ourIndex < 0) {
                    throw new ZkNoNodeException("Not found: " + sequenceNodeName);
                }

                // If ourIndex > 0, another client holds the lock and we must wait for it to be released
                boolean isGetTheLock = ourIndex == 0;

                // To detect the release, watch the node immediately before ours
                String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

                if (isGetTheLock) {
                    haveTheLock = true;
                } else {
                    // Once the smaller node is deleted, our node may be the smallest;
                    // a CountDownLatch implements the wait
                    String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {
                        // When the smaller node is deleted, release the latch so that
                        // the while loop runs again and re-checks our position
                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                            // ignore
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousSequencePath, previousListener);
                        // Subscribing does not fail if the node is already gone, and then no
                        // deletion event would arrive: re-check immediately instead of waiting
                        if (!client.exists(previousSequencePath)) {
                            continue;
                        }
                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDelete = true; // timed out: delete our node
                                break;
                            }
                            latch.await(millisToWait, TimeUnit.MILLISECONDS);
                        } else {
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        // ignore: the watched node disappeared; loop and re-check
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }
                }
            }
        } catch (Exception e) {
            // On any error, our node must be deleted
            doDelete = true;
            throw e;
        } finally {
            if (doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }

    private List<String> getSortedChildren() throws Exception {
        try {
            List<String> children = client.getChildren(basePath);
            // Sort by the sequence suffix after the lock name, i.e. by creation order
            Collections.sort(children, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                }
            });
            return children;
        } catch (ZkNoNodeException e) {
            // The persistent lock node does not exist yet: create it (with parents) and retry
            client.createPersistent(basePath, true);
            return getSortedChildren();
        }
    }

    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    protected String attemptLock(long time, TimeUnit unit) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        // A brief network interruption may require a retry
        while (!isDone) {
            isDone = true;
            try {
                // Create this client's ephemeral sequential node under locker (the persistent basePath node)
                ourPath = createLockNode(client, path);
                /*
                 * Check whether we hold the lock, i.e. whether our node is the smallest child of locker.
                 * If not, wait for the lock to be released and check again, until the lock is
                 * obtained or the wait times out.
                 */
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
}
```
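The timed branch of waitToLock subtracts the elapsed time from millisToWait on every pass of the loop. A common alternative is to fix a deadline once with System.nanoTime(), which is monotonic unlike System.currentTimeMillis(), and pass the remaining time to CountDownLatch.await. The sketch below is hypothetical (DeadlineWait and awaitTurn are not from the article); the Supplier stands in for "re-read the children and return the latch for our predecessor, or null if we are now the smallest".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of deadline-based waiting for a sequence of predecessor deletions.
final class DeadlineWait {
    /**
     * @param nextLatch returns null once we hold the lock, otherwise a latch
     *                  that is counted down when our predecessor is deleted
     * @return true if the lock was obtained before the deadline
     */
    static boolean awaitTurn(Supplier<CountDownLatch> nextLatch, long time, TimeUnit unit)
            throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(time);
        CountDownLatch latch;
        while ((latch = nextLatch.get()) != null) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // timed out: the caller should delete its own node
            }
            latch.await(remaining, TimeUnit.NANOSECONDS); // wake on deletion or timeout
        }
        return true;
    }
}
```

Because every await receives the time left until the same fixed deadline, repeated wake-ups cannot stretch the total wait beyond the requested timeout.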