Blocking queues in Java
1. What is a blocking queue?
A blocking queue (BlockingQueue) is a queue that supports two additional operations:
When the queue is empty, a thread that tries to take an element waits until the queue becomes non-empty.
When the queue is full, a thread that tries to store an element waits until space becomes available.
Blocking queues are often used in producer-consumer scenarios. Producers are threads that add elements to the queue, and consumers are threads that take elements from it. The blocking queue is the container in which producers store elements and from which consumers take them.
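The producer-consumer scenario can be sketched as follows. This is a minimal sketch rather than a reference implementation: the class name ProducerConsumerDemo, the method run, and the capacity of 2 are assumptions chosen for illustration. It uses only put and take from the BlockingQueue interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class ProducerConsumerDemo {

    // Runs one producer and one consumer over a small bounded queue and
    // returns the items in the order the consumer received them.
    static List<Integer> run(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // One producer and a FIFO queue, so the order is preserved.
        System.out.println(run(5)); // [0, 1, 2, 3, 4]
    }
}
```

Because the capacity is smaller than the number of items, the producer is forced to wait for the consumer several times during the run, which is exactly the blocking behavior described above.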
2. Blocking queues in Java
Seven blocking queues are provided in the JDK:
ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue backed by an array. It orders elements first-in-first-out (FIFO). By default, it does not guarantee fair access for waiting threads. Fair access means that blocked producer or consumer threads gain access to the queue in the order in which they blocked: the producer thread that blocked first inserts its element first, and the consumer thread that blocked first takes its element first. Ensuring fairness usually reduces throughput. We can create a fair blocking queue using the following code:
ArrayBlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(1000, true); // the element type String is only illustrative
Fair access is implemented with a ReentrantLock created in fair mode.
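To see what "bounded" means in practice, here is a small sketch using the non-blocking offer and poll methods, which report failure instead of waiting. The class name BoundedQueueDemo and the capacity of 2 are assumptions for illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class BoundedQueueDemo {

    // Returns a comma-separated trace of what a small fair queue does
    // when it is filled beyond capacity and then drained.
    static String trace() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2, true); // true = fair
        StringBuilder sb = new StringBuilder();
        sb.append(queue.offer("a")).append(',');          // true
        sb.append(queue.offer("b")).append(',');          // true
        sb.append(queue.offer("c")).append(',');          // false: the queue is full
        sb.append(queue.remainingCapacity()).append(','); // 0
        sb.append(queue.poll()).append(',');              // a (FIFO order)
        sb.append(queue.poll()).append(',');              // b
        sb.append(queue.poll());                          // null: the queue is empty
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // true,true,false,0,a,b,null
    }
}
```

Where offer and poll return false or null, the blocking counterparts put and take would instead make the calling thread wait.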
LinkedBlockingQueue
LinkedBlockingQueue is an optionally bounded blocking queue backed by a linked list. If no capacity is specified, the capacity defaults to Integer.MAX_VALUE, which is also the maximum. It orders elements first-in-first-out (FIFO).
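The difference between the default capacity and an explicit one can be checked with a short sketch. The class and method names below are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class LinkedQueueCapacityDemo {

    // Capacity of a queue created with the no-argument constructor.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Tries to put three elements into a queue limited to two.
    static boolean thirdOfferAccepted() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b");
        return queue.offer("c"); // rejected because the queue is full
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(thirdOfferAccepted());                   // false
    }
}
```

In producer-consumer code it is usually safer to pass an explicit capacity, since the default is large enough that a fast producer can exhaust memory before it ever blocks.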
PriorityBlockingQueue
PriorityBlockingQueue is an unbounded blocking queue that supports priorities. By default, elements are ordered by their natural ordering, in ascending order. A different ordering can be specified through a Comparator.
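The two ordering options can be compared with a small sketch. The class name PriorityOrderDemo and the helper drain are assumptions for illustration; passing null as the comparator selects the natural ordering, as documented for the two-argument constructor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class PriorityOrderDemo {

    // Adds 3, 1, 2 and returns them in the order the queue hands them out.
    // A null comparator means natural (ascending) ordering.
    static List<Integer> drain(Comparator<Integer> order) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, order);
        queue.add(3);
        queue.add(1);
        queue.add(2);

        List<Integer> result = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) { // poll never blocks
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drain(null));                      // [1, 2, 3]
        System.out.println(drain(Comparator.reverseOrder())); // [3, 2, 1]
    }
}
```

Since the queue is unbounded, add and put never block. Only take blocks, when the queue is empty.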
DelayQueue
DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. It is implemented on top of PriorityQueue. Elements in the queue must implement the Delayed interface, and each element specifies at creation how long it must wait before it can be taken from the queue. An element can be taken only once its delay has expired. We can use DelayQueue in the following application scenarios:
Cache system design: a DelayQueue holds the cache entries together with their validity period, and a thread polls the DelayQueue in a loop. As soon as an entry can be taken from the DelayQueue, its validity period has expired.
Scheduled task scheduling: a DelayQueue holds the day's tasks together with their execution times. As soon as a task can be taken from the DelayQueue, it is executed. For example, TimerQueue is implemented using DelayQueue.
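The cache scenario can be sketched roughly as follows. This is only an illustration under stated assumptions: the classes CacheExpiryDemo and ExpiringKey and the time-to-live values are hypothetical, and a real cache would also need the map that holds the cached values.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class CacheExpiryDemo {

    // A cache key that becomes available in the queue once its time-to-live has passed.
    static final class ExpiringKey implements Delayed {
        final String key;
        final long expiresAtNanos;

        ExpiringKey(String key, long ttlMillis) {
            this.key = key;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Negative once the entry has expired.
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the key whose validity period ends first.
    static String firstExpired() {
        DelayQueue<ExpiringKey> expirations = new DelayQueue<>();
        expirations.put(new ExpiringKey("slow", 200));
        expirations.put(new ExpiringKey("fast", 50));
        try {
            return expirations.take().key; // blocks until the shortest delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // An entry that has not expired yet cannot be taken, although it is in the queue.
    static boolean unexpiredEntryIsHidden() {
        DelayQueue<ExpiringKey> expirations = new DelayQueue<>();
        expirations.put(new ExpiringKey("k", 10_000));
        return expirations.poll() == null && expirations.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(firstExpired());           // fast
        System.out.println(unexpiredEntryIsHidden()); // true
    }
}
```

In a real cache, a background thread would call take() in a loop and evict each returned key from the backing map.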
How to implement the Delayed interface
We can refer to the ScheduledFutureTask class in ScheduledThreadPoolExecutor, which implements the Delayed interface. First, when the object is created, the time field records the point in time from which the object can be used. The code is as follows:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}
Then use getDelay to query how long the current element still has to wait. The code is as follows:
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
As the constructor shows, the delay parameter ns is in nanoseconds. It is best to use nanoseconds in your own designs as well, because getDelay can be asked for any unit: if the time were stored in a coarser unit, delays finer than that unit could not be expressed. Also note that once time is earlier than the current time, getDelay returns a negative number.
Finally, we can use time to determine the order in the queue, for example so that the element with the longest delay is placed at the end of the queue.
public int compareTo(Delayed other) {
    if (other == this)
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
How to implement delayed blocking queues
The implementation of the delayed blocking queue is simple: when a consumer takes an element from the queue and the element's delay has not yet expired, the current thread is blocked.
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) {
    return q.poll(); // the delay has expired: take the element from the queue
} else if (leader != null) {
    // leader is a thread that is already waiting for the head element of the queue
    available.await(); // let the current thread wait for a signal
} else {
    // when leader is null, make the current thread the leader
    Thread thisThread = Thread.currentThread();
    try {
        leader = thisThread;
        // awaitNanos() makes the current thread wait until it is signalled or the delay has elapsed
        available.awaitNanos(delay);
    } finally {
        if (leader == thisThread) {
            leader = null;
        }
    }
}
SynchronousQueue
SynchronousQueue is a blocking queue that does not store elements. Each put operation must wait for a take operation; otherwise the element cannot be added. SynchronousQueue can be regarded as a handoff point that passes data from the producer thread directly to the consumer thread. Because the queue itself does not store any elements, it is well suited to handoff scenarios, for example passing data used in one thread to another thread. In such scenarios, the throughput of SynchronousQueue is higher than that of LinkedBlockingQueue and ArrayBlockingQueue.
It supports a fair access policy. By default, the policy is non-fair.
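The handoff behavior can be sketched like this. The class name HandoffDemo and its methods are assumptions for illustration. The constructor argument true selects the fair policy.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class HandoffDemo {

    // With no consumer waiting, a non-blocking offer cannot hand the element over.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> channel = new SynchronousQueue<>();
        return channel.offer("x");
    }

    // Passes one message from a producer thread to the calling thread.
    static String handOff(String message) {
        SynchronousQueue<String> channel = new SynchronousQueue<>(true); // true = fair policy
        Thread producer = new Thread(() -> {
            try {
                channel.put(message); // blocks until a consumer takes the element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = channel.take(); // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff("hello"));       // hello
    }
}
```

The false result of offer shows that the queue has no internal capacity at all: an element exists in it only for the moment it is passed from one thread to the other.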
LinkedTransferQueue
LinkedTransferQueue is an unbounded blocking TransferQueue backed by a linked list. Compared with other blocking queues, LinkedTransferQueue adds the tryTransfer and transfer methods.
transfer method
If a consumer is currently waiting to receive an element (that is, the consumer is blocked in the take() method or the time-limited poll() method), the transfer method immediately hands the element passed in by the producer to that consumer. If no consumer is waiting to receive an element, the transfer method stores the element at the tail node of the queue and does not return until a consumer has consumed the element.
tryTransfer method
The tryTransfer method tests whether the element passed in by the producer can be handed directly to a consumer. If no consumer is waiting to receive an element, it returns false. The difference from the transfer method is that tryTransfer returns immediately whether or not a consumer receives the element, whereas transfer returns only after a consumer has consumed it.
The time-limited tryTransfer(E e, long timeout, TimeUnit unit) method tries to hand the element passed in by the producer directly to a consumer. If no consumer takes the element, the method waits up to the specified time before returning. It returns false if the element has not been consumed when the timeout expires, and true if the element is consumed within the timeout.
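The three variants can be compared side by side with a small sketch. The class name TransferDemo, its method names, and the 50 ms timeout are assumptions for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class TransferDemo {

    // No consumer is waiting, so tryTransfer gives up immediately.
    static boolean tryTransferWithoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer("x");
    }

    // No consumer arrives within the timeout, so the timed variant also fails.
    static boolean timedTryTransferWithoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        try {
            return queue.tryTransfer("x", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // transfer() returns only after the consumer thread has received the element.
    static String transferToConsumer(String element) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(element);
            consumer.join(); // makes the consumer's write to received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(tryTransferWithoutConsumer());      // false
        System.out.println(timedTryTransferWithoutConsumer()); // false
        System.out.println(transferToConsumer("job"));         // job
    }
}
```

Note that transfer works whether or not the consumer has reached take() first: if it has not, the element waits in the queue and the producer stays blocked until the consumer arrives.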
LinkedBlockingDeque
LinkedBlockingDeque is a double-ended blocking queue backed by a linked list. Double-ended means that elements can be inserted and removed at both ends of the queue. Because the deque has an additional entry point, threads can spread their operations over both ends, which in the ideal case halves the competition when multiple threads enqueue at the same time. Compared with other blocking queues, LinkedBlockingDeque adds methods such as addFirst, addLast, offerFirst, offerLast, peekFirst and peekLast. Methods ending in First insert, get, or remove the first element of the deque. Methods ending in Last insert, get, or remove the last element. In addition, the insertion method add is equivalent to addLast, while the removal method remove is equivalent to removeFirst and the take method is equivalent to takeFirst. This is not a bug in the JDK: used through the plain queue methods, the deque behaves as a FIFO queue that inserts at the tail and removes at the head. Even so, the methods with the First and Last suffixes make the intent clearer. A capacity can be specified when constructing a LinkedBlockingDeque to prevent the queue from growing without bound. In addition, a double-ended blocking queue can be used in the "work stealing" pattern.
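The method equivalences and the idea behind work stealing can be sketched in a few lines. The class name DequeDemo and the roles "owner" and "thief" are assumptions for illustration. The sketch is single-threaded and shows only which end each method operates on.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; the names are illustrative, not from the JDK.
class DequeDemo {

    // Fills a bounded deque and removes elements from both ends.
    static String trace() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(10);
        deque.add("b");      // same as addLast
        deque.addFirst("a"); // deque is now: a, b
        deque.addLast("c");  // deque is now: a, b, c

        String owner = deque.pollFirst(); // the owning worker takes from the head: a
        String thief = deque.pollLast();  // a stealing worker takes from the tail: c
        String rest = deque.remove();     // same as removeFirst: b
        return owner + thief + rest;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // acb
    }
}
```

In a work-stealing setup, each worker typically has its own deque and idle workers take tasks from the opposite end of another worker's deque, so the owner and the thief rarely compete for the same element.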