CountDownLatch source code analysis - countDown()
The previous article talked about the principle of await() in CountDownLatch from the source code level. This article talks about countDown() .
public void countDown() { //CountDownLatch sync.releaseShared(1);} ↓public final boolean releaseShared(int arg) { //AQS if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;} ↓protected boolean tryReleaseShared(int releases) { //CountDownLatch.Sync // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; }}Through the constructor CountDownLatch end = new CountDownLatch(2); state is set to 2, so c == 2, nextc = 2-1,
Then set state to 1 through the following CAS operation.
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }At this time, nextc is not 0, and returns false. Wait until the countDown() method is called twice, state == 0, nextc ==0, and returns true at this time.
Enter the doReleaseShared() method.
doReleaseShared(); ↓private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }}Recalling the waiting queue model at this time.
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
At this time, the head is not null or tail. waitStatus == Node.SIGNAL, so enter the judgment if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)).
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) ↓ /** * CAS waitStatus field of a node. */private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);}This CAS operation sets state to 0, which means that waitStatus in the head is 0 at this time. The queue model is as follows
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
This method returns true. Enter unparkSuccessor(h);
unparkSuccessor(h); ↓private void unparkSuccessor(Node node) { /* * If status is negative (ie, possible needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);}s is the successor node of the head, that is, the node with the current thread. s != null , and s.waitStatus ==0 , so enter LockSupport.unpark(s.thread);
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }That is, the thread that unlocks blocked. The referee was allowed to blow the whistle!
The principle of countDown() is very clear.
Every time the countDown() method is executed, state is reduced by 1. Until state == 0, the threads blocked in the queue begin to be released, and the threads in subsequent nodes are released according to the state of waitStatus in the predecessor node.
OK, go back to the question of the previous article, when will the following loop break out (the loop in the await method)
for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();}At this time, state == 0, so enter the setHeadAndPropagate method.
setHeadAndPropagate(node, r); ↓private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }} ↓private void setHead(Node node) { head = node; node.thread = null; node.prev = null;}This method changes the successor node of head into head. After this method, the next node of node is set to null, and the model becomes the following figure
prev +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
That is, node head tail and other things are set to null, waiting for GC to recycle. At this time, return, jump out of the for loop, and the queue is cleared.
Here is a demonstration of the whole process
setHeadAndPropagate(node, r); +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- thread =null | <---- node(tail) | currentThread | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- node(tail) | currentThread | +-------------------------------+ ↓ +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
The core of CountDownLatch is a blocking thread queue, which is a queue constructed from a linked list, which contains thread and waitStatus, where waitStatus describes the successor node thread status.
state is a very important flag. When constructing, it is set to the corresponding n value. If n != 0, the blocking queue will be blocked all the time unless the thread is interrupted.
Each time the countDown() method is called, state-1 is used, and the await() method is used to add the thread calling the method to the blocking queue until state==0, and the thread can not be released.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.