In [High Concurrency Java 2] Multithreading Basics, we introduced the basic thread synchronization operations. This time we cover the synchronization control tools in the java.util.concurrent package.
1. Use of various synchronization control tools
1.1 ReentrantLock
ReentrantLock can be thought of as an enhanced version of synchronized. synchronized is simple to use and leaves everything to the JVM, but its functionality is relatively limited. In JDK 1.5, the release that introduced ReentrantLock, it outperformed synchronized. Thanks to JVM optimizations since then, the two perform comparably in current JDK versions. For simple cases there is no need to reach for ReentrantLock.
Compared with synchronized, ReentrantLock offers more features: it is reentrant, interruptible, supports timed acquisition, and can be fair.
First, an example that shows the basic usage of ReentrantLock:
    package test;

    import java.util.concurrent.locks.ReentrantLock;

    public class Test implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
        public static int i = 0;

        @Override
        public void run() {
            for (int j = 0; j < 10000000; j++) {
                lock.lock();
                try {
                    i++;
                } finally {
                    // Always unlock in finally so an exception cannot leak the lock
                    lock.unlock();
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Test test = new Test();
            Thread t1 = new Thread(test);
            Thread t2 = new Thread(test);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(i);
        }
    }

Two threads increment i, and ReentrantLock is used to keep the increment thread-safe. As the example shows, ReentrantLock is slightly more cumbersome than synchronized: unlock() must be called in a finally block, otherwise an exception in the critical section could leave the lock held forever. With synchronized, the JVM releases the lock automatically.
So what are the distinguishing features of ReentrantLock?
1.1.1 Re-entry
A single thread can acquire the lock repeatedly, but it must release it the same number of times.
    lock.lock();
    lock.lock();
    try {
        i++;
    } finally {
        lock.unlock();
        lock.unlock();
    }

Because ReentrantLock is reentrant, the same thread can acquire the same lock repeatedly. The lock keeps an acquisition counter: each time the owning thread acquires the lock again, the counter is incremented by 1, and the lock is only truly released once it has been unlocked the same number of times (twice here). This mirrors the semantics of synchronized: if a thread enters a synchronized block guarded by a monitor it already owns, it is allowed to proceed. The monitor is not released when the thread exits the second (or any subsequent) synchronized block; it is released only when the thread exits the first synchronized block it entered on that monitor.
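The acquisition counter described above can be observed directly. The following is a minimal sketch, assuming the standard ReentrantLock.getHoldCount() method; the class name HoldCountDemo is made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class HoldCountDemo {
    /** Acquires the lock twice and records the hold count after each step. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // re-entered by the same thread
        lock.unlock();
        counts[2] = lock.getHoldCount(); // still held after one unlock
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```

The count only drops to 0, and other threads can only acquire the lock, after the second unlock().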
The following example shows that synchronized is reentrant as well:

    public class Child extends Father implements Runnable {
        // A single shared instance, so that all threads lock the same monitor
        final static Child child = new Child();

        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                new Thread(child).start();
            }
        }

        public synchronized void doSomething() {
            System.out.println("1child.doSomething()");
            doAnotherThing(); // Call another synchronized method of this class
        }

        private synchronized void doAnotherThing() {
            super.doSomething(); // Call the synchronized method of the parent class
            System.out.println("3child.doAnotherThing()");
        }

        @Override
        public void run() {
            child.doSomething();
        }
    }

    class Father {
        public synchronized void doSomething() {
            System.out.println("2father.doSomething()");
        }
    }

A thread that enters another synchronized method on the same monitor does not release the lock it already holds, so each group of three lines is printed in order without interleaving. synchronized is therefore also a reentrant lock.
Output:
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...
1.1.2 Interruptible
Unlike synchronized, ReentrantLock can respond to interrupts while waiting for a lock. For background on interrupts, see [High Concurrency Java 2] Multithreading Basics.
The ordinary lock.lock() does not respond to interrupts; lock.lockInterruptibly() does.
The following example simulates a deadlock and then uses interrupts to break it:
    package test;

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;
    import java.util.concurrent.locks.ReentrantLock;

    public class Test implements Runnable {
        public static ReentrantLock lock1 = new ReentrantLock();
        public static ReentrantLock lock2 = new ReentrantLock();
        int lock;

        public Test(int lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                if (lock == 1) {
                    lock1.lockInterruptibly();
                    try {
                        // Give the other thread time to take its first lock
                        Thread.sleep(500);
                    } catch (Exception e) {
                        // Ignored in this demo
                    }
                    lock2.lockInterruptibly();
                } else {
                    lock2.lockInterruptibly();
                    try {
                        Thread.sleep(500);
                    } catch (Exception e) {
                        // Ignored in this demo
                    }
                    lock1.lockInterruptibly();
                }
            } catch (Exception e) {
                // Interrupted while waiting for a lock: fall through to cleanup
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
                System.out.println(Thread.currentThread().getId() + ":thread exit");
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Test t1 = new Test(1);
            Test t2 = new Test(2);
            Thread thread1 = new Thread(t1);
            Thread thread2 = new Thread(t2);
            thread1.start();
            thread2.start();
            Thread.sleep(1000);
            //DeadlockChecker.check();
        }

        static class DeadlockChecker {
            private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
            final static Runnable deadlockChecker = new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
                        if (deadlockedThreadIds != null) {
                            ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
                            // Interrupt every live thread that takes part in the deadlock
                            for (Thread t : Thread.getAllStackTraces().keySet()) {
                                for (int i = 0; i < threadInfos.length; i++) {
                                    if (t.getId() == threadInfos[i].getThreadId()) {
                                        t.interrupt();
                                    }
                                }
                            }
                        }
                        try {
                            Thread.sleep(5000);
                        } catch (Exception e) {
                            // Ignored: keep polling
                        }
                    }
                }
            };

            public static void check() {
                Thread t = new Thread(deadlockChecker);
                t.setDaemon(true);
                t.start();
            }
        }
    }

The code above can deadlock: thread 1 acquires lock1, thread 2 acquires lock2, and then each waits for the lock held by the other.
Running the code above and inspecting the process with jstack confirms that a deadlock has occurred.
DeadlockChecker.check() (commented out in main above) starts a daemon thread that detects deadlocked threads and interrupts them. Once interrupted, the threads release their locks and exit normally.
1.1.3 Time-limited
If the lock cannot be acquired within the timeout, the call returns false instead of waiting forever, which helps avoid deadlock.
Use lock.tryLock(long timeout, TimeUnit unit) for a timed acquisition; the parameters are the maximum wait time and its unit.
The following example demonstrates timed acquisition:
    package test;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class Test implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();

        @Override
        public void run() {
            try {
                if (lock.tryLock(5, TimeUnit.SECONDS)) {
                    // Hold the lock longer than the other thread is willing to wait
                    Thread.sleep(6000);
                } else {
                    System.out.println("get lock failed");
                }
            } catch (Exception e) {
                // Interrupted while waiting or sleeping: fall through to cleanup
            } finally {
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }

        public static void main(String[] args) {
            Test t = new Test();
            Thread t1 = new Thread(t);
            Thread t2 = new Thread(t);
            t1.start();
            t2.start();
        }
    }

Two threads compete for one lock. Whichever thread acquires it sleeps for 6 seconds while holding it, and each thread waits at most 5 seconds for the lock.
So one of the two threads is bound to fail to acquire the lock, and when it fails it simply exits.
Output:
get lock failed
1.1.4 Fair lock
How to use:
public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);
By default, locks are unfair: the thread that requests the lock first is not guaranteed to acquire it first, and a thread that arrives later may get it earlier. Unfair locks can therefore cause starvation.
A fair lock guarantees that threads acquire the lock in the order in which they requested it. Fair locks prevent starvation, but they usually perform considerably worse than unfair locks.
1.2 Condition
The relationship between Condition and ReentrantLock is similar to that between Object.wait()/notify() and synchronized.
await() makes the current thread wait and releases the lock. When another thread calls signal() or signalAll(), the waiting thread reacquires the lock and continues. The wait also ends if the thread is interrupted. This is very similar to Object.wait().
awaitUninterruptibly() is basically the same as await(), except that it does not respond to interrupts while waiting. signal() wakes up one waiting thread, and signalAll() wakes up all waiting threads. These are very similar to Object.notify() and Object.notifyAll().
We will not go into further detail here. An example:
    package test;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class Test implements Runnable {
        public static ReentrantLock lock = new ReentrantLock();
        public static Condition condition = lock.newCondition();

        @Override
        public void run() {
            // Acquire before the try block so that finally only unlocks a held lock
            lock.lock();
            try {
                condition.await();
                System.out.println("Thread is going on");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Test t = new Test();
            Thread thread = new Thread(t);
            thread.start();
            Thread.sleep(2000);
            lock.lock();
            condition.signal();
            lock.unlock();
        }
    }

The example is very simple: one thread awaits and the main thread wakes it up. condition.await() and condition.signal() may only be called while holding the lock.
1.3 Semaphore
A lock is mutually exclusive: once one thread holds it, no other thread can acquire it.
A Semaphore allows multiple threads to enter the critical section at the same time. It can be viewed as a shared lock with a limited number of permits. Once the permits are used up, threads that did not obtain one block outside the critical section. With a single permit, a Semaphore behaves much like a lock.
Here is an example:
    package test;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class Test implements Runnable {
        final Semaphore semaphore = new Semaphore(5);

        @Override
        public void run() {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // No permit was obtained, so there is nothing to release
                Thread.currentThread().interrupt();
                return;
            }
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + " done");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            final Test t = new Test();
            for (int i = 0; i < 20; i++) {
                executorService.submit(t);
            }
            // Let the submitted tasks finish, then allow the JVM to exit
            executorService.shutdown();
        }
    }

The thread pool has 20 threads, and each task must acquire a permit from the Semaphore. Because the Semaphore has only 5 permits, the output appears in batches of 5.
A thread can also acquire several permits at once:
public void acquire(int permits) throws InterruptedException
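A minimal sketch of multi-permit acquisition follows. To avoid checked exceptions it uses acquireUninterruptibly(int), which behaves like acquire(int) except that it ignores interrupts; the class name MultiPermitDemo and the permit numbers are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

final class MultiPermitDemo {
    /** Returns {permits left after acquiring 3, whether a second 3 succeeded (1/0), permits after release}. */
    static int[] run() {
        Semaphore semaphore = new Semaphore(5);
        int[] seen = new int[3];
        semaphore.acquireUninterruptibly(3);        // take 3 of the 5 permits at once
        seen[0] = semaphore.availablePermits();
        boolean gotMore = semaphore.tryAcquire(3);  // non-blocking attempt for 3 more permits
        seen[1] = gotMore ? 1 : 0;
        semaphore.release(3);                       // hand back the 3 permits together
        seen[2] = semaphore.availablePermits();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 0, 5]
    }
}
```

The second request fails because only 2 permits remain; a blocking acquire(3) at that point would wait until enough permits had been released.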
1.4 ReadWriteLock
ReadWriteLock is a lock that distinguishes between kinds of access. Reading and writing are treated differently: read-read is not mutually exclusive, while read-write and write-write are.
This design increases concurrency while still keeping the data safe.
How to use:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
For detailed examples, see the articles on Java implementations of the producer-consumer problem and the reader-writer problem; they are not expanded on here.
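As a small illustration of the usage lines above, here is a sketch of a map guarded by a read lock and a write lock. The class ReadWriteCache and its methods are hypothetical and not taken from the articles mentioned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical read-mostly cache guarded by a ReentrantReadWriteLock. */
final class ReadWriteCache {
    private final Map<String, String> data = new HashMap<>();
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();

    /** Readers share the read lock, so concurrent get() calls do not block each other. */
    String get(String key) {
        readLock.lock();
        try {
            return data.get(key);
        } finally {
            readLock.unlock();
        }
    }

    /** Writers take the exclusive write lock, which blocks readers and other writers. */
    void put(String key, String value) {
        writeLock.lock();
        try {
            data.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteCache cache = new ReadWriteCache();
        cache.put("k", "v");
        System.out.println(cache.get("k")); // prints v
    }
}
```

This layout tends to pay off when reads greatly outnumber writes; with frequent writes the extra bookkeeping may outweigh the gain.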
1.5 CountDownLatch
A typical scenario for a countdown latch is a rocket launch. Before launch, a series of equipment and instrument checks is carried out to make sure nothing goes wrong, and the engine may ignite only after all checks have completed. CountDownLatch fits this scenario well: it makes the ignition thread wait until all checking threads have finished.
How to use:
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
A simple example:
    package test;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Test implements Runnable {
        static final CountDownLatch countDownLatch = new CountDownLatch(10);
        static final Test t = new Test();

        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("complete");
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                executorService.execute(t);
            }
            countDownLatch.await();
            System.out.println("end");
            executorService.shutdown();
        }
    }

The main thread waits until all 10 tasks have finished before it prints "end".
1.6 CyclicBarrier
Like CountDownLatch, CyclicBarrier makes threads wait for one another before they continue. The difference is that its counter can be reused. For example, if the counter is set to 10, then once the first batch of 10 threads has arrived the counter is reset, and the barrier waits for the next batch of 10 threads.
How to use:
public CyclicBarrier(int parties, Runnable barrierAction)
barrierAction is the action that is run each time the barrier is tripped, that is, each time all parties have arrived.
await()
Here is an example:
    package test;

    import java.util.concurrent.CyclicBarrier;

    public class Test implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;

        public Test(String soldier, CyclicBarrier cyclic) {
            this.soldier = soldier;
            this.cyclic = cyclic;
        }

        @Override
        public void run() {
            try {
                // Wait for all soldiers to arrive
                cyclic.await();
                dowork();
                // Wait for all soldiers to complete their work
                cyclic.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void dowork() {
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                // Ignored in this demo
            }
            System.out.println(soldier + ": done");
        }

        public static class BarrierRun implements Runnable {
            boolean flag;
            int n;

            public BarrierRun(boolean flag, int n) {
                super();
                this.flag = flag;
                this.n = n;
            }

            @Override
            public void run() {
                if (flag) {
                    // Second trip of the barrier: all work is finished
                    System.out.println(n + " soldiers completed their tasks");
                } else {
                    // First trip of the barrier: everyone has arrived
                    System.out.println(n + " soldiers assembled");
                    flag = true;
                }
            }
        }

        public static void main(String[] args) {
            final int n = 10;
            Thread[] threads = new Thread[n];
            boolean flag = false;
            CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
            System.out.println("Assemble");
            for (int i = 0; i < n; i++) {
                System.out.println("Soldier " + i + " reporting");
                threads[i] = new Thread(new Test("Soldier " + i, barrier));
                threads[i].start();
            }
        }
    }

Print result (the order of the "done" lines varies from run to run):
Assemble
Soldier 0 reporting
Soldier 1 reporting
Soldier 2 reporting
Soldier 3 reporting
Soldier 4 reporting
Soldier 5 reporting
Soldier 6 reporting
Soldier 7 reporting
Soldier 8 reporting
Soldier 9 reporting
10 soldiers assembled
Soldier 5: done
Soldier 7: done
Soldier 8: done
Soldier 3: done
Soldier 4: done
Soldier 1: done
Soldier 6: done
Soldier 2: done
Soldier 0: done
Soldier 9: done
10 soldiers completed their tasks
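The reuse of the counter can also be checked in isolation. The sketch below sends a few threads through the same barrier several times and counts how often the barrier action runs; the class BarrierReuseDemo and its parameters are made up for illustration.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

final class BarrierReuseDemo {
    /** Runs `parties` threads through `rounds` barrier generations; returns how often the action ran. */
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per generation, when the last party arrives
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused every round
                    }
                } catch (Exception e) {
                    // InterruptedException or BrokenBarrierException: give up
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // prints 4
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be reset once it reaches zero.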
1.7 LockSupport
LockSupport provides the basic thread-blocking primitives.
It is similar to Thread.suspend():
LockSupport.park();
LockSupport.unpark(t1);
Compared with suspend(), it is much less likely to leave a thread frozen.
The idea behind LockSupport is somewhat similar to a Semaphore with a single internal permit. park() consumes the permit if it is available and blocks otherwise; unpark() makes the permit available. Therefore, if unpark() is called before park(), the thread does not freeze.
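The permit behaviour can be seen with a single thread. In this minimal sketch the current thread unparks itself first, so the following park() returns at once instead of blocking; the class name UnparkFirstDemo is made up for illustration.

```java
import java.util.concurrent.locks.LockSupport;

final class UnparkFirstDemo {
    /** Returns true once park() has returned; it would hang if the early unpark() were lost. */
    static boolean unparkThenPark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self); // makes the permit available ahead of time
        LockSupport.park();       // consumes the permit and returns without blocking
        return true;
    }

    public static void main(String[] args) {
        System.out.println(unparkThenPark()); // prints true
    }
}
```

With suspend() and resume() the same ordering would be fatal: a resume() that arrives before suspend() has no effect, and the thread then stays suspended.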
The following code is based on the suspend example from [High Concurrency Java 2] Multithreading Basics. With suspend, that example deadlocks.

    package test;

    import java.util.concurrent.locks.LockSupport;

    public class Test {
        static Object u = new Object();
        static TestSuspendThread t1 = new TestSuspendThread("t1");
        static TestSuspendThread t2 = new TestSuspendThread("t2");

        public static class TestSuspendThread extends Thread {
            public TestSuspendThread(String name) {
                setName(name);
            }

            @Override
            public void run() {
                synchronized (u) {
                    System.out.println("in " + getName());
                    //Thread.currentThread().suspend();
                    LockSupport.park();
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            t1.start();
            Thread.sleep(100);
            t2.start();
            // t1.resume();
            // t2.resume();
            LockSupport.unpark(t1);
            LockSupport.unpark(t2);
            t1.join();
            t2.join();
        }
    }

With LockSupport, however, no deadlock occurs.
In addition, park() responds to interrupts but does not throw an exception. When the thread is interrupted, park() simply returns, and the interrupt flag can then be read with Thread.interrupted().
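A minimal sketch of this behaviour: a worker thread parks, the caller interrupts it, and the worker reads the flag after park() returns. The class name ParkInterruptDemo is made up for illustration, and the loop is there because park() is also allowed to return for no reason.

```java
import java.util.concurrent.locks.LockSupport;

final class ParkInterruptDemo {
    /** Parks a worker thread, interrupts it, and reports the flag it saw after park() returned. */
    static boolean interruptedAfterPark() {
        boolean[] sawInterrupt = new boolean[1];
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so loop until the interrupt flag is set
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // returns on interrupt without throwing
            }
            sawInterrupt[0] = Thread.interrupted(); // reads and clears the flag
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join(); // join makes the worker's write visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptedAfterPark()); // prints true
    }
}
```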
park is used in many places in the JDK. LockSupport itself is implemented on top of unsafe.park():
public static void park() {
unsafe.park(false, 0L);
}
1.8 ReentrantLock implementation
Now for the implementation of ReentrantLock. It consists mainly of three parts: a state updated by CAS, a wait queue, and park().
ReentrantLock's internal synchronizer extends AbstractQueuedSynchronizer, which has a state variable that represents the synchronization state.

    /**
     * The synchronization state.
     */
    private volatile int state;

lock() first tries to set state through a CAS operation. If it succeeds in changing state from 0 to 1, the current thread becomes the owner of the lock.

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

If the CAS fails, the thread goes through acquire():

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire() first calls tryAcquire(), because another thread may have released the lock in the meantime.
If the lock still cannot be acquired, addWaiter() adds the current thread to the wait queue:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

While queued, the thread retries the acquisition several times; if it still fails, it is parked:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Likewise, when the lock is released, a waiting thread is unparked. We do not go into the details here.
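To make the three parts concrete, here is a sketch of a much simpler, non-reentrant mutex built on AbstractQueuedSynchronizer. It is not the ReentrantLock source: the class SimpleMutex and its methods are hypothetical, and only tryAcquire/tryRelease are written by hand while AQS supplies the wait queue and the parking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex; far simpler than the real ReentrantLock. */
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS state 0 -> 1; on failure AQS enqueues and parks the caller
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0); // after this returns true, AQS unparks a queued thread
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }

    void unlock() { sync.release(1); }

    /** Two threads increment a plain int under the mutex; returns the final count. */
    static int countWithTwoThreads(int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Runnable task = () -> {
            for (int j = 0; j < perThread; j++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(100000)); // prints 200000
    }
}
```

Unlike ReentrantLock, this sketch keeps no hold count, so a thread that calls lock() twice blocks itself.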
2. Concurrent container and typical source code analysis
2.1 ConcurrentHashMap
We know that HashMap is not a thread-safe container. The simplest way to make a HashMap thread-safe is Collections.synchronizedMap, which is a wrapper around the HashMap:
public static Map m=Collections.synchronizedMap(new HashMap());
Similar wrappers are provided for List and Set.
However, this approach is only suitable when the level of concurrency is relatively low.
Let's take a look at the implementation of synchronizedMap:
    private final Map<K,V> m;     // Backing Map
    final Object      mutex;      // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {
        if (m==null)
            throw new NullPointerException();
        this.m = m;
        mutex = this;
    }

    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;
    }

    public int size() {
        synchronized (mutex) {return m.size();}
    }
    public boolean isEmpty() {
        synchronized (mutex) {return m.isEmpty();}
    }
    public boolean containsKey(Object key) {
        synchronized (mutex) {return m.containsKey(key);}
    }
    public boolean containsValue(Object value) {
        synchronized (mutex) {return m.containsValue(value);}
    }
    public V get(Object key) {
        synchronized (mutex) {return m.get(key);}
    }
    public V put(K key, V value) {
        synchronized (mutex) {return m.put(key, value);}
    }
    public V remove(Object key) {
        synchronized (mutex) {return m.remove(key);}
    }
    public void putAll(Map<? extends K, ? extends V> map) {
        synchronized (mutex) {m.putAll(map);}
    }
    public void clear() {
        synchronized (mutex) {m.clear();}
    }

It wraps the HashMap and synchronizes every operation on it.
Since every method acquires the same lock (mutex), operations such as put and remove are all mutually exclusive, which greatly reduces concurrency.
Let's see how ConcurrentHashMap is implemented (the code below is from JDK 7):

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

Internally, ConcurrentHashMap has an array of Segments, which divides one large hash table into several segments (small hash tables); each entry is hashed to one segment. Operations by multiple threads on different segments do not interfere with each other, so only threads working on the same segment need to be synchronized. This lock separation greatly increases concurrency.
ConcurrentHashMap.size() is more troublesome, because it needs the sum over all segments. When necessary, it locks every segment and then counts. This is a small drawback of lock separation, but size() should not be called at high frequency anyway.
In the implementation, synchronized and lock.lock() are avoided where possible in favor of tryLock(). The implementation also optimizes the hash table itself in several ways, which are not covered here.
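From the caller's side none of this locking is visible. The sketch below has several threads update the same key without any external synchronization; it assumes Java 8 or later for merge(), and the class name ConcurrentCountDemo and the key "page" are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentCountDemo {
    /** `threads` threads each bump the same key `perThread` times; returns the final count. */
    static int count(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    hits.merge("page", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return hits.get("page");
    }

    public static void main(String[] args) {
        System.out.println(count(4, 10000)); // prints 40000
    }
}
```

A separate get() followed by put() would not be safe here, because another thread could update the key between the two calls; merge() performs the update as one atomic step.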
2.2 BlockingQueue
BlockingQueue is not a high-performance container, but it is a very good container for sharing data between threads. It is the typical building block of the producer-consumer pattern.
For details, see the articles on Java implementations of the producer-consumer problem and the reader-writer problem.
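A minimal producer-consumer sketch using ArrayBlockingQueue follows. It is not taken from the articles mentioned above; the class ProducerConsumerDemo, the queue capacity, and the POISON end marker are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class ProducerConsumerDemo {
    private static final int POISON = -1; // hypothetical end-of-stream marker

    /** Producer puts 1..n and then POISON; consumer sums until it sees POISON. Returns the sum. */
    static long sumThroughQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4); // small bound forces blocking
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int v = queue.take(); v != POISON; v = queue.take()) {
                    sum[0] += v;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(100)); // prints 5050
    }
}
```

Neither thread uses an explicit lock or condition; put() and take() do the waiting, which is what makes BlockingQueue convenient for handing data between threads.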