Through the analysis of the previous three articles, we have a deep understanding of the internal structure and some design concepts of AbstractQueuedSynchronizer, and we know that AbstractQueuedSynchronizer maintains a synchronization state and two queuing areas, which are synchronous queues and conditional queues respectively. Let’s use public toilets as an analogy. The synchronization queue is the main queue area. If the public toilets are not open, everyone who wants to enter the toilet has to queue here. The condition queue is mainly set for condition waiting. Let's imagine that if a person finally successfully obtains the lock and enters the toilet through queue, but finds that he does not bring toilet paper before convenience. Although he is helpless when encountering this situation, it must also accept this fact. At this time, it has to go out and prepare the toilet paper first (enter the condition queue to wait). Of course, before going out, the lock has to be released so that others can come in. After preparing the toilet paper (the conditions are met), it has to return to the synchronous queue to queue again. Of course, not all people entering the room did not bring toilet paper. There may be other reasons that they must interrupt the operation and queue up in the condition queue first. Therefore, there can be multiple condition queues, and different condition queues are set according to different waiting conditions. The condition queue is a one-way linked list. The Condition interface defines all operations in the condition queue. The ConditionObject class inside the AbstractQueuedSynchronizer implements the Condition interface. Let's take a look at what operations are defined by the Condition interface.
public interface Condition { //Waiting for response to thread interruption void await() throws InterruptedException; //Waiting for not responding to thread interruption void awaitUninterruptibly(); //Setting condition waiting for relative time (no spin) long awaitNanos(long nanosTimeout) throws InterruptedException; //Setting condition waiting for relative time (spin) boolean await(long time, TimeUnit unit) throws InterruptedException; //Setting condition waiting for absolute time boolean awaitUntil(Date deadline) throws InterruptedException; //Wake up the head node void signal() in the condition queue; //Wake up all nodes of the condition queue void signalAll(); }Although the Condition interface defines so many methods, it is divided into two categories in total. The method that starts with await is the method that thread enters the condition queue and waits, and the method that starts with signal is the method that "wakes up" the thread in the condition queue. It should be noted here that calling the signal method may or may not wake up the thread. When the thread will be awakened depends on the situation, as will be discussed later, but calling the signal method will definitely move the thread from the conditional queue to the tail of the synchronization queue. For the sake of convenience of narration, we will not worry about it for the time being. We will call the signal method the operation of the wake-up conditional queue thread. Please note that there are 5 types of await methods, namely, the response thread interrupt waiting, the non-response thread interrupt waiting, the relative time non-spin waiting, the relative time spin waiting, and the absolute time waiting; there are only two types of signal methods, namely, the operation of only awakening the condition queue head node and awakening all nodes of the condition queue. The same type of methods are basically the same. Due to space limitations, it is impossible and not necessary to talk about these methods carefully. We only need to understand one representative method and then look at other methods to understand them. So in this article I will only talk about the await method and signal method in detail. Other methods will not be discussed in detail, but will post source code for your reference.
1. Wait for response to the condition of thread interrupt
//Waiting in response to the condition of thread interrupt public final void await() throws InterruptedException { //If the thread is interrupted, an exception is thrown if (Thread.interrupted()) { throw new InterruptedException(); } //Add the current thread to the tail of the condition queue Node node = addConditionWaiter(); //Full release the lock before entering the condition wait int savedState = fullyRelease(node); int interruptMode = 0; //The thread has been conditionally waiting in the while loop while (!isOnSyncQueue(node)) { //The thread that is conditionally waiting is suspended here, There are several cases where the thread is awakened: //1. The forward node of the synchronization queue has been cancelled//2. Set the state of the forward node of the synchronization queue to SIGNAL failed//3. The current node is awakened after the forward node releases the lock. //The current thread immediately checks whether it is interrupted. If so, it means that the node cancels the condition waiting. At this time, the node needs to be moved out of the condition queue if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } } //After the thread wakes up, it will acquire the lock in exclusive mode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //This operation is mainly to prevent threads from interrupting before signal, resulting in no disconnection from the condition queue if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //Interrupt processing that responds to interrupt mode if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); }}When a thread calls the await method, the current thread will be wrapped as a node node and placed in the tail of the condition queue. In the addConditionWaiter method, if the condition queue end node is found to be cancelled, the unlinkCancelledWaiters method will be called to clear all cancelled nodes of the condition queue. This step is the preparation for inserting nodes. After ensuring that the status of the tail node is also CONDITION, a new node will be created to wrap the current thread and put it in the tail of the condition queue. Note that this process just adds nodes to the tail of the synchronization queue without suspending threads.
Step 2: Release the lock completely
//Full release the lock final int fullyRelease(Node node) { boolean failed = true; try { //Get the current synchronization state int savedState = getState(); //Use the current synchronization state to release the lock if (release(savedState)) { failed = false; //If the lock is released successfully, return savedState; } else { //If the lock is released fails, throw a runtime exception throw new IllegalMonitorStateException(); } } finally { //Ensure that the node is set to the cancel state if (failed) { node.waitStatus = Node.CANCELLED; } }}After wrapping the current thread into a node and adding it to the tail of the condition queue, the fullyRelease method is called to release the lock. Note that the method named fullyRelease is used to completely release the lock, because the lock is reentrant, so you need to release the lock before conditional waiting, otherwise others will not be able to acquire the lock. If the lock is released fails, a runtime exception will be thrown. If the lock is released successfully, it will return to the previous synchronization state.
Step 3: Make conditions waiting
//The thread has been waiting in the while loop while (!isOnSyncQueue(node)) { //The threads that are waiting for condition are suspended here. There are several cases where the thread is awakened: //1. The forward node of the synchronization queue has been cancelled//2. Set the state of the forward node of the synchronization queue to SIGNAL failed//3. The current node is awakened after the forward node releases the lock. LockSupport.park(this); //The current thread wakes up immediately to check whether it is interrupted. If so, it means that the node cancels the condition waiting. At this time, the node needs to be moved out of the condition queue if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; }}//Check the thread interruption situation when condition waiting private int checkInterruptWhileWaiting(Node node) { //The interrupt request is before the signal operation: THROW_IE //The interrupt request is after the signal operation: REINTERRUPT //No interrupt request was received during this period: 0 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}//Transfer the node that cancels the condition waiting from the condition queue to the synchronization queue final boolean transferAfterCancelledWait(Node node) { //If this CAS operation is successful, it means that the interrupt occurs before the signal method if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //After the status modification is successful, put the node into the tail of the synchronization queue enq(node); return true; } //This indicates that the CAS operation failed, indicating that the interrupt occurs after the signal method while (!isOnSyncQueue(node)) { //If the sinal method has not transferred the node to the synchronization queue, wait for Thread.yield(); } return false;}After the above two operations are completed, it will enter the while loop. You can see that the while loop first calls LockSupport.park(this) to hang the thread, so the thread will be blocked here all the time. After calling the signal method, just transfer the node from the conditional queue to the synchronization queue. Whether the thread will be awakened depends on the situation. If you find that the forward node in the synchronization queue is cancelled when transferring a node, or the state of the forward node is updated to SIGNAL failed, both cases will immediately wake up the thread. Otherwise, the thread that is already in the synchronization queue will not be awakened at the end of the signal method, but will wait until its forward node wakes up. Of course, in addition to calling the signal method to wake up, the thread can also respond to interrupts. If the thread receives an interrupt request here, it will continue to execute. You can see that after the thread wakes up, it will immediately check whether it is awakened by interrupt or through signal method. If it is awakened by interrupt, it will also transfer this node to the synchronization queue, but it is achieved by calling the transferAfterCancelledWait method. After the final execution of this step, the interrupt will be returned and the while loop will be jumped out.
Step 4: Operation after the node is removed from the condition queue
//After the thread wakes up, it will acquire the lock in exclusive mode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT;}//This operation is mainly to prevent the thread from interrupting before signal and causing no contact with the condition queue if (node.nextWaiter != null) { unlinkCancelledWaiters();}//Interrupt processing that responds to interrupt mode if (interruptMode != 0) { reportInterruptAfterWait(interruptMode);}//After ending the condition waiting, it will make corresponding processing based on the interrupt situation private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //If the interrupt mode is THROW_IE, an exception is thrown if (interruptMode == THROW_IE) { throw new InterruptedException(); //If the interrupt mode is REINTERRUPT, it will hang itself} else if (interruptMode == REINTERRUPT) { selfInterrupt(); }}When the thread terminates the while loop, that is, the condition waits, it will return to the synchronization queue. Whether it is because of calling the signal method back or because of thread interruption, the node will eventually be in the synchronous queue. At this time, the acquireQueued method will be called to perform the operation of acquiring locks in the synchronization queue. We have already discussed this method in detail in the exclusive mode article. In other words, after the node comes out of the condition queue, it obediently goes to the set of locks in exclusive mode. After this node acquires the lock again, it will call the reportInterruptAfterWait method to respond accordingly based on the interrupt situation during this period. If the interrupt occurs before the signal method, interruptMode is THROW_IE, and an exception will be thrown after the lock is obtained again; if the interrupt occurs after the signal method, interruptMode is REINTERRUPT, and it will be interrupted again after the lock is obtained again.
2. Waiting for non-response to thread interrupts
//Waiting public final void awaitUninterruptibly() { //Add the current thread to the tail of the condition queue Node node = addConditionWaiter(); //Full release the lock and return the current synchronization state int savedState = fullyRelease(node); boolean interrupted = false; //The nodes are conditionally waiting in the while loop while (!isOnSyncQueue(node)) { //All threads in the condition queue are suspended here LockSupport.park(this); //The thread wakes up and finds that the interrupt will not respond immediately if (Thread.interrupted()) { interrupted = true; } } if (acquireQueued(node, savedState) || interrupted) { // Respond to all interrupt requests here, if one of the following two conditions is met, it will hang itself //1. The thread receives the interrupt request while the condition is waiting //2. The thread receives the interrupt request in the acquireQueued method selfInterrupt(); }}3. Set the relative time condition waiting (no spin)
//Set the timing condition waiting (relative time), and do not perform spin waiting public final long awaitNanos(long nanosTimeout) throws InterruptedException { //If the thread is interrupted, an exception is thrown if (Thread.interrupted()) { throw new InterruptedException(); } //Add the current thread to the tail of the condition queue Node node = addConditionWaiter(); //Full release the lock before entering the condition waiting int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { //Judge whether the timeout is used up if (nanosTimeout <= 0L) { //If timeout has been completed, you need to execute the cancel condition waiting operation transferAfterCancelledWait(node); break; } //Hang the current thread for a period of time, the thread may be awakened during this period, or it may wake up by itself LockSupport.parkNanos(this, nanosTimeout); //Check the interrupt information first after the thread wakes up if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } long now = System.nanoTime(); //Timeout time minus the waiting time of the condition nanosTimeout -= now - lastTime; lastTime = now; } //After the thread wakes up, it will acquire the lock in exclusive mode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //Because the transferAfterCancelledWait method does not empty nextWaiter, all you need to clean up here if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //Interrupt processing that responds to interrupt mode if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //Return the remaining time return nanosTimeout - (System.nanoTime() - lastTime);}4. Set the relative time condition waiting (spin)
//Set the timed condition waiting (relative time), perform spin waiting public final boolean await(long time, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException(); } //Get the milliseconds of the timeout long nanosTimeout = unit.toNanos(time); //If the thread is interrupted, an exception is thrown if (Thread.interrupted()) { throw new InterruptedException(); } //Add the current thread to the tail of the condition queue Node node = addConditionWaiter(); //Full release the lock before entering the condition to wait int savedState = fullyRelease(node); //Get the milliseconds of the current time long lastTime = System.nanoTime(); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { //If timeout is timedout, you need to perform the cancel condition waiting operation if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } //If the timeout time is greater than the spin time, the thread will be suspended for a period of time if (nanosTimeout >= spinForTimeoutThreshold) { LockSupport.parkNanos(this, nanosTimeout); } //After the thread wakes up, checks the interrupt information first if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } long now = System.nanoTime(); //Timeout time each time subtracts the time of the condition waiting nanosTimeout -= now - lastTime; lastTime = now; } //After the thread wakes up, it will acquire the lock in exclusive mode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } // Since the transferAfterCancelledWait method does not empty nextWaiter, all you need to clean up here if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //Interrupt processing that responds to interrupt mode if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //Return whether the timeout flag returns !timedout;}5. Set the absolute time condition waiting
//Set the timed condition wait (absolute time) public final boolean awaitUntil(Date deadline) throws InterruptedException { if (deadline == null) { throw new NullPointerException(); } //Get the milliseconds of absolute time long abstime = deadline.getTime(); //If the thread is interrupted, an exception is thrown if (Thread.interrupted()) { throw new InterruptedException(); } //Add the current thread to the tail of the condition queue Node node = addConditionWaiter(); //Full release the lock before entering the condition wait int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { //If timeout, you need to perform a cancel condition waiting operation if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } //Hang the thread for a period of time, during which the thread may be awakened, or it may be time to wake up by itself LockSupport.parkUntil(this, abstime); //Check the interrupt information first after the thread wakes up ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { break; } } //After the thread wakes up, it will acquire the lock in exclusive mode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } //Because the transferAfterCancelledWait method does not empty nextWaiter, all you need to clean up here if (node.nextWaiter != null) { unlinkCancelledWaiters(); } //Interrupt processing that responds to interrupt mode if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } //Return whether the timeout flag returns !timedout;}6. Wake up the head node in the conditional queue
//Wake up the next node in the condition queue public final void signal() { //Judge whether the current thread holds the lock if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } Node first = firstWaiter; //If there is a queueer in the condition queue if (first != null) { //Wake up the head node in the condition queue doSignal(first); }}//Wake up the head node in the condition queue private void doSignal(Node first) { do { //1. Move the firstWaiter reference one by one if ( (firstWaiter = first.nextWaiter) == null) { lastWaiter = null; } //2. Empty the reference of the successor node of the head node first.nextWaiter = null; //3. Transfer the head node to the synchronization queue, and it is possible to wake up the thread after the transfer is completed //4. If the transferForSignal operation fails, wake up the next node} while (!transferForSignal(first) && (first = firstWaiter) != null);}//Transfer the specified node from the condition queue to the synchronization queue final boolean transferForSignal(Node node) { //Set the waiting state from CONDITION to 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //If the operation to update the status fails, return false directly //It may be that the transferAfterCancelledWait method changed the state first, causing this CAS operation to fail return false; } //Add this node to the tail of the synchronization queue Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) { //The current thread will be awakened when the following situation occurs//1. The forward node is in the cancel state//2. The state of the updating forward node is SIGNAL operation failed LockSupport.unpark(node.thread); } return true;}It can be seen that the ultimate core of the signal method is to call the transferForSignal method. In the transferForSignal method, first use CAS operation to set the node's state from CONDITION to 0, and then call the enq method to add the node to the tail of the synchronization queue. We see the next if judgment statement. This judgment statement is mainly used to determine when the thread will be awakened. If these two situations occur, the thread will be immediately awakened. One is when it is discovered that the state of the previous node is cancelled, and the other is when the state of the previous node fails to update. Both cases will immediately wake up the thread, otherwise it will be done by simply transferring the node from the conditional queue to the synchronization queue, and will not immediately wake up the thread in the node. The signalAll method is roughly similar, except that it loops through all nodes in the conditional queue and transfers them to the synchronous queue. The method of transferring nodes still calls the transferForSignal method.
7. Wake up all nodes of the condition queue
//Wake up all nodes behind the condition queue public final void signalAll() { //Judge whether the current thread holds the lock if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } //Get the condition queue header node Node first = firstWaiter; if (first != null) { //Wake up all nodes of the condition queue doSignalAll(first); }}//Wake up all nodes of the condition queue private void doSignalAll(Node first) { //First empty the references of the header node and the tail node lastWaiter = firstWaiter = null; do { //Get the reference of the successor node first = first.nextWaiter; //Empty the subsequent reference of the node to be transferred first.nextWaiter = null; //Transfer the node from the conditional queue to the synchronization queue transferForSignal(first); //Point the reference to the next node first = next; } while (first != null);}At this point, our entire AbstractQueuedSynchronizer source code analysis is over. I believe that through these four analyses, everyone can better master and understand AQS. This category is indeed very important because it is the cornerstone of many other synchronization categories. Due to the limited level and expression ability of the author, if there are no clear statements or inadequate understanding, please correct them in time and discuss and learn together. You can leave a message to read the problem below. If you need AQS comment source code, you can also contact the author to request it.
Note: All the above analysis is based on JDK1.7, and there will be differences between different versions, readers need to pay attention.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.