Semaphore 主要用於限量控制並發執行代碼的工具類, 其內部通過一個permit 來進行定義並發執行的數量。
/** * 使用非公平版本構件Semaphore */public KSemaphore(int permits){ sync = new NonfairSync(permits);}/** * 指定版本構件Semaphore */public KSemaphore(int permits, boolean fair){ sync = fair ? new FairSync(permits) : new NonfairSync(permits);} /** AQS 的子類主要定義獲取釋放lock */abstract static class Sync extends KAbstractQueuedSynchronizer{ private static final long serialVersionUID = 1192457210091910933L; /** * 指定permit 初始化Semaphore */ Sync(int permits){ setState(permits); } /** * 返回剩餘permit */ final int getPermits(){ return getState(); } /** * 獲取permit */ final int nonfairTryAcquireShared(int acquires){ for(;;){ int available = getState(); int remaining = available - acquires; // 判斷獲取acquires 的剩餘permit 數目if(remaining < 0 || compareAndSetState(available, remaining)){ // cas改變state return remaining; } } } /** * 釋放lock */ protected final boolean tryReleaseShared(int releases){ for(;;){ int current = getState(); int next = current + releases; if(next < current){ // overflow throw new Error(" Maximum permit count exceeded"); } if(compareAndSetState(current, next)){ // cas改變state return true; } } } final void reducePermits(int reductions){ // 減少permits for(;;){ int current = getState(); int next = current - reductions; if(next > current){ // underflow throw new Error(" Permit count underflow "); } if(compareAndSetState(current, next)){ return; } } } /** 將permit 置為0 */ final int drainPermits(){ for(;;){ int current = getState(); if(current == 0 || compareAndSetState(current, 0)){ return current; } } }} /** * 調用acquireSharedInterruptibly 響應中斷的方式獲取permit */public void acquire() throws InterruptedException{ sync.acquireSharedInterruptibly(1);}/** * 調用acquireUninterruptibly 非響應中斷的方式獲取permit */public void acquireUninterruptibly(){ sync.acquireShared(1);}/** * 嘗試獲取permit */public boolean tryAcquire(){ return sync.nonfairTryAcquireShared(1) >= 0;}/** * 嘗試的獲取permit, 支持超時與中斷*/public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException{ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/** * 支持中斷的獲取permit */public void acquire(int permits) throws InterruptedException{ if(permits < 0){ throw new IllegalArgumentException(); } sync.acquireSharedInterruptibly(permits);}/** * 不響應中斷的獲取permit */public void acquireUninterruptibly(int permits){ if(permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits);}/** * 嘗試獲取permit */public boolean tryAcquire(int permits){ if(permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0;}/** * 嘗試支持超時機制, 支持中斷的獲取permit */public boolean tryAcquire(int permits, long timout, TimeUnit unit) throws InterruptedException{ if(permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timout));} /** * 釋放permit */public void release(){ sync.releaseShared(1);}/** * 釋放permit */public void release(int permits){ if(permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits);} /** * 返回可用的permit */public int availablePermits(){ return sync.getPermits();}/** * 消耗光permit */public int drainPermits(){ return sync.drainPermits();}/** * 減少reduction 個permit */protected void reducePermits(int reduction){ if(reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction);}/** * 判斷是否是公平版本*/public boolean isFair(){ return sync instanceof FairSync;}/** * 返回AQS 中Sync Queue 裡面的等待線程*/public final boolean hasQueuedThreads(){ return sync.hasQueuedThreads();}/** * 返回AQS 中Sync Queue 裡面的等待線程長度*/public final int getQueueLength(){ return sync.getQueueLength();}/** * 返回AQS 中Sync Queue 裡面的等待線程*/protected Collection<Thread> getQueueThreads(){ return sync.getQueuedThreads();}以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。