一、分佈式鎖介紹
分佈式鎖主要用於在分佈式環境中保護跨進程、跨主機、跨網絡的共享資源實現互斥訪問,以達到保證數據的一致性。
二、架構介紹
在介紹使用Zookeeper實現分佈式鎖之前,首先看當前的系統架構圖
解釋:左邊的整個區域表示一個Zookeeper集群,locker是Zookeeper的一個持久節點,node_1、node_2、node_3是locker這個持久節點下面的臨時順序節點。 client_1、client_2、client_n表示多個客戶端,Service表示需要互斥訪問的共享資源。
三、分佈式鎖獲取思路
1.獲取分佈式鎖的總體思路
在獲取分佈式鎖的時候在locker節點下創建臨時順序節點,釋放鎖的時候刪除該臨時節點。客戶端調用createNode方法在locker下創建臨時順序節點,然後調用getChildren(“locker”)來獲取locker下面的所有子節點,注意此時不用設置任何Watcher。客戶端獲取到所有的子節點path之後,如果發現自己在之前創建的子節點序號最小,那麼就認為該客戶端獲取到了鎖。如果發現自己創建的節點並非locker所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,然後對其調用exist()方法,同時對其註冊事件監聽器。之後,讓這個被關注的節點刪除,則客戶端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是locker子節點中序號最小的,如皋是則獲取到了鎖,如果不是則重複以上步驟繼續獲取到比自己小的一個節點並註冊監聽。當前這個過程中還需要許多的邏輯判斷。
2.獲取分佈式鎖的核心算法流程
下面同個一個流程圖來分析獲取分佈式鎖的完整算法,如下:
解釋:客戶端A要獲取分佈式鎖的時候首先到locker下創建一個臨時順序節點(node_n),然後立即獲取locker下的所有(一級)子節點。
此時因為會有多個客戶端同一時間爭取鎖,因此locker下的子節點數量就會大於1。對於順序節點,特點是節點名稱後面自動有一個數字編號,先創建的節點數字編號小於後創建的,因此可以將子節點按照節點名稱後綴的數字順序從小到大排序,這樣排在第一位的就是最先創建的順序節點,此時它就代表了最先爭取到鎖的客戶端!此時判斷最小的這個節點是否為客戶端A之前創建出來的node_n,如果是則表示客戶端A獲取到了鎖,如果不是則表示鎖已經被其它客戶端獲取,因此客戶端A要等待它釋放鎖,也就是等待獲取到鎖的那個客戶端B把自己創建的那個節點刪除。
此時就通過監聽比node_n次小的那個順序節點的刪除事件來知道客戶端B是否已經釋放了鎖,如果是,此時客戶端A再次獲取locker下的所有子節點,再次與自己創建的node_n節點對比,直到自己創建的node_n是locker的所有子節點中順序號最小的,此時表示客戶端A獲取到了鎖!
四、基於Zookeeper的分佈式鎖的代碼實現
1.定義分佈式鎖接口
定義的分佈式鎖接口如下:
public interface DistributedLock { /**獲取鎖,如果沒有得到就等待*/ public void acquire() throws Exception; /** * 獲取鎖,直到超時* @param time超時時間* @param unit time參數的單位* @return是否獲取到鎖* @throws Exception */ public boolean acquire (long time, TimeUnit unit) throws Exception; /** * 釋放鎖* @throws Exception */ public void release() throws Exception;}2.定義一個簡單的互斥鎖
定義一個互斥鎖類,實現以上定義的鎖接口,同時繼承一個基類BaseDistributedLock,該基類主要用於與Zookeeper交互,包含一個嘗試獲取鎖的方法和一個釋放鎖。
/**鎖接口的具體實現,主要藉助於繼承的父類BaseDistributedLock來實現的接口方法* 該父類是基於Zookeeper實現分佈式鎖的具體細節實現*/public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock { /*用於保存Zookeeper中實現分佈式鎖的節點,如名稱為locker:/locker, *該節點應該是持久節點,在該節點下面創建臨時順序節點來實現分佈式鎖*/ private final String basePath; /*鎖名稱前綴,locker下創建的順序節點例如都以lock-開頭,這樣便於過濾無關節點*這樣創建後的節點類似:lock-00000001,lock-000000002*/ private staticfinal String LOCK_NAME ="lock-"; /*用於保存某個客戶端在locker下面創建成功的順序節點,用於後續相關操作使用(如判斷)*/ private String ourLockPath; /** * 用於獲取鎖資源,通過父類的獲取鎖方法來獲取鎖* @param time獲取鎖的超時時間* @param unit time的時間單位* @return是否獲取到鎖* @throws Exception */ private boolean internalLock (long time, TimeUnit unit) throws Exception { //如果ourLockPath不為空則認為獲取到了鎖,具體實現細節見attemptLock的實現ourLockPath = attemptLock(time, unit); return ourLockPath !=null; } /** * 傳入Zookeeper客戶端連接對象,和basePath * @param client Zookeeper客戶端連接對象* @param basePath basePath是一個持久節點*/ public SimpleDistributedLockMutex(ZkClientExt client, String basePath){ /*調用父類的構造方法在Zookeeper中創建basePath節點,並且為basePath節點子節點設置前綴*同時保存basePath的引用給當前類屬性*/ super(client,basePath,LOCK_NAME); this.basePath = basePath; } /**獲取鎖,直到超時,超時後拋出異常*/ public void acquire() throws Exception { //-1表示不設置超時時間,超時由Zookeeper決定if (!internalLock(-1,null)){ throw new IOException("連接丟失!在路徑:'"+basePath+"'下不能獲取鎖!"); } } /** * 獲取鎖,帶有超時時間*/ public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } /**釋放鎖*/ public void release()throws Exception { releaseLock(ourLockPath); }}3. 分佈式鎖的實現細節
獲取分佈式鎖的重點邏輯在於BaseDistributedLock,實現了基於Zookeeper實現分佈式鎖的細節。
public class BaseDistributedLock { private final ZkClientExt client; private final String path; private final String basePath; private final String lockName; private static final Integer MAX_RETRY_COUNT = 10; public BaseDistributedLock(ZkClientExt client, String path, String lockName){ this.client = client; this.basePath = path; this.path = path.concat("/").concat(lockName); this.lockName = lockName; } private void deleteOurPath(String ourPath) throws Exception{ client.delete(ourPath); } private String createLockNode(ZkClient client, String path) throws Exception{ return client.createEphemeralSequential(path, null); } /** * 獲取鎖的核心方法* @param startMillis * @param millisToWait * @param ourPath * @return * @throws Exception */ private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{ boolean haveTheLock = false; boolean doDelete = false; try{ while ( !haveTheLock ) { //該方法實現獲取locker節點下的所有順序節點,並且從小到大排序List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length()+1); //計算剛才客戶端創建的順序節點在locker的所有子節點中排序位置,如果是排序為0,則表示獲取到了鎖int ourIndex = children.indexOf(sequenceNodeName); /*如果在getSortedChildren中沒有找到之前創建的[臨時]順序節點,這表示可能由於網絡閃斷而導致*Zookeeper認為連接斷開而刪除了我們創建的節點,此時需要拋出異常,讓上一級去處理*上一級的做法是捕獲該異常,並且執行重試指定的次數見後面的attemptLock方法*/ if ( ourIndex<0 ){ throw new ZkNoNodeException("節點沒有找到: " + sequenceNodeName); } //如果當前客戶端創建的節點在locker子節點列表中位置大於0,表示其它客戶端已經獲取了鎖//此時當前客戶端需要等待其它客戶端釋放鎖, boolean isGetTheLock = ourIndex == 0; //如何判斷其它客戶端是否已經釋放了鎖?從子節點列表中獲取到比自己次小的哪個節點,並對其建立監聽String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1); if ( isGetTheLock ){ haveTheLock = true; }else{ //如果次小的節點被刪除了,則表示當前客戶端的節點應該是最小的了,所以使用CountDownLatch來實現等待String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch ); final CountDownLatch latch = new CountDownLatch(1); final IZkDataListener previousListener = new IZkDataListener() { //次小節點刪除事件發生時,讓countDownLatch結束等待//此時還需要重新讓程序回到while,重新判斷一次! public void handleDataDeleted(String dataPath) throws Exception { latch.countDown(); } public void handleDataChange(String dataPath, Object data) throws Exception { // ignore } }; try{ //如果節點不存在會出現異常client.subscribeDataChanges(previousSequencePath, previousListener); if ( millisToWait != null ){ millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ){ doDelete = true; // timed out - delete our node break; } latch.await(millisToWait, TimeUnit.MICROSECONDS); }else{ latch.await(); } }catch ( ZkNoNodeException e ){ //ignore }finally{ client.unsubscribeDataChanges(previousSequencePath, previousListener); } } } }catch ( Exception e ){ //發生異常需要刪除節點doDelete = true; throw e; }finally{ //如果需要刪除節點if ( doDelete ){ deleteOurPath(ourPath); } } return haveTheLock; } private String getLockNodeNumber(String str, String lockName) { int index = str.lastIndexOf(lockName); if ( index >= 0 ){ index += lockName.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } private List<String> getSortedChildren() throws Exception { try{ List<String> children = client.getChildren(basePath); Collections.sort( children, new Comparator<String>(){ public int compare(String lhs, String rhs){ return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName)); } } ); return children; }catch(ZkNoNodeException e){ client.createPersistent(basePath, true); return getSortedChildren(); } } protected void releaseLock(String lockPath) throws Exception{ deleteOurPath(lockPath); } protected String attemptLock(long time, TimeUnit unit) throws Exception{ final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; int retryCount = 0; //網絡閃斷需要重試一試while ( !isDone ){ isDone = true; try{ //createLockNode用於在locker(basePath持久節點)下創建客戶端要獲取鎖的[臨時]順序節點ourPath = createLockNode(client, path); /** * 該方法用於判斷自己是否獲取到了鎖,即自己創建的順序節點在locker的所有子節點中是否最小* 如果沒有獲取到鎖,則等待其它客戶端鎖的釋放,並且稍後重試直到獲取到鎖或者超時*/ hasTheLock = waitToLock(startMillis, millisToWait, ourPath); }catch ( ZkNoNodeException e ){ if ( retryCount++ < MAX_RETRY_COUNT ){ isDone = false; }else{ throw e; } } } if ( hasTheLock ){ return ourPath; } return null;}以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。