CountDownLatch source code analysis: await()
The previous article talked about how to use CountDownLatch. This article will talk about the principle of await() from the source code level.
We already know that await() blocks the current thread until the latch count reaches zero or the thread is interrupted.
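As a quick reminder of that behavior, here is a minimal sketch. The class name AwaitDemo and the helper latch released are made up for this illustration; only CountDownLatch itself comes from the JDK. The waiting thread should stay blocked after the first countDown() and be released after the second.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class AwaitDemo {
    // Returns true if the waiter was released only after both countDown() calls.
    static boolean run() {
        CountDownLatch end = new CountDownLatch(2);
        CountDownLatch released = new CountDownLatch(1); // signals that end.await() returned

        Thread waiter = new Thread(() -> {
            try {
                end.await();           // blocks while the count is greater than zero
                released.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        try {
            end.countDown();           // count 2 -> 1, the waiter must still be blocked
            boolean early = released.await(200, TimeUnit.MILLISECONDS);
            end.countDown();           // count 1 -> 0, the waiter may proceed
            boolean late = released.await(5, TimeUnit.SECONDS);
            waiter.join();
            return !early && late;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The rest of the article looks at what happens inside await() while the waiter is blocked.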
Below is its source code.
    end.await();
        ↓
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

sync is an instance of Sync, a static nested class of CountDownLatch. Here is its definition.

    private static final class Sync extends AbstractQueuedSynchronizer {
        ...
    }

Sync extends AbstractQueuedSynchronizer, one of the most important classes in Java concurrency.
It provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on FIFO wait queues.
Keep going and jump to the AbstractQueuedSynchronizer class.
    sync.acquireSharedInterruptibly(1);
        ↓
    // AbstractQueuedSynchronizer
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

There are two checks here. The first tests whether the thread has already been interrupted and, if so, throws InterruptedException. The second decides whether the thread has to wait. We mainly look at the second check.
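The first check can be observed from the outside. The sketch below (the class InterruptedAwaitDemo is hypothetical) sets the interrupt flag before calling await(); because Thread.interrupted() is tested first, await() should throw at once instead of blocking, and the flag should be cleared afterwards.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the JDK.
class InterruptedAwaitDemo {
    // Returns true if await() threw InterruptedException and the flag was cleared.
    static boolean throwsWhenAlreadyInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();   // set the interrupt flag before await()
        try {
            latch.await();                    // the interrupt check runs before any waiting
            return false;
        } catch (InterruptedException e) {
            // Thread.interrupted() both reads and clears the flag
            return !Thread.currentThread().isInterrupted();
        }
    }
}
```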
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

Note that this tryAcquireShared method is the one implemented in Sync.
AbstractQueuedSynchronizer also defines it, but the default implementation simply throws UnsupportedOperationException.
tryAcquireShared asks whether the current state of the object allows an acquire in shared mode.
In Sync, the answer depends only on whether state is 0: it returns 1 if it is, and -1 otherwise.
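To see the return values directly, here is a small sketch. ProbeSync and its probe() method are hypothetical names for this illustration; the class copies the tryAcquireShared body shown above and exposes it, since the real method is protected.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class that mimics the relevant part of CountDownLatch.Sync.
class ProbeSync extends AbstractQueuedSynchronizer {
    ProbeSync(int count) {
        setState(count);              // same idea as the Sync constructor
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // Exposes the protected method so it can be called from outside.
    int probe() {
        return tryAcquireShared(1);
    }
}
```

Under this sketch, new ProbeSync(2).probe() yields -1 (the caller would have to wait) and new ProbeSync(0).probe() yields 1 (the caller may proceed).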
So what does state mean?
    /**
     * The synchronization state.
     */
    private volatile int state;

As the comment says, state holds the synchronization state.
Note that state is declared volatile.
The volatile keyword guarantees that a write to state by one thread is visible to every later read of state by other threads.
In other words, it guarantees the visibility of state: a reader always sees the most recently written value.
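The visibility guarantee can be illustrated with a sketch that is unrelated to AQS itself. The class VolatileFlagDemo and its stop field are made up for this example: the worker keeps reading a volatile flag and is expected to notice the write from the other thread.

```java
// Hypothetical demo class, not part of the JDK.
class VolatileFlagDemo {
    private static volatile boolean stop = false;

    // Returns true if the worker observed the write to stop and terminated.
    static boolean run() {
        stop = false;
        Thread worker = new Thread(() -> {
            while (!stop) {
                // busy-wait; each iteration performs a fresh volatile read
            }
        });
        worker.start();
        stop = true;                  // volatile write, visible to the worker
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```

Without volatile, the Java memory model would not require the worker to ever see the updated value.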
What value does state have here?
To find out, look at the constructor of CountDownLatch.

    CountDownLatch end = new CountDownLatch(2);
        ↓
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
        ↓
    Sync(int count) {
        setState(count);
    }

So the number passed to the constructor becomes the initial value of state.
In this example state == 2, so tryAcquireShared returns -1 and execution enters doAcquireSharedInterruptibly:
    doAcquireSharedInterruptibly(arg);
        ↓
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

This code is fairly long and calls several other methods. Let's look at them one by one.
The first line introduces a new class, Node.
Node is a nested class of AQS (AbstractQueuedSynchronizer) that defines the nodes of a linked queue, as shown below.

          +------+  prev +-----+       +-----+
     head |      | <---- |     | <---- |     |  tail
          +------+       +-----+       +-----+
Remember this structure.
The first line of the code also calls a method, addWaiter(Node.SHARED).
    addWaiter(Node.SHARED) // Node.SHARED means that the node is in shared mode
        ↓
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // private transient volatile Node tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

First, a Node is constructed that stores the current thread, with its mode set to shared.
tail refers to the last node of the wait queue. At this moment the queue has not been initialized, so tail is null. Therefore pred == null, and execution enters enq(node).
    enq(node)
        ↓
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

tail is still null, so execution enters compareAndSetHead.

    compareAndSetHead(new Node())
        ↓
    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

This is a CAS operation: if head is null, the head of the wait queue is set to update, which here is a new empty node.
The assignment tail = head then makes tail non-null, and the loop runs again.
This time node.prev is pointed at the current tail, and a CAS operation makes node the new tail. Note that enq returns t, the previous tail (now the predecessor of node), not node itself; addWaiter ignores that return value and returns node.
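The two passes through enq can be reproduced with a simplified sketch. MiniWaitQueue is a hypothetical class written for this article: it uses AtomicReference.compareAndSet in place of the Unsafe-based compareAndSetHead and compareAndSetTail, and its Node keeps only the fields needed here.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS wait queue (not the JDK implementation).
class MiniWaitQueue {
    static final class Node {
        final Thread thread;          // null for the dummy head node
        volatile Node prev;
        volatile Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq above: returns the predecessor of the inserted node.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First pass: install an empty head, then point tail at it.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                // Second pass: link node behind the current tail and swing tail to it.
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}
```

After a single enq call on an empty queue, head is the empty node, tail is the inserted node, and the returned value is head.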
The model of the waiting queue changes as follows
          +---------------+  prev +------------------+
     head | (empty node)  | <---- | node             |  tail
          | thread = null |       | thread = current |
          +---------------+       +------------------+
At this point the addWaiter method returns, and its result is the Node that holds the current thread.
Return to doAcquireSharedInterruptibly(int arg) and enter the following loop.
    for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
            int r = tryAcquireShared(arg);
            if (r >= 0) {
                setHeadAndPropagate(node, r);
                p.next = null; // help GC
                failed = false;
                return;
            }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            throw new InterruptedException();
    }

The predecessor of node is head, so tryAcquireShared is called. Assuming state is still greater than 0, it returns -1, so r < 0 and execution reaches the shouldParkAfterFailedAcquire method.
    shouldParkAfterFailedAcquire(p, node)
        ↓
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // static final int SIGNAL = -1;
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
        ↓
    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

On this first pass the waitStatus of head is 0, so shouldParkAfterFailedAcquire falls through to compareAndSetWaitStatus.
compareAndSetWaitStatus sets the waitStatus of pred (here, head) to Node.SIGNAL.
Node.SIGNAL means that the thread in the successor node needs to be unparked (woken up) later. The method then returns false.
After this cycle, the queue model becomes the following state
          +---------------------+  prev +------------------+
     head | waitStatus = SIGNAL | <---- | node             |  tail
          | thread = null       |       | thread = current |
          +---------------------+       +------------------+
Because shouldParkAfterFailedAcquire returned false, the && short-circuits and parkAndCheckInterrupt is not evaluated. The for (;;) loop continues.
If state is still greater than 0, execution reaches shouldParkAfterFailedAcquire again.
This time the waitStatus of head is Node.SIGNAL, so shouldParkAfterFailedAcquire returns true.
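The "false on the first call, true on the second" pattern can be shown in isolation. The sketch below is a deliberately reduced model: ParkDecisionDemo and shouldPark are hypothetical names, the predecessor's waitStatus is represented by an AtomicInteger, and the branch for cancelled predecessors (waitStatus > 0) is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, reduced model of shouldParkAfterFailedAcquire (not the JDK code).
class ParkDecisionDemo {
    static final int SIGNAL = -1;     // same value as Node.SIGNAL

    // Assumes the predecessor is never cancelled, so waitStatus is 0 or SIGNAL here.
    static boolean shouldPark(AtomicInteger predStatus) {
        int ws = predStatus.get();
        if (ws == SIGNAL) {
            return true;              // predecessor already promised to signal us
        }
        predStatus.compareAndSet(ws, SIGNAL); // ask for a signal, but retry before parking
        return false;
    }
}
```

With a status that starts at 0, the first call returns false and leaves the status at SIGNAL, and the second call returns true. The extra retry gives the caller one more chance to acquire before it parks.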
Now the parkAndCheckInterrupt method is evaluated.

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park(this) blocks the current thread right here. The thread stays parked until another thread unparks it, it is interrupted, or park returns spuriously. If it wakes up without having been interrupted, Thread.interrupted() returns false and the for (;;) loop continues.
As long as state is greater than 0 and the thread is not interrupted, the thread never leaves this loop; it spends that time parked rather than spinning. This is the referee from the previous article who keeps refusing to announce the end of the game.
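The park-and-recheck pattern can be tried on its own with LockSupport. In this sketch the class ParkDemo and the flag open are made up for illustration; the flag plays the role of "state has reached 0", and the waiter re-checks it after every return from park(), just as the for (;;) loop re-checks tryAcquireShared.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the JDK.
class ParkDemo {
    // Returns true if the waiter woke up and finished after being unparked.
    static boolean run() {
        AtomicBoolean open = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return for no reason, so the condition is checked in a loop
            while (!open.get()) {
                LockSupport.park();
            }
        });
        waiter.start();

        open.set(true);               // change the condition first
        LockSupport.unpark(waiter);   // then wake the waiter (safe even if it has not parked yet)
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

Setting the condition before calling unpark matters: a waiter that wakes up and still sees the old condition would simply park again.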
So under what circumstances does the loop exit? That is, when does state become 0? The next article will explain.
To sum up, when state > 0 the await() method initializes the wait queue if necessary, appends a node for the calling thread to it, sets the waitStatus of the predecessor node to Node.SIGNAL so that the thread will be woken later, and then parks the thread.