The previous chapter covered ArrayBlockingQueue, a blocking queue backed by an array.
The array length must be fixed when the queue is created. If the array is too small, the ArrayBlockingQueue fills up quickly and callers block often. If it is too large, memory is wasted.
A queue maps naturally onto a linked list, and LinkedBlockingQueue is a blocking queue implemented with one.
1. Linked list implementation
1.1 Node internal class
    /**
     * A node of the singly linked list.
     */
    static class Node<E> {
        E item;

        // Points to the next node in the list
        Node<E> next;

        Node(E x) { item = x; }
    }

Each node has a field item that stores the data and a reference next that points to the following node. That is all a singly linked list needs.
1.2 How the linked list is implemented

    /**
     * Head of the list. This node stores no data; its next reference points to the first element.
     */
    transient Node<E> head;

    /**
     * Tail node of the queue.
     */
    private transient Node<E> last;

A linked list needs two variables: head marks the front of the list and last marks the tail. Both are initialized when the LinkedBlockingQueue object is created:
last = head = new Node<E>(null);
Note the small trick used here: the head node stores no data. The node that head.next points to holds the first element of the queue. The last node, by contrast, does hold the final element, except when the queue is empty and last == head.
1.3 Insert and delete nodes
    /**
     * Inserts a node at the tail of the queue.
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // The current thread must hold putLock.
        // Point the old tail's next reference at the new node, then make the new node the tail.
        last = last.next = node;
    }

Inserting at the tail is simple: point the next reference of the current last node at the new node, then assign the new node to last.

    // Removes the head element of the queue and returns its data
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // The current thread must hold takeLock.
        // assert head.item == null;
        Node<E> h = head;
        // The first element of the queue is stored in head.next
        Node<E> first = h.next;
        h.next = h; // help GC
        // Moving head forward removes the first element, because head itself stores no data
        head = first;
        // The data of the first element
        E x = first.item;
        // Clear it, since first is now the dummy head
        first.item = null;
        return x;
    }

Note that head is not the first element; head.next is. Removing the first element is therefore simple: advance head to head.next, return the data that node held, and clear its item so that it can serve as the new dummy head.
When removing, the empty list needs attention. head.next is set by the enqueue method. When head.next == last, the queue holds exactly one element, and removing it leaves head == last. When head.next == null, the list is already empty and nothing can be removed. dequeue does not check for this because its callers check before calling it.
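The dummy-head technique described above can be tried in isolation. The following is a minimal single-threaded sketch, not the JDK class: the name SentinelQueue and the isEmpty helper are made up for illustration, and the empty check that LinkedBlockingQueue leaves to its callers is folded into dequeue here.

```java
import java.util.NoSuchElementException;

// Hypothetical single-threaded sketch of a queue with a dummy head node.
class SentinelQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is a dummy node; head.next holds the first real element.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    boolean isEmpty() {
        return head.next == null;
    }

    void enqueue(E e) {
        Node<E> node = new Node<>(e);
        last.next = node;
        last = node;
    }

    E dequeue() {
        // Assumption: the check is done here instead of in the caller.
        if (isEmpty()) throw new NoSuchElementException("queue is empty");
        Node<E> first = head.next;
        head.next = head;   // self-link the old dummy node to help GC
        head = first;       // the removed node becomes the new dummy
        E x = first.item;
        first.item = null;
        return x;
    }

    public static void main(String[] args) {
        SentinelQueue<String> q = new SentinelQueue<>();
        q.enqueue("a");
        q.enqueue("b");
        System.out.println(q.dequeue()); // a
        System.out.println(q.dequeue()); // b
        System.out.println(q.isEmpty()); // true
    }
}
```

After the last element is removed, head and last refer to the same node again, which is the same state the queue starts in.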
2. ReentrantLock and Condition
A blocking queue must make threads wait when the queue is empty or full, so two conditions are needed. A lock is also needed to keep concurrent access safe. Both points were covered for ArrayBlockingQueue.
This section covers what LinkedBlockingQueue does differently.

    /** Exclusive lock guarding insertions, that is, the put and offer operations */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Condition for the queue being not full; created from putLock */
    private final Condition notFull = putLock.newCondition();

    /** Exclusive lock guarding removals from the head, that is, the take and poll operations */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Condition for the queue being not empty; created from takeLock */
    private final Condition notEmpty = takeLock.newCondition();
2.1 putLock and takeLock
Two locks are used: putLock guards insertions (put and offer), and takeLock guards removals from the head (take and poll).
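This means an insertion and a removal can run at the same time. Is that safe?
Let's analyze it in detail. Queue operations fall into three types: inserting at the tail, which touches only last; removing from the head, which touches only head; and operations on other nodes, such as remove(Object o), which may touch both head and last and therefore take both locks.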
So putLock keeps the last variable safe, and takeLock keeps the head variable safe.
The count variable, which tracks the number of elements and is updated by both insertions and removals, is an AtomicInteger so that it stays thread-safe.

    /** Number of elements in the queue; an AtomicInteger keeps updates thread-safe */
    private final AtomicInteger count = new AtomicInteger();
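To see why the counter cannot rely on either lock alone, here is a small sketch in which one thread updates a shared counter under one lock while another updates it under a different lock. The class TwoLockCounter and its methods are hypothetical; only the field names mirror the source. The sketch models the counter only, not the queue contents, so the value may go negative along the way.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: two different locks, one shared atomic counter.
class TwoLockCounter {
    private final ReentrantLock putLock = new ReentrantLock();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final AtomicInteger count = new AtomicInteger();

    void produced() {
        putLock.lock();
        try {
            count.getAndIncrement();
        } finally {
            putLock.unlock();
        }
    }

    void consumed() {
        takeLock.lock();
        try {
            count.getAndDecrement();
        } finally {
            takeLock.unlock();
        }
    }

    // Runs the same number of increments and decrements on two threads
    // and returns the final counter value.
    static int run(int rounds) {
        TwoLockCounter c = new TwoLockCounter();
        Thread producer = new Thread(() -> { for (int i = 0; i < rounds; i++) c.produced(); });
        Thread consumer = new Thread(() -> { for (int i = 0; i < rounds; i++) c.consumed(); });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.count.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100_000)); // 0
    }
}
```

Because the two threads never hold the same lock, the locks give no mutual exclusion between them. The atomic read-modify-write operations are what keep the final value correct.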
2.2 notFull and notEmpty
2.3 Control process
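When inserting an element: acquire putLock; while the queue is full, wait on notFull; insert the node and increment count; if the queue still has room, signal notFull to wake another thread waiting to insert; release putLock; finally, if the queue was empty before the insertion, wake a thread waiting on notEmpty.
When deleting an element: acquire takeLock; while the queue is empty, wait on notEmpty; remove the head element and decrement count; if elements remain, signal notEmpty to wake another thread waiting to remove; release takeLock; finally, if the queue was full before the removal, wake a thread waiting on notFull.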
Note also that the signal and await methods of a Condition may only be called while the corresponding lock is held. That is why the signalNotEmpty and signalNotFull methods exist:

    /**
     * Wakes a thread waiting on the notEmpty condition, that is, a thread that tried to
     * remove the head element, found the queue empty, and had to wait.
     * Calling signal on a Condition requires holding its lock, so takeLock.lock() is called here.
     * It is called after an insertion (put or offer) into a previously empty queue,
     * because the queue is then certainly not empty.
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Wakes a thread waiting on the notFull condition, that is, a thread that tried to
     * add an element at the tail, found the queue full, and had to wait.
     * Calling signal on a Condition requires holding its lock, so putLock.lock() is called here.
     * It is called after a removal (take or poll) from a previously full queue,
     * because the queue is then certainly not full.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

3. Insert element method

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Number of elements before the insertion
        int c = -1;
        // Create the new node
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // The queue is full, so wait on notFull
            while (count.get() == capacity) {
                notFull.await();
            }
            // Insert the new element
            enqueue(node);
            // Increment the element count; c receives the count before the increment
            c = count.getAndIncrement();
            // c + 1 < capacity means the queue is still not full,
            // so wake a thread that may be waiting to insert
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c == 0 means the queue was empty before this insertion.
        // A waiting remover is woken only on the transition from empty to one element,
        // which avoids acquiring takeLock on every insertion.
        if (c == 0)
            signalNotEmpty();
    }

Taking put as an example, the overall flow matches what was described earlier. One part looks odd at first: after inserting the element, if the queue is still not full, put calls notFull.signal() to wake a thread waiting to insert.
This is surprising, because the natural place for that call is the removal path (the take family of methods): once an element is removed the queue is certainly not full, so that is when a thread waiting to insert should be woken.
The reason is that signal may only be called while holding the corresponding lock. The take methods hold takeLock, so calling notFull.signal() from them requires acquiring putLock as well, and doing that on every removal would hurt performance. Instead, take signals notFull only when the queue was full before the removal, and each inserting thread passes the wake-up along to the next waiting inserter while room remains.
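The rule that signal requires the lock can be checked directly. The sketch below uses a throwaway ReentrantLock and Condition (the class and method names are made up for illustration) and shows that signalling without the lock throws IllegalMonitorStateException, while signalling with the lock held is legal even when no thread is waiting.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Condition are real JDK types.
class SignalNeedsLock {
    // Returns true if signal() without the lock is rejected.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();   // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() with the lock held completes normally.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal();   // legal; with no waiters it does nothing
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails());  // true
        System.out.println(signalWithLockSucceeds());  // true
    }
}
```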
4. Remove the queue head element

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // The queue is empty, so wait on notEmpty
            while (count.get() == 0) {
                notEmpty.await();
            }
            // Remove the head element and keep its data
            x = dequeue();
            // Decrement the element count; c receives the count before the decrement
            c = count.getAndDecrement();
            // c > 1 means the queue is still not empty,
            // so wake a thread that may be waiting to remove
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // c == capacity means the queue was full before this removal.
        // A waiting inserter is woken only on the transition from full to not full,
        // which avoids acquiring putLock on every removal.
        if (c == capacity)
            signalNotFull();
        return x;
    }

The reason notEmpty.signal() is called inside take mirrors the explanation given for notFull.signal() in the insert method.
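As a usage illustration of put and take working together, the following sketch hands a sequence of integers from a producer thread to the calling thread through a small bounded queue. The class BoundedHandoff and the method transfer are hypothetical names; LinkedBlockingQueue, put, and take are the real JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of blocking put/take on a bounded LinkedBlockingQueue.
class BoundedHandoff {
    static List<Integer> transfer(int items, int capacity) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);   // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < items; i++) {
                received.add(queue.take());   // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5, 2)); // [0, 1, 2, 3, 4]
    }
}
```

With a capacity of 2 and five items, the producer has to wait on notFull at least once unless the consumer keeps pace, yet the elements still arrive in insertion order.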
5. View the queue head element

    // Returns the head element of the queue without removing it
    public E peek() {
        // The queue is empty, return null
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // head.next is the first real node
            Node<E> first = head.next;
            // first == null means the queue is empty, return null
            if (first == null)
                return null;
            else
                // Return the head element
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

peek reads the head node, so it must hold takeLock.
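A short usage sketch of peek follows. The class PeekDemo and the method describe are made-up names for illustration. It shows that peek returns null on an empty queue and that repeated calls return the same head element without changing the size.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of LinkedBlockingQueue.peek().
class PeekDemo {
    static String describe() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        String onEmpty = String.valueOf(q.peek());  // peek on an empty queue returns null
        q.offer("a");
        q.offer("b");
        String first = q.peek();   // head element, not removed
        String again = q.peek();   // same element again
        int size = q.size();       // still 2
        return onEmpty + "," + first + "," + again + "," + size;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // null,a,a,2
    }
}
```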
6. Other important methods
6.1 remove(Object o) method
    // Removes the specified element o from the queue
    public boolean remove(Object o) {
        if (o == null) return false;
        // The element is not necessarily at the head, so both head and last
        // may be involved; putLock and takeLock must both be held
        fullyLock();
        try {
            // Traverse the whole queue: p is the current node and trail is the node before it.
            // Because the list is singly linked, both nodes must be tracked.
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                // If the specified element is found, unlink its node
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

This removes the specified element from the list. The element is not necessarily at the head, so the operation may involve both head and last, and both putLock and takeLock must be held. Because the list is singly linked, an auxiliary variable trail tracks the previous node so that the current node p can be unlinked.
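From the caller's side, remove(Object o) behaves as in the sketch below. The class RemoveDemo and the method run are hypothetical names; the queue calls are the real JDK API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of LinkedBlockingQueue.remove(Object).
class RemoveDemo {
    static String run() {
        LinkedBlockingQueue<String> q =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        boolean removed = q.remove("b");   // element in the middle of the queue
        boolean missing = q.remove("z");   // element that is not present
        return removed + "," + missing + "," + new ArrayList<>(q);
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,[a, c]
    }
}
```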
6.2 unlink(Node<E> p, Node<E> trail) method
    // Unlinks the current node p; trail is the node before p
    void unlink(Node<E> p, Node<E> trail) {
        // Clear the data of the current node
        p.item = null;
        // This removes node p from the list
        trail.next = p.next;
        // If p was the tail of the queue, trail becomes the new tail
        if (last == p)
            last = trail;
        // Decrement the element count. If the queue was full before, call notFull.signal().
        // signal can be called directly here because the caller already holds putLock.
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

To unlink node p from the list, point the next reference of its predecessor trail at the node that follows p.
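The trail technique, including the fix-up of last when the tail node is removed, can be sketched on a plain singly linked list. This is a single-threaded illustration with made-up names (TrailUnlink, add, contents), with no locks and no element count.

```java
// Hypothetical single-threaded sketch of unlinking with a trail pointer.
class TrailUnlink {
    static final class Node {
        String item;
        Node next;
        Node(String item) { this.item = item; }
    }

    Node head = new Node(null);   // dummy head, stores no data
    Node last = head;

    void add(String s) {
        last = last.next = new Node(s);
    }

    boolean remove(String s) {
        for (Node trail = head, p = trail.next; p != null; trail = p, p = p.next) {
            if (s.equals(p.item)) {
                p.item = null;
                trail.next = p.next;           // bypass p
                if (last == p) last = trail;   // p was the tail
                return true;
            }
        }
        return false;
    }

    String contents() {
        StringBuilder sb = new StringBuilder();
        for (Node p = head.next; p != null; p = p.next) {
            if (sb.length() > 0) sb.append(',');
            sb.append(p.item);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        TrailUnlink list = new TrailUnlink();
        list.add("a");
        list.add("b");
        list.add("c");
        list.remove("c");                    // removes the tail, so last moves back to "b"
        list.add("d");                       // appended after "b"
        System.out.println(list.contents()); // a,b,d
    }
}
```

Without the last fix-up, the add after removing the tail would attach the new node to the unlinked node and it would never be reachable from head.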