This article mainly talks about the collections involved in concurrency packages. For ordinary collections, please refer to [java collection overview]
1. What is BlockingQueue
BlockingQueue is a blocking queue. From the word blocking, it can be seen that access to a blocking queue may cause blockage in some cases. There are two main blocked cases:
1. When the queue is full, it will be enqueued.
2. When the queue is empty, it will be out of the queue.
Therefore, when a thread tries to queue an already full queue, it will be blocked unless another thread does the queue operation; similarly, when a thread tries to queue an empty queue, it will be blocked unless another thread has the queue operation.
In Java, the BlockingQueue interface is located in the java.util.concurrent package (provided in the Java 5 version). From the characteristics of the blocking queue introduced above, it can be seen that the blocking queue is thread-safe.
2. How to use BlockingQueue
Blocking queues are mainly used in producer/consumer scenarios. The following picture shows a thread production and a thread consumption scenario:
The thread responsible for production continuously creates new objects and inserts them into the blocking queue until the upper limit of this queue is reached. After the queue reaches its upper limit, the production thread will be blocked until the consumed thread consumes the queue. Similarly, the thread responsible for consumption constantly consumes objects from the queue until the queue is empty. When the queue is empty, the consumption thread will be blocked unless a new object in the queue is inserted.
3. Methods in BlockingQueue interface
There are four sets of methods for blocking queues to perform insert , remove and examine respectively. When the operations corresponding to each set of methods cannot be executed immediately, there will be different reactions. The following table lists these methods in a classified manner:
| - | Throws Exception | Special Value | Blocks | Times Out |
|---|---|---|---|---|
| Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| Remove | remove(o) | poll() | take() | poll(timeout, timeunit) |
| Examine | element() | peek() |
The corresponding characteristics of these four sets of methods are:
1. ThrowsException: If the operation cannot be performed immediately, an exception will be thrown.
2. SpecialValue: If the operation cannot be performed immediately, a special value will be returned, usually true or false
3. Blocks: If the operation cannot be performed immediately, the operation will be blocked
4. TimesOut: If the operation cannot be performed immediately, the operation will be blocked at the specified time. If the specified time is not executed, a special value will be returned, which is usually true or false.
It should be noted that we cannot insert null into the BlockingQueue, otherwise NullPointerException will be reported.
4. BlockingQueue implementation class
BlockingQueue is just an interface in the java.util.concurrent package. When using it specifically, we use its implementation classes. Of course, these implementation classes are also located in the java.util.concurrent package. In Java 6, BlockingQueue implementation classes are mainly as follows:
1. ArrayBlockingQueue
2. DelayQueue
3. LinkedBlockingQueue
4. PriorityBlockingQueue
5. SynchronousQueue
Below we will introduce these implementation classes separately.
4.1 ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue, and its internal implementation is an array. The meaning of boundary means that its capacity is limited, we must specify its capacity size when it is initialized, and the capacity size cannot be changed once it is specified.
ArrayBlockingQueue stores data in a first-in-first-out manner. The newly inserted object is the tail and the newly moved out object is the head. Here is an example of initializing and using ArrayBlockingQueue:
BlockingQueue queue = new ArrayBlockingQueue(1024);queue.put("1");Object object = queue.take();4.2 DelayQueue
What DelayQueue blocks is its internal elements. The elements in DelayQueue must implement the java.util.concurrent.Delayed interface. The definition of this interface is very simple:
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);} The return value of getDelay() method is the hold time before the queue element is released. If 0 or a负值is returned, it means that the element has expired and needs to be released. At this time, DelayedQueue will release this object through its take() method.
As can be seen from the above Delayed interface definition, it also inherits the Comparable interface. This is because the elements in the DelayedQueue need to be sorted. Generally speaking, we sort by priority of the element expiration time.
Example 1: Specify an expiration time for an object
First, we define an element, which needs to implement the Delayed interface
public class DelayedElement implements Delayed { private long expired; private long delay; private String name; DelayedElement(String elementName, long delay) { this. name = elementName; this. delay= delay; expired = ( delay + System. currentTimeMillis()); } @Override public int compareTo(Delayed o) { DelayedElement cached=(DelayedElement) o; return cached.getExpired()> expired?1:-1; } @Override public long getDelay(TimeUnit unit) { return ( expired - System. currentTimeMillis()); } @Override public String toString() { return "DelayedElement [delay=" + delay + ", name=" + name + "]"; } public long getExpired() { return expired; }}Set the expiration time of this element to 3s
public class DelayQueueExample { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedElement> queue= new DelayQueue<>(); DelayedElement ele= new DelayedElement( "cache 3 seconds",3000); queue.put( ele); System. out.println( queue.take()); }}Run this main function and we can find that we need to wait 3 seconds before printing this object.
In fact, there are many application scenarios for DelayQueue, such as timed closing connections, cache objects, timeout processing and other scenarios. Let’s take the student exam as an example to let everyone understand the use of DelayQueue more deeply.
Example 2: Treat all students in the exam as a DelayQueue, whoever finishes the questions first releases them
First, we construct a student object
public class Student implements Runnable,Delayed{ private String name; //Name private long costTime;//Time for the test questions private long finishedTime;//Time for completion public Student(String name, long costTime) { this. name = name; this. costTime= costTime; finishedTime = costTime + System. currentTimeMillis(); } @Override public void run() { System. out.println( name + "Submit the paper, time" + costTime /1000); } @Override public long getDelay(TimeUnit unit) { return ( finishedTime - System. currentTimeMillis()); } @Override public int compareTo(Delayed o) { Student other = (Student) o; return costTime >= other. costTime?1:-1; }}Then, construct a teacher object to take the exam to the students
public class Teacher { static final int STUDENT_SIZE = 30; public static void main(String[] args) throws InterruptedException { Random r = new Random(); //Think of all students as a delay queue DelayQueue<Student> students = new DelayQueue<Student>(); //Construct a thread pool to let students "do their homework" ExecutorService exec = Executors.newFixedThreadPool(STUDENT_SIZE); for ( int i = 0; i < STUDENT_SIZE; i++) { //Initialize the student's name and time to do the test students.put( new Student( "Student" + (i + 1), 3000 + r.nextInt(10000))); } //Start the test while(! students.isEmpty()){ exec.execute( students.take()); } exec.shutdown(); }}Let's take a look at the running results:
Student 2 Submit the paper, 3
Student 1 handing in papers, taking 5
Student 5 Submit the paper, 7
Student 4 Submit the paper, taking 8
Student 3 Submit the paper, 11
Through the running results, we can find that each student will "submit the paper" after the specified start time arrives ( depending on the getDelay() method ), and the paper will be submitted first ( depending on the compareTo() method).
By looking at its source code, you can see that the internal implementation of DelayQueue uses PriorityQueue and a Lock:
4.3 LinkedBlockingQueue
The configuration of the LinkedBlockingQueue blocking queue size is optional. If we specify a size when initializing, it is bounded, and if not specified, it is bounded. It is said to be boundless, but in fact, the default size is Integer.MAX_VALUE capacity. Its internal implementation is a linked list.
Like ArrayBlockingQueue, LinkedBlockingQueue also stores data in a first-in-first-out manner. The newly inserted object is the tail and the newly moved out object is the head. Here is an example of initializing and making LinkedBlockingQueue:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);bounded.put("Value");String value = bounded.take();4.4 PriorityBlockingQueue
PriorityBlockingQueue is a queue without boundaries, and its sorting rules are the same as java.util.PriorityQueue . It should be noted that null objects are allowed to be inserted in PriorityBlockingQueue.
All objects inserted into PriorityBlockingQueue must implement the java.lang.Comparable interface, and the queue priority sorting rules are defined according to our implementation of this interface.
In addition, we can get an iterator from PriorityBlockingQueue, but this iterator does not guarantee iteration in priority order.
Let’s give an example to illustrate. First, we define an object type, which needs to implement the Comparable interface:
public class PriorityElement implements Comparable<PriorityElement> {private int priority;//Define priority PriorityElement(int priority) { //Initialize priority this.priority = priority;}@Overridepublic int compareTo(PriorityElement o) { //Sort by priority size return priority >= o.getPriority() ? 1 : -1;}public int getPriority() { return priority;}public void setPriority(int priority) { this.priority = priority;}@Overridepublic String toString() { return "PriorityElement [priority=" + priority + "]";}}Then we randomly set priority to the queue
public class PriorityBlockingQueueExample {public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<>(); for (int i = 0; i < 5; i++) { Random random=new Random(); PriorityElement ele = new PriorityElement(random.nextInt(10)); queue.put(ele); } while(!queue.isEmpty()){ System.out.println(queue.take()); }}}Check out the running results:
PriorityElement [priority=3]
PriorityElement [priority=4]
PriorityElement [priority=5]
PriorityElement [priority=8]
PriorityElement [priority=9]
4.5 SynchronousQueue
Only one element is allowed inside the SynchronousQueue queue. When a thread inserts an element, it will be blocked unless the element is consumed by another thread.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.