在上一章我們講解了ArrayBlockingQueue,用數組形式實現的阻塞隊列。
數組的長度在創建時就必須確定,如果數組長度小了,那麼ArrayBlockingQueue隊列很容易就被阻塞,如果數組長度大了,就容易浪費內存。
而隊列這個數據結構天然適合用鍊錶這個形式,而LinkedBlockingQueue就是使用鍊錶方式實現的阻塞隊列。
一. 鍊錶實現
1.1 Node內部類
/** * 鍊錶的節點,同時也是通過它來實現一個單向鍊錶*/ static class Node<E> { E item; // 指向鍊錶的下一個節點Node<E> next; Node(E x) { item = x; } }有一個變量e儲存數據,有一個變量next指向下一個節點的引用。它可以實現最簡單地單向列表。
1.2 怎樣實現鍊錶
/** * 它的next指向隊列頭,這個節點不存儲數據*/ transient Node<E> head; /** * 隊列尾節點*/ private transient Node<E> last;
要實現鍊錶,必須有兩個變量,一個表示鍊錶頭head,一個表示鍊錶尾last。 head和last都會在LinkedBlockingQueue對象創建的時候被初始化。
last = head = new Node<E>(null);
注意這裡用了一個小技巧,鍊錶頭head節點並沒有存放數據,它指向的下一個節點,才真正存儲了鍊錶中第一個數據。而鍊錶尾last的確儲存了鍊錶最後一個數據。
1.3 插入和刪除節點
/** * 向隊列尾插入節點*/ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // 當前線程肯定獲取了putLock鎖// 將原隊列尾節點的next引用指向新節點node,然後再將node節點設置成隊列尾節點last // 這樣就實現了向隊列尾插入節點last = last.next = node; }在鍊錶尾插入節點很簡單,將原隊列尾last的下一個節點next指向新節點node,再將新節點node賦值給隊列尾last節點。這樣就實現了插入一個新節點。
// 移除隊列頭節點,並返回被刪除的節點數據private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // 當前線程肯定獲取了takeLock鎖// assert head.item == null; Node<E> h = head; // first節點中才存儲了隊列中第一個元素的數據Node<E> first = h.next; h.next = h; // help GC // 設置新的head值,相當於刪除了first節點。因為head節點本身不儲存數據head = first; // 隊列頭的數據E x = first.item; // 移除原先的數據first.item = null; return x; }要注意head並不是鍊錶頭,它的next才是指向鍊錶頭,所以刪除鍊錶頭也很簡單,就是將head.next賦值給head,然後返回原先head.next節點的數據。
刪除的時候,就要注意鍊錶為空的情況。 head.next的值使用enqueue方法添加的。當head.next==last的時候,表示已經刪除到最後一個元素了,當head.next==null的時候,就不能刪除了,因為鍊錶已經為空了。這裡沒有做判斷,是因為在調用dequeue方法的地方已經做過判斷了。
二. 同步鎖ReentrantLock和條件Condition
因為阻塞隊列在隊列為空和隊列已滿的情況下,都必須阻塞等待,那麼就天然需要兩個條件。而為了保證多線程並發安全,又需要一個同步鎖。這個在ArrayBlockingQueue中已經說過了。
這裡我們來說說LinkedBlockingQueue不一樣的地方。
/** 獨占鎖,用於處理插入隊列操作的並發問題,即put與offer操作*/ private final ReentrantLock putLock = new ReentrantLock(); /** 隊列不滿的條件Condition,它是由putLock鎖生成的*/ private final Condition notFull = putLock.newCondition(); /** 獨占鎖,用於處理刪除隊列頭操作的並發問題,即take與poll操作*/ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊列不為空的條件Condition, 它使用takeLock鎖生成的*/ private final Condition notEmpty = takeLock.newCondition();
2.1 putLock與takeLock
我們發現使用了兩把鎖:
按照上面的說法,可能會出現同時插入元素和刪除元素的操作,那麼就不會出現問題麼?
我們來具體分析一個下,對於隊列來說操作分為三種:
因此使用putLock鎖來保持last變量的安全,使用takeLock鎖來保持head變量的安全。
對於都涉及了隊列中元素個數count變量,所以使用AtomicInteger來保證並發安全問題。
/** 隊列中元素的個數,這裡使用AtomicInteger變量,保證並發安全問題*/ private final AtomicInteger count = new AtomicInteger();
2.2 notFull與notEmpty
2.3 控制流程
當插入元素時:
當刪除元素時:
還要注意一下,Condition的signal和await方法必須在獲取鎖的情況下調用。因此就有了signalNotEmpty和signalNotFull方法:
/** * 喚醒在notEmpty條件下等待的線程,即移除隊列頭時,發現隊列為空而被迫等待的線程。 * 注意,因為要調用Condition的signal方法,必須獲取對應的鎖,所以這裡調用了takeLock.lock()方法。 * 當隊列中插入元素(即put或offer操作),那麼隊列肯定不為空,就會調用這個方法。 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 喚醒在notFull條件下等待的線程,即隊列尾添加元素時,發現隊列已滿而被迫等待的線程。 * 注意,因為要調用Condition的signal方法,必須獲取對應的鎖,所以這裡調用了putLock.lock()方法* 當隊列中刪除元素(即take或poll操作),那麼隊列肯定不滿,就會調用這個方法。 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }三. 插入元素方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 記錄插入操作前元素的個數int c = -1; // 創建新節點node Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //表示隊列已滿,那麼就要調用notFull.await方法,讓當前線程等待while (count.get() == capacity) { notFull.await(); } // 向隊列尾插入新元素enqueue(node); // 將當前隊列元素個數加1,並返回加1之前的元素個數。 c = count.getAndIncrement(); // c + 1 < capacity表示隊列未滿,就喚醒可能等待插入操作的線程if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c == 0表示插入之前,隊列是空的。隊列從空到放入一個元素時, // 才喚醒等待刪除的線程// 防止頻繁獲取takeLock鎖,消耗性能if (c == 0) signalNotEmpty(); }以put方法為例,大體流程與我們前面介紹一樣,這裡有一個非常怪異的代碼,當插入完元素時,如果發現隊列未滿,那麼調用notFull.signal()喚醒等待插入的線程。
大家就很疑惑了,一般來說,這個方法應該放在刪除元素(take系列的方法裡),因為當我們刪除一個元素,那麼隊列肯定是未滿的,那麼調用notFull.signal()方法,喚醒等待插入的線程。
這裡這麼做主要是因為調用signal方法,必須先獲取對應的鎖,而在take系列的方法裡使用的鎖是takeLock,那麼想調用notFull.signal()方法,必須先獲取putLock鎖,這樣的話會性能就會下降,所以用了另一種方式。
四. 刪除隊列頭元素
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //表示隊列為空,那麼就要調用notEmpty.await方法,讓當前線程等待while (count.get() == 0) { notEmpty.await(); } // 刪除隊列頭元素,並返回它x = dequeue(); // 返回當前隊列個數,然後將隊列個數減一c = count.getAndDecrement(); // c > 1表示隊列不為空,就喚醒可能等待刪除操作的線程if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } /** * c == capacity表示刪除操作之前,隊列是滿的。只有從滿隊列中刪除一個元素時, * 才喚醒等待插入的線程* 防止頻繁獲取putLock鎖,消耗性能*/ if (c == capacity) signalNotFull(); return x; }為什麼調用notEmpty.signal()方法原因,對比一下我們在插入元素方法中的解釋。
五. 查看隊列頭元素
// 查看隊列頭元素public E peek() { // 隊列為空,返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 獲取隊列頭節點first Node<E> first = head.next; // first == null表示隊列為空,返回null if (first == null) return null; else // 返回隊列頭元素return first.item; } finally { takeLock.unlock(); } }查看隊列頭元素,涉及到head節點,所以必須使用takeLock鎖。
六. 其他重要方法
6.1 remove(Object o)方法
// 從隊列中刪除指定元素o public boolean remove(Object o) { if (o == null) return false; // 因為不是刪除列表頭元素,所以就涉及到head和last兩個變量, // putLock與takeLock都要加鎖fullyLock(); try { // 遍歷整個隊列,p表示當前節點,trail表示當前節點的前一個節點// 因為是單向鍊錶,所以需要記錄兩個節點for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 如果找到了指定元素,那麼刪除節點p if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }從列表中刪除指定元素,因為刪除的元素不一定在列表頭,所以可能會head和last兩個變量,所以必須同時使用putLock與takeLock兩把鎖。因為是單向鍊錶,需要一個輔助變量trail來記錄前一個節點,這樣才能刪除當前節點p。
6.2 unlink(Node<E> p, Node<E> trail) 方法
// 刪除當前節點p,trail代表p的前一個節點void unlink(Node<E> p, Node<E> trail) { // 將當前節點的數據設置為null p.item = null; // 這樣就在鍊錶中刪除了節點p trail.next = p.next; // 如果節點p是隊列尾last,而它被刪除了,那麼就要將trail設置為last if (last == p) last = trail; // 將元素個數減一,如果原隊列是滿的,那麼就調用notFull.signal()方法// 其實這個不用判斷直接調用的,因為這里肯定獲取了putLock鎖if (count.getAndDecrement() == capacity) notFull.signal(); }要在鍊錶中刪除一個節點p,只需要將p的前一個節點trail的next指向節點p的下一個節點next。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。