1. What is a blocking queue?
A queue is a data structure with two basic operations: adding an element at the tail of the queue and removing an element from the head of the queue. The difference between a blocking queue and an ordinary queue is that an ordinary queue never blocks the calling thread, so in a producer-consumer scenario you have to implement the synchronization and the inter-thread wake-ups yourself. A blocking queue handles this by blocking the calling thread. When the queue is empty, an operation that takes an element blocks until one is available. When the queue is full, an operation that adds an element blocks until space is free.
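To make the difference concrete, here is a minimal sketch of what has to be written by hand when an ordinary queue is used. The class name HandRolledBuffer and its demo method are hypothetical, and synchronized with wait/notifyAll is only one possible synchronization strategy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical class: a bounded buffer built by hand on an ordinary, non-blocking queue.
class HandRolledBuffer<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final int capacity;

    HandRolledBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: the waiting has to be coded explicitly while the queue is full.
    public synchronized void put(E e) throws InterruptedException {
        while (queue.size() == capacity) {
            wait();
        }
        queue.add(e);
        notifyAll(); // wake any consumer waiting for an element
    }

    // Consumer side: the waiting has to be coded explicitly while the queue is empty.
    public synchronized E take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        E e = queue.remove();
        notifyAll(); // wake any producer waiting for free space
        return e;
    }

    // One producer thread feeds five numbers; the calling thread consumes them.
    static List<Integer> demo() {
        HandRolledBuffer<Integer> buffer = new HandRolledBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) {
                taken.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2, 3, 4]
    }
}
```

A blocking queue packages exactly this waiting and waking logic, so the calling code only needs put and take.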
2. The main blocking queues and their methods
The java.util.concurrent package provides several blocking queues. The main ones are:
1) ArrayBlockingQueue: A blocking queue backed by an array. Its capacity must be specified when the ArrayBlockingQueue object is created. A fairness policy can also be specified. By default the queue is not fair, that is, it does not guarantee that the thread that has waited longest gets access to the queue first.
2) LinkedBlockingQueue: A blocking queue backed by a linked list. If no capacity is specified when the LinkedBlockingQueue object is created, the capacity defaults to Integer.MAX_VALUE.
3) PriorityBlockingQueue: The two queues above are first-in-first-out queues, but PriorityBlockingQueue is not. It orders its elements by priority, and each dequeue returns the element with the highest priority. Note that this is an unbounded blocking queue, that is, there is no upper limit on its capacity (the source code shows that it has no condition for signalling that the container is full). The first two queues are bounded, although a LinkedBlockingQueue created without a capacity is bounded only by Integer.MAX_VALUE.
4) DelayQueue: A delay blocking queue based on PriorityQueue. An element can be taken from a DelayQueue only after its specified delay has expired. DelayQueue is also an unbounded queue, so the operation that inserts data (the producer) never blocks; only the operation that takes data (the consumer) can block.
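The four queues described above can be tried side by side. The following is a minimal sketch: QueueKindsDemo and DelayedTask are hypothetical names introduced for illustration, and the delays of 200 ms and 400 ms are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueKindsDemo {
    // Hypothetical element type: becomes available delayMillis after it is created.
    static final class DelayedTask implements Delayed {
        final String name;
        private final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String demo() {
        // Bounded and array-backed; the second argument enables the fair access policy.
        BlockingQueue<String> array = new ArrayBlockingQueue<>(2, true);
        // No capacity given, so the capacity defaults to Integer.MAX_VALUE.
        BlockingQueue<String> linked = new LinkedBlockingQueue<>();
        // Unbounded; elements leave in priority order (here the natural order of Integer).
        BlockingQueue<Integer> priority = new PriorityBlockingQueue<>();
        priority.offer(3);
        priority.offer(1);
        priority.offer(2);
        // Unbounded; an element can be taken only after its delay has expired.
        DelayQueue<DelayedTask> delayed = new DelayQueue<>();
        delayed.offer(new DelayedTask("slow", 400));
        delayed.offer(new DelayedTask("fast", 200));

        StringBuilder out = new StringBuilder();
        out.append(array.remainingCapacity()).append(' ');
        out.append(linked.remainingCapacity() == Integer.MAX_VALUE).append(' ');
        out.append(priority.poll()).append(priority.poll()).append(priority.poll()).append(' ');
        // Nothing has expired yet, so the non-blocking poll finds no element.
        out.append(delayed.poll() == null).append(' ');
        try {
            // take() blocks until the shortest remaining delay has expired.
            out.append(delayed.take().name).append(' ').append(delayed.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2 true 123 true fast slow
    }
}
```

Note that the elements come out of the priority queue as 1, 2, 3 regardless of insertion order, and that "fast" leaves the delay queue before "slow" even though it was inserted later.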
Blocking queues support most of the methods of non-blocking queues and add several very useful ones:
The put method stores an element at the tail of the queue and waits if the queue is full;
The take method removes an element from the head of the queue and waits if the queue is empty;
The timed offer method stores an element at the tail of the queue. If the queue is full, it waits for up to the given time. If the element still could not be inserted when the time runs out, it returns false; otherwise it returns true;
The timed poll method removes an element from the head of the queue. If the queue is empty, it waits for up to the given time. If no element could be retrieved when the time runs out, it returns null; otherwise it returns the retrieved element.
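The return values of the timed offer and poll methods can be observed with a queue of capacity 1. This is a small sketch; the class name TimedOfferPollDemo and the 100 ms timeout are chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferPollDemo {
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();
        try {
            // Space is available, so the element is inserted at once.
            out.append(queue.offer("a", 100, TimeUnit.MILLISECONDS)).append(' ');
            // The queue is full and nobody takes, so offer gives up after about 100 ms.
            out.append(queue.offer("b", 100, TimeUnit.MILLISECONDS)).append(' ');
            // An element is present, so it is returned at once.
            out.append(queue.poll(100, TimeUnit.MILLISECONDS)).append(' ');
            // The queue is empty and nobody puts, so poll returns null after about 100 ms.
            out.append(queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false a null
    }
}
```

Unlike put and take, these calls always return within the timeout, which is useful when a thread must not wait indefinitely.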
Here is an example:
import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author Xu Jian
 * @version Created: March 20, 2016 at 12:52:53 pm
 */
public class BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        // This class is itself named BlockingQueue, so the interface is referenced by its fully qualified name
        java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 10; i++) {
            // Add the specified element to this queue; blocks while the queue is full
            blockingQueue.put("Add element" + i);
            System.out.println("Added element to the blocking queue:" + i);
        }
        System.out.println("The program ends this time and will exit ----");
    }
}
With the capacity of the blocking queue limited to 5, the sixth put call blocks once 5 elements have been added. The thread waits for space in the queue, and the program does not terminate.
If we remove the head element once the queue is full, we can continue adding elements to the blocking queue. The code is as follows:
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 10; i++) {
            // Add the specified element to this queue
            blockingQueue.put("Add element" + i);
            System.out.println("Added element to the blocking queue:" + i);
            // From the fifth element on, remove the head element after each insertion
            if (i >= 4)
                System.out.println("Remove the header element" + blockingQueue.take());
        }
        System.out.println("The program ends this time and will exit ----");
    }
}

This time no put call blocks: starting with the fifth element, each put is followed by a take, so the queue never holds more than 5 elements and the program runs to completion.
3. Implementation principle of blocking queue
The following mainly looks at the implementation principle of ArrayBlockingQueue.
First, let’s take a look at the member variables of the ArrayBlockingQueue class:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Underlying storage structure: an array */
    final Object[] items;

    /** Index of the head element (the next one to be taken) */
    int takeIndex;

    /** Index of the tail slot (where the next element will be put) */
    int putIndex;

    /** Total number of elements in the queue */
    int count;

    /** Reentrant lock guarding all access */
    final ReentrantLock lock;

    /** Condition that waiting takers wait on */
    private final Condition notEmpty;

    /** Condition that waiting putters wait on */
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any. Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

    // ... (remaining members omitted)
}

As you can see, ArrayBlockingQueue actually stores its elements in an array.
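The fields takeIndex, putIndex and count treat the array as a circular buffer: each index advances by one and wraps back to 0 when it reaches the end of the array. The following single-threaded sketch shows only that index arithmetic. RingIndexDemo is a hypothetical class with no locking and is not the JDK implementation.

```java
import java.util.Arrays;

// Hypothetical single-threaded ring buffer: index arithmetic only, no locking.
class RingIndexDemo {
    final Object[] items = new Object[3];
    int takeIndex; // slot of the next element to remove
    int putIndex;  // slot where the next element will be stored
    int count;     // number of elements currently stored

    void enqueue(Object x) {
        if (count == items.length) {
            throw new IllegalStateException("full");
        }
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0; // wrap around to the start of the array
        }
        count++;
    }

    Object dequeue() {
        if (count == 0) {
            throw new IllegalStateException("empty");
        }
        Object x = items[takeIndex];
        items[takeIndex] = null; // drop the reference so it can be collected
        if (++takeIndex == items.length) {
            takeIndex = 0; // wrap around to the start of the array
        }
        count--;
        return x;
    }

    static String demo() {
        RingIndexDemo q = new RingIndexDemo();
        q.enqueue("a");
        q.enqueue("b");
        q.enqueue("c"); // array is full, putIndex wraps to 0
        q.dequeue();    // removes "a", takeIndex becomes 1
        q.enqueue("d"); // reuses slot 0, putIndex becomes 1
        return Arrays.toString(q.items)
                + " take=" + q.takeIndex + " put=" + q.putIndex + " count=" + q.count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [d, b, c] take=1 put=1 count=3
    }
}
```

Because the indexes wrap, no elements ever need to be shifted inside the array when the head is removed.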
Let’s take a look at the implementation of two important methods of ArrayBlockingQueue, put() and take():
public void put(E e) throws InterruptedException {
    // First check that e is not null
    checkNotNull(e);
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is full, wait on the notFull condition
        while (count == items.length)
            notFull.await();
        // The queue is not full, so perform the enqueue operation
        enqueue(e);
    } finally {
        // Release the lock
        lock.unlock();
    }
}

Let’s look at the enqueue operation itself:
private void enqueue(E x) {
    final Object[] items = this.items;
    // Store the element in the tail slot
    items[putIndex] = x;
    // Advance the tail index, wrapping around at the end of the array
    if (++putIndex == items.length)
        putIndex = 0;
    // Total number of elements +1
    count++;
    // Wake up one thread waiting on the notEmpty condition, if there is one
    notEmpty.signal();
}

Here is the source code of the take() method:
public E take() throws InterruptedException {
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is empty, the thread waits on the notEmpty condition
        while (count == 0)
            notEmpty.await();
        // Not empty, so perform the dequeue operation
        return dequeue();
    } finally {
        // Release the lock
        lock.unlock();
    }
}

4. Application of blocking queues: Implementing the consumer-producer model
import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author Xu Jian
 * @version Created time: March 20, 2016 at 2:21:55 pm
 * Class description: Consumer-producer mode implemented by blocking queue
 */
public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

    public static void main(String[] args) {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {
        @Override
        public void run() {
            consumption();
        }

        private void consumption() {
            while (true) {
                try {
                    // Blocks while the queue is empty
                    queue.take();
                    System.out.println("Take an element from the queue, and the queue remains " + queue.size() + " elements");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread {
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                try {
                    // Blocks while the queue is full
                    queue.put(1);
                    System.out.println("Insert an element into the queue, the remaining space in the queue: " + (queueSize - queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
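The producer and consumer above run forever. As a hedged variation, the sketch below produces a fixed number of elements and then sends a sentinel value so that the consumer can stop. The class name BoundedRunDemo, the STOP sentinel and the element count are assumptions added for illustration and are not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedRunDemo {
    // Hypothetical sentinel: tells the consumer that no more elements will arrive.
    private static final Integer STOP = -1;

    static List<Integer> demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // blocks while the queue is empty
                    if (value.equals(STOP)) {
                        return;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // join() also makes the consumer's writes to the list visible here.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer the elements arrive in insertion order, even though the queue holds at most two of them at any moment.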
That is all for this article. I hope it is helpful for your learning.