In the previous chapter, we introduced the blocking queue BlockingQueue. Below we introduce its commonly used implementation class ArrayBlockingQueue.
1. Use arrays to implement queues
Because of the special requirements of the data structure of the queue, it is naturally suitable to be implemented in the form of a linked list. Two variables are used to record the header and the end of the linked list respectively. When deleting or inserting the queue, just change the header or end of the linked list. Moreover, the linked list is linked in a reference manner, so its capacity is almost unlimited.
So how to use arrays to implement queues, we need four variables: Object[] array to store elements in the queue, headIndex and tailIndex record the queue head and tail, and count record the number of queues.
A very clever way is used here. We know that when an element is inserted into the queue, it takes up a position in the array. When an element is deleted, the position of the array is actually free, indicating that a new element can be inserted in this position.
So before we insert a new element, we must check whether the queue is full, and before we delete the element, we must check whether the queue is empty.
2. Important member variables in ArrayBlockingQueue
/** Store the elements in the queue*/ final Object[] items; /** The position of the queue head*/ int takeIndex; /** The position of the queue tail*/ int putIndex; /** The number of elements in the current queue*/ int count; /** Used to ensure the security of multi-threaded shared variables*/ final ReentrantLock lock; /** When the queue is empty, the wait method of notEmpty will be called to make the current thread wait*/ private final Condition notEmpty; /** When the queue is full, the wait method of notFull will be called, causing the current thread to wait*/ private final Condition notFull;
There are more lock, notEmpty, and notFull variables to implement multi-thread safety and thread waiting conditions. How do they operate?
2.1 The role of lock
Because ArrayBlockingQueue operates under multiple threads, when modifying member variables such as items, takeIndex, putIndex and count, multi-threaded safety issues must be considered. Therefore, lock exclusive lock is used here to ensure the safety of concurrent operations.
2.2 The role of notEmpty and notFull
Because blocking queues must be implemented, when the queue is empty or the queue is full, the queue read or insert operation must wait. So we thought of the Condition object under the concurrency framework and use it to control it.
In AQS, we introduce the role of this class:
3. Add element method
3.1 add(E e) and offer(E e) methods:
// Call the method in the AbstractQueue parent class. public boolean add(E e) { // Implement if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } // Add new element to the end of the queue. Return true means the addition is successful, false means the addition failed, and no exception is thrown public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threaded modification of member attributes lock.lock(); try { // The queue is full, and the addition of elements fails, return false. if (count == items.length) return false; else { // Call the enqueue method to insert the element into the queue enqueue(e); return true; } } finally { lock.unlock(); } }The add method is implemented by calling the offer method. In the offer method, it is necessary to first determine whether the queue is full. If it is full, it will directly return false without blocking the current thread. If you are not satisfied, call the enqueue method to insert the element into the queue.
Note: Using lock.lock() here ensures that only one thread modifys member variables at the same time to prevent concurrent operation problems. Although it will also block the current thread, it is not a conditional waiting, it is just because the lock is held by other threads, and the method operation time in ArrayBlockingQueue is not long, which is equivalent to not blocking the thread.
3.2 put method
// Add a new element to the end of the queue. If the queue is full, the current thread will wait. Response to interrupt exception public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threads modify the security of member attributes lock.lockInterruptibly(); try { // If the queue is full, call the notFull.await() method to let the current thread wait until the queue is not full while (count == items.length) notFull.await(); // Call the enqueue method to insert the element into the queue enqueue(e); } finally { lock.unlock(); } }The general process of the offer method is the same as the offer method. However, when the queue is full, the notFull.await() method will be called to make the current thread block and wait until the queue is removed by other threads. When the queue is unsatisfied, the waiting thread will be awakened.
3.3 offer(E e, long timeout, TimeUnit unit) method
/** * Add a new element to the end of the queue. If there is no space available in the queue, the current thread will wait. * If the waiting time exceeds timeout, then false is returned, indicating that the addition failed*/ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); // Calculate the time value of the maximum waiting total nanos long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // Use lock to ensure that multi-thread modification of member attributes lock.lockInterruptibly(); try { // The queue is full while (count == items.length) { // nanos <= 0 means that the maximum waiting time has arrived, so there is no need to wait any longer. Return false, indicating that the addition failed. if (nanos <= 0) return false; // Call the notFull.awaitNanos(nanos) method, the timeout nanos time will be automatically awakened. // If it is woken up in advance, then the remaining time is returned nanos = notFull.awaitNanos(nanos); } // Call the enqueue method to insert the element into the queue enqueue(e); return true; } finally { lock.unlock(); } }The general flow of put method is the same as the put method, it is just calling the notFull.awaitNanos(nanos) method to make the current thread wait and set the waiting time.
4. Delete element method
4.1 Remove() and poll() methods:
// Call the method in the AbstractQueue parent class. public E remove() { // Implementation by calling poll E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); }// Delete the first element of the queue (i.e. the queue header) and return it. If the queue is empty, it does not throw an exception, but returns null. public E poll() { final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threaded modification of member attributes lock.lock(); try { // If count == 0 and the list is empty, return null. Otherwise, call the dequeue method to return the list header element (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }The remove method is implemented by calling the poll() method. In the poll() method, if the list is empty, it returns null, otherwise the dequeue method is called to return the list header element.
4.2 take() method
/** * Return and remove the first element of the queue. If the queue is empty, the front thread will wait. Response interrupt exception*/ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threads modify the security of member attributes lock.lockInterruptibly(); try { // If the queue is empty, call the notEmpty.await() method to let the current thread wait. // Until another thread inserts elements into the queue, the thread will be awakened. while (count == 0) notEmpty.await(); // Call the dequeue method to return the list header element return dequeue(); } finally { lock.unlock(); } }When the take() method is empty, the current thread must wait until another thread inserts a new element into the queue, and the thread will be awakened.
4.3 poll(long timeout, TimeUnit unit) method
/** * Return and remove the first element of the queue. If the queue is empty, the front thread will wait. * If the waiting time exceeds timeout, then false is returned to indicate that the element is failed*/ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // Calculate the maximum waiting time value in total nanos long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threaded modification of member attributes lock.lockInterruptibly(); try { // The queue is empty while (count == 0) { // nanos <= 0 means that the maximum waiting time has arrived, so there is no need to wait anymore, return null. if (nanos <= 0) return null; // Call the notEmpty.awaitNanos(nanos) method to make the schedule thread wait and set the timeout time. nanos = notEmpty.awaitNanos(nanos); } // Call the dequeue method to return the list header element return dequeue(); } finally { lock.unlock(); } }Just like the take() method process, it is just called the awaitNanos(nanos) method to make the current thread wait and set the waiting time.
5. Methods to view elements
5.1 element() and peek() methods
// Call the method in the AbstractQueue parent class. public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } // View queue header elements public E peek() { final ReentrantLock lock = this.lock; // Use lock to ensure that multi-threaded modification of member attributes lock.lock(); try { // Return the element of the current queue header return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }VI. Other important methods
6.1 enqueue and dequeue methods
// Insert element x into the tail of the queue private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // The current putIndex position element must be null final Object[] items = this.items; items[putIndex] = x; // If putIndex is equal to items.length, then reset putIndex to 0 if (++putIndex == items.length) putIndex = 0; // Add one count++ to the number of queues; // Because an element is inserted, the current queue is definitely not empty. Then wake up a thread waiting for under notEmpty condition notEmpty.signal(); } // Delete the element of the queue header and return it private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; // Get the element of the current queue header @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // Set the current queue header position to null items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // minus the number of queues by one count--; if (itrs != null) itrs.elementDequeued(); // Because an element is deleted, the queue is definitely not satisfied, so wake up a thread waiting under the notFull condition notFull.signal(); return x; }These two methods represent, inserting elements into and removing elements from the queue, respectively. And they want to wake up the waiting thread. After inserting an element, the queue must not be empty, so the thread waiting under the notEmpty condition must be awakened. After deleting the element, the queue must be unsatisfied, so the thread waiting under the notFull condition must be awakened.
6.2 remove(Object o) method
// Delete the object o element in the queue, and delete at most one public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; // Use lock to ensure the security of multi-threaded modification of member attributes final ReentrantLock lock = this.lock; lock.lock(); try { // Delete only when there is a value in the queue. if (count > 0) { // The next position at the end of the queue final int putIndex = this.putIndex; // The position of the queue header int i = takeIndex; do { // The current position element is the same as the deleted element if (o.equals(items[i])) { // Delete the i position element removeAt(i); // Return true return true; } if (++i == items.length) i = 0; // When i==putIndex means that all elements have been traversed} while (i != putIndex); } return false; } finally { lock.unlock(); } } Delete the specified object o from the queue, then you have to traverse the queue and delete the first element that is the same as object o. If there is no object o element in the queue, then return false to delete failed.
Here are two points to note:
How to traverse a queue is to traverse from the head of the queue to the end of the queue. It depends on the two variables that takeIndex and putIndex.
Why is Object[] items = this.items; this code is not placed in the synchronous lock lock code block. Items are member variables. When there are so many threads, will there be no concurrency problems?
This is because items are reference variables, not basic data types, and our insertion and deletion operations on queues are all for this item array, and the reference to the array is not changed. Therefore, in the lock code, items will get the latest modifications to it by other threads. But if the int putIndex = this.putIndex; method locks the code block outside, a problem will occur.
removeAt(final int removeIndex) method
// Delete the element in the queue removeIndex position void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; // It means that it is much easier to delete the element as the list header, which is similar to the dequeue method process if (removeIndex == takeIndex) { // Remove the element in the removeIndex position items[takeIndex] = null; // When the end of the array, you need to go to the array header position if (++takeIndex == items.length) takeIndex = 0; // minus the number of queues by one count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; // The end of the queue has not yet reached the end of the queue, then the element in the next position is overwritten by the element in the previous position if (next != putIndex) { items[i] = items[next]; i = next; } else { // Set the tail element of the queue null items[i] = null; // Reset the value of putIndex this.putIndex = i; break; } } // Decrease the number of queues by count--; if (itrs != null) itrs.removedAt(removeIndex); } // Because an element is deleted, the queue is definitely not satisfied, so wake up a thread waiting under the notFull condition notFull.signal(); }Delete elements in the specified location in the queue. It should be noted that the array after deletion can still maintain the queue form, which is divided into two situations:
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.