1. Basic use of thread pools
1.1. Why do you need a thread pool?
In everyday business code, when we want to use multiple threads, we create threads before the work starts and destroy them after it ends. However, creating and destroying threads has nothing to do with the business logic itself; the business only cares about the tasks the threads perform. We would rather spend as much CPU time as possible on those tasks than on creating and destroying threads. A thread pool solves this problem: its job is to reuse threads.
1.2. What support does the JDK provide?
The related JDK classes are shown in the class diagram above.
A few of them deserve special mention:
Callable is similar to Runnable, but a Callable returns a value (and may throw a checked exception).
ThreadPoolExecutor is the main implementation of a thread pool.
Executors is a factory class that creates preconfigured thread pools.
1.3. Use of thread pools
1.3.1. Types of thread pools
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
```
As these factory methods show, FixedThreadPool, SingleThreadExecutor, and CachedThreadPool are all ThreadPoolExecutor instances created with different parameters (SingleThreadExecutor additionally wraps its ThreadPoolExecutor in a delegating class).
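```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
```
Let's briefly describe the meaning of the parameters of the ThreadPoolExecutor constructor:
corePoolSize: the number of threads kept in the pool even when they are idle (unless allowCoreThreadTimeOut is set).
maximumPoolSize: the maximum number of threads the pool may contain.
keepAliveTime: how long a thread beyond corePoolSize may stay idle before it is terminated.
unit: the time unit of keepAliveTime.
workQueue: the queue that holds tasks that have been submitted but not yet executed.
threadFactory: the factory that creates new threads (here Executors.defaultThreadFactory()).
handler: the rejection policy applied when a task cannot be accepted (here defaultHandler, an AbortPolicy).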
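To see how corePoolSize, workQueue and maximumPoolSize interact, the sketch below uses deliberately tiny, arbitrary sizes (1 core thread, at most 2 threads, a queue with room for 1 task) and tasks that block until released. Under these assumptions the pool first starts a core thread, then queues, then adds a non-core thread once the queue is full, and finally rejects. The class name PoolSizingDemo is only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {
    // Submits four blocking tasks and records the pool size after each submission.
    // Returns {size1, size2, size3, size4, rejectedCount}.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                          // corePoolSize = 1, maximumPoolSize = 2
                60L, TimeUnit.SECONDS,         // idle non-core threads end after 60 s
                new ArrayBlockingQueue<>(1));  // bounded queue: room for one waiting task
        Runnable blocker = () -> {
            try {
                release.await();               // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[5];
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                result[4]++;                   // the default handler (AbortPolicy) throws
            }
            result[i] = pool.getPoolSize();
        }
        release.countDown();                   // let the blocked tasks finish
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [1, 1, 2, 2, 1]
    }
}
```

The ArrayBlockingQueue is bounded on purpose: with the unbounded LinkedBlockingQueue used by FixedThreadPool, the queue never fills, so the pool never grows beyond corePoolSize.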
With these meanings in mind, look at FixedThreadPool again. corePoolSize and maximumPoolSize are equal, so once the pool has grown to nThreads threads it neither creates extra threads nor destroys idle ones. When more tasks arrive than the threads can handle, the extra tasks wait in the LinkedBlockingQueue, whose default capacity is Integer.MAX_VALUE. The queue is therefore effectively unbounded: if tasks keep arriving faster than they are processed, memory consumption keeps growing.
CachedThreadPool is different. Its corePoolSize is 0, its maximumPoolSize is Integer.MAX_VALUE, and its work queue is a SynchronousQueue, a special queue with no capacity. Because there are no core threads, every submitted task is first offered to the SynchronousQueue. An offer succeeds only if another thread is already waiting to take an element from the queue; with no waiting thread, the offer fails immediately. When the offer fails, the pool creates a new thread to run the task, which is why the number of threads in a CachedThreadPool is not fixed. A thread that stays idle for 60 seconds is terminated.
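The hand-off behavior CachedThreadPool relies on can be observed directly on a SynchronousQueue. In this sketch (class and method names are illustrative), a plain offer() with no waiting consumer returns false, while an offer made while another thread is blocked in take() succeeds. Inside ThreadPoolExecutor, the waiting consumer is an idle worker polling the queue with its keep-alive timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {
    // No thread is waiting in take()/poll(), so a non-blocking offer fails at once.
    static boolean offerWithoutTaker() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("task");
    }

    // Another thread blocks in take(); the offer hands the element over directly.
    static String offerWithTaker() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        ExecutorService taker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> take = q::take;
            Future<String> received = taker.submit(take);
            // Timed offer: gives the taker thread time to reach take()
            boolean handedOff = q.offer("task", 5, TimeUnit.SECONDS);
            return handedOff ? received.get() : null;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            taker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutTaker()); // false
        System.out.println(offerWithTaker());    // task
    }
}
```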
1.4. Small examples of thread pool usage
1.4.1. Simple thread pool
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + " Thread ID: " + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    }

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(myTask);
        }
        es.shutdown(); // finish the submitted tasks, then let the JVM exit
    }
}
```
Because newFixedThreadPool(5) creates five threads and ten tasks are submitted, five tasks run at a time. Thread reuse is visible in the output: the thread IDs repeat, that is, the first five tasks and the last five tasks are executed by the same five threads.
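Thread reuse can also be checked programmatically instead of by reading the output. This sketch (names are illustrative) records the name of every thread that runs a task. With a fixed pool of 5 and 10 tasks, each of the first five submissions starts a new core thread, so the set should contain exactly five names. It also shuts the pool down; without shutdown(), the pool's non-daemon threads keep the JVM running.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Runs `tasks` short tasks on a fixed pool of `threads` threads and
    // returns how many distinct worker threads actually executed them.
    static int distinctThreads(int threads, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            es.execute(() -> names.add(Thread.currentThread().getName()));
        }
        es.shutdown(); // no new tasks; already submitted ones still run
        try {
            es.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreads(5, 10)); // 5
    }
}
```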
The example above submits tasks with es.submit(myTask). Tasks can also be submitted with es.execute(myTask).
The difference is that submit returns a Future object, which will be introduced later, while execute returns nothing.
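A minimal sketch of the difference, using only the standard java.util.concurrent API (class and method names are illustrative): submit() wraps the task in a Future, which delivers either the Callable's return value or the exception the task threw, wrapped in an ExecutionException. With execute() there is no Future; an exception thrown by the task propagates to the worker thread's uncaught-exception handler instead.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecute {
    // submit() with a Callable: the result comes back through Future.get().
    static int squareViaFuture(int x) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> f = es.submit(() -> x * x);
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    // submit() captures the task's exception; get() rethrows it wrapped in ExecutionException.
    static String failureCause() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Runnable failing = () -> { throw new IllegalArgumentException("bad input"); };
            Future<?> f = es.submit(failing);
            f.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareViaFuture(7)); // 49
        System.out.println(failureCause());     // bad input
    }
}
```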
1.4.2. ScheduledThreadPool
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        // If the previous run has not finished, the next run is not scheduled yet.
        ses.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis() / 1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, 0, 2, TimeUnit.SECONDS); // first run after 0 s, then 2 s after each run finishes
    }
}
```
Output:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Since each run takes 1 second, the scheduler has to wait for the previous run to finish: "every 2 seconds" here means that a new run starts 2 seconds after the previous run completes. That is why consecutive timestamps in the output are 3 seconds apart.
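The timing rule can be checked with a scaled-down sketch: the task "works" for 100 ms and the delay is 200 ms (arbitrary values standing in for the 1 s and 2 s above). With scheduleWithFixedDelay, consecutive start times should be at least work + delay = 300 ms apart. The class name is illustrative, and the small tolerance in the check allows for timer granularity.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedDelayTiming {
    // Returns the gaps (ms) between consecutive start times of a fixed-delay task.
    static long[] startGaps(long workMs, long delayMs, int runs) {
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        ses.scheduleWithFixedDelay(() -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(workMs);          // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        }, 0, delayMs, TimeUnit.MILLISECONDS); // delay counted from the end of each run
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow();
        }
        long[] gaps = new long[runs - 1];
        for (int i = 1; i < runs; i++) {
            gaps[i - 1] = TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1));
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(startGaps(100, 200, 4)));
    }
}
```

By contrast, scheduleAtFixedRate measures the period between start times; if a run takes longer than the period, the next run starts late, but runs never overlap.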
2. Extending and enhancing thread pools
2.1. Callback interface
ThreadPoolExecutor provides several callback methods (hooks) that subclasses can override to extend its behavior.
```java
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>()) {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("Prepare to execute");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("Execution completed");
    }

    @Override
    protected void terminated() {
        System.out.println("Thread pool exit");
    }
};
```
By overriding the beforeExecute, afterExecute, and terminated methods of ThreadPoolExecutor, we can add logging or other actions before and after each task runs and when the thread pool exits.
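As a slightly fuller sketch of these hooks, the hypothetical TimingThreadPool below counts completed tasks and accumulates their run time. beforeExecute and afterExecute run on the worker thread that executes the task, so a ThreadLocal can carry the start time from one hook to the other; terminated() runs once, after shutdown() and after all tasks have finished. The calls to the super implementations follow the nesting advice in the ThreadPoolExecutor documentation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    final AtomicInteger completed = new AtomicInteger();
    final AtomicLong totalNanos = new AtomicLong();
    final CountDownLatch terminatedSignal = new CountDownLatch(1);

    TimingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before r
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        totalNanos.addAndGet(System.nanoTime() - startNanos.get());
        startNanos.remove();
        completed.incrementAndGet();
    }

    @Override
    protected void terminated() {
        super.terminated();
        terminatedSignal.countDown(); // called once, when the pool has fully terminated
    }

    // Runs n empty tasks, shuts down, and waits for terminated() to fire.
    static int runTasks(int n) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.terminatedSignal.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10)); // 10
    }
}
```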
2.2. Rejection strategy
Sometimes the load is so heavy that the system is overwhelmed. As mentioned above, when tasks keep arriving, a FixedThreadPool puts all of them into its unbounded blocking queue, which consumes more and more memory and can eventually cause an out-of-memory error. Such situations should be avoided. So when the pool is saturated (all threads up to maximumPoolSize are busy and a bounded queue is full), some tasks should be rejected. Rejected tasks should be recorded rather than simply thrown away.
There is another constructor in ThreadPoolExecutor.
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
We will introduce threadFactory later.
The handler parameter is the rejection policy: it decides what happens to a task that the pool cannot accept.
The JDK provides four built-in policies:
AbortPolicy: If the task cannot be accepted, a RejectedExecutionException is thrown. This is the default policy.
CallerRunsPolicy: If the task cannot be accepted, the thread that submitted it runs it.
DiscardOldestPolicy: If the task cannot be accepted, the oldest task waiting in the queue is discarded and the new task is submitted again.
DiscardPolicy: If the task cannot be accepted, it is silently discarded.
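CallerRunsPolicy is easy to observe with a saturated pool. In this sketch (sizes and names are arbitrary), the single thread is busy and the one-slot queue is full, so the third task is rejected and runs on the thread that called execute(). Because the submitting thread is then busy running that task, it cannot submit more work for a while, which acts as a simple form of back-pressure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns the name of the thread that ran the task the saturated pool could not accept.
    static String whoRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),                 // room for exactly one waiting task
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the queue
        String[] ranOn = new String[1];
        // Rejected, so CallerRunsPolicy runs it right here, inside execute()
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(whoRanOverflowTask()); // main
    }
}
```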
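```java
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(10), // bounded, so the pool can actually become saturated
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString() + " is discarded");
            }
        });
```
Of course, we can also implement the RejectedExecutionHandler interface ourselves to define a custom rejection policy. Note that the work queue must be bounded (the capacity 10 above is just an example): with an unbounded LinkedBlockingQueue, tasks are never rejected for lack of capacity, only after the pool has been shut down.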
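Following the advice to record rejected tasks rather than drop them silently, here is a minimal sketch of a handler that keeps every refused task in a list (what a real system does with the list, such as logging or retrying later, is an assumption and not shown). The pool has one thread and a one-slot queue, so out of five blocking tasks, one runs, one waits, and three are recorded. Names and sizes are illustrative.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RecordingRejectDemo {
    // Saturates a 1-thread pool with a 1-slot queue and records every task it refuses.
    static int rejectedCount(int submissions) {
        List<Runnable> rejected = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                (r, executor) -> rejected.add(r)); // record instead of silently dropping
        for (int i = 0; i < submissions; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return rejected.size();
    }

    public static void main(String[] args) {
        System.out.println(rejectedCount(5)); // 3
    }
}
```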
2.3. Customize ThreadFactory
As we just saw, a threadFactory can be passed to the ThreadPoolExecutor constructor.
All threads in a thread pool are created by its thread factory, so we can customize how they are created by supplying our own factory.
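A custom factory only has to implement newThread(Runnable). The hypothetical NamedThreadFactory below gives threads a readable prefix, which helps when reading thread dumps and logs, and marks them as daemon threads so they do not keep the JVM alive (a choice that suits some applications, not all). Executors.newFixedThreadPool(int, ThreadFactory) accepts such a factory.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }

    // Returns the name of the thread that runs the first task in a pool using this factory.
    static String nameOfWorker() {
        ExecutorService es = Executors.newFixedThreadPool(2, new NamedThreadFactory("order-worker"));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return es.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfWorker()); // order-worker-1
    }
}
```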
Default thread factory:
```java
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
```
3. ForkJoin
3.1. The idea
ForkJoin is based on divide and conquer: a large task is split (forked) into smaller subtasks, and their results are combined (joined).
Fork/Join is similar to the MapReduce algorithm. The difference is that Fork/Join splits a task into smaller tasks only when necessary, for example when the task is large, while MapReduce always starts by splitting the input. Fork/Join is therefore better suited to thread-level parallelism inside a single JVM, while MapReduce suits distributed systems.
4.2. Using the interfaces
RecursiveAction: a task with no return value
RecursiveTask: a task that returns a value
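The summing example in the next section uses RecursiveTask because it returns a value. For the no-result case, a minimal RecursiveAction sketch might look like this: the hypothetical DoubleAll task doubles every element of an array in place, splitting the range in half until it is below an arbitrary threshold, and uses invokeAll to fork both halves and wait for them. It runs in ForkJoinPool.commonPool().

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Doubles every element of an array in place; nothing is returned, hence RecursiveAction.
class DoubleAll extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final long[] data;
    private final int from, to;                 // half-open range [from, to)

    DoubleAll(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll forks the subtasks and waits for both to finish
        invokeAll(new DoubleAll(data, from, mid), new DoubleAll(data, mid, to));
    }

    static long[] run(int n) {
        long[] data = new long[n];
        Arrays.fill(data, 1);
        ForkJoinPool.commonPool().invoke(new DoubleAll(data, 0, n));
        return data;
    }

    public static void main(String[] args) {
        long[] d = run(10_000);
        System.out.println(d[0] + " " + d[d.length - 1]); // 2 2
    }
}
```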
4.3. Simple example
```java
import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private final long start;
    private final long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum = sum + i;
            }
        } else {
            // Split into 100 small tasks; the step is based on the length of the range
            long step = (end - start) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork(); // push the subtask to the pool
            }
            for (CountTask t : subTasks) {
                sum += t.join(); // wait for each subtask and add its result
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum = " + res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
The example above computes a sum. When the range is too large, the task is split into 100 subtasks, each of which sums only its own sub-range; after join, the partial sums computed by the subtasks are added together.
4.4. Implementation elements
4.4.1. WorkQueue and ctl
Each worker thread has its own work queue, an instance of static final class WorkQueue. The work queue contains a series of fields used to manage the thread and its tasks:
```java
volatile int eventCount;   // encoded inactivation count; < 0 if inactive
int nextWait;              // encoded record of next event waiter
int nsteals;               // number of steals
int hint;                  // steal index hint
short poolIndex;           // index of this queue in pool
final short mode;          // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock;        // 1: locked, -1: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push
ForkJoinTask<?>[] array;   // the elements (initially unallocated)
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker;    // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
```
Note that the ForkJoin implementation differs considerably between JDK 7 and JDK 8; the code discussed here is from JDK 8 (later JDK releases reworked it again). Not every thread in the pool is running at any given moment: some are suspended, and the suspended threads are kept in a stack, which is implemented internally as a linked list.
nextWait: points to the next waiting thread in that stack.
poolIndex: the index of this WorkQueue in the pool's workQueues array.
eventCount: initialized from poolIndex. It has 32 bits: the sign bit indicates whether the worker is inactive (a negative value means inactive), the next 15 bits count how many times it has been inactivated, and the low 16 bits hold poolIndex. One field thus encodes several pieces of information.
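To make the bit layout concrete, the helpers below pack and unpack a 32-bit value with the layout just described: sign bit = inactive, the next 15 bits = inactivation count, the low 16 bits = poolIndex. They are illustrative only; the class and method names are hypothetical, and the JDK manipulates these bits inline rather than through such methods.

```java
// bit 31: 1 = inactive | bits 16..30: inactivation count | bits 0..15: poolIndex
class EventCountLayout {
    static int encode(boolean inactive, int count, int poolIndex) {
        int v = ((count & 0x7fff) << 16) | (poolIndex & 0xffff);
        return inactive ? v | Integer.MIN_VALUE : v; // Integer.MIN_VALUE sets only bit 31
    }

    static boolean isInactive(int ec) { return ec < 0; }              // sign bit set
    static int count(int ec)          { return (ec >>> 16) & 0x7fff; } // drop the sign bit
    static int poolIndex(int ec)      { return ec & 0xffff; }

    public static void main(String[] args) {
        int ec = encode(true, 3, 5);
        System.out.println(isInactive(ec) + " " + count(ec) + " " + poolIndex(ec)); // true 3 5
    }
}
```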
The queue itself is stored in ForkJoinTask<?>[] array. top and base mark the two ends of the queue, and the tasks lie between them.
ForkJoinPool maintains ctl, a 64-bit long:
```java
volatile long ctl;

/*
 * Field ctl is a long packed with:
 * AC: Number of active running workers minus target parallelism (16 bits)
 * TC: Number of total workers minus target parallelism (16 bits)
 * ST: true if pool is terminated (1 bit)
 * EC: the wait count of top waiting thread (15 bits)
 * ID: poolIndex of top of Treiber stack of waiters (16 bits)
 */
```
AC: the number of active threads minus the parallelism level (by default, the number of available processors).
TC: the total number of threads minus the parallelism level.
ST: whether the thread pool has terminated.
EC: the wait count of the thread at the top of the waiting stack.
ID: the poolIndex of the thread at the top of the waiting stack.
The low 32 bits of ctl (ST, EC and ID) therefore follow the same layout as the eventCount described above.
Why pack five fields into one variable? Five separate variables would take roughly the same space, and code that manipulates a single packed variable is much harder to read.
The reason, and the most clever part of the design, is that the five fields form a whole. With five separate variables, updating one of them in a multithreaded setting raises the question of how to keep all five consistent; protecting them with a lock would hurt performance. A single variable solves this problem: it guarantees the consistency and atomicity of the data.
All changes to ctl in ForkJoinPool are made with CAS operations. As mentioned in earlier articles in this series, CAS is a lock-free operation with good performance.
Since a CAS can only operate on a single variable, packing the fields into one long is the best design here.
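The same idea can be shown with public APIs. In this hypothetical sketch, two counters (active and total workers) share one AtomicLong, and every update rebuilds the whole 64-bit value and installs it with compareAndSet, retrying on failure. AtomicLong.compareAndSet plays the role of the Unsafe.compareAndSwapLong call used inside ForkJoinPool. Because both halves change in a single CAS, no thread can observe a state in which one counter was updated and the other was not.

```java
import java.util.concurrent.atomic.AtomicLong;

// High 32 bits = active workers, low 32 bits = total workers.
class PackedCounters {
    private final AtomicLong state = new AtomicLong();

    static long pack(int active, int total) { return ((long) active << 32) | (total & 0xffffffffL); }
    static int active(long s) { return (int) (s >>> 32); }
    static int total(long s)  { return (int) s; }

    // A new worker starts: both counts go up together in one CAS.
    void workerStarted() {
        long c, nc;
        do {
            c = state.get();
            nc = pack(active(c) + 1, total(c) + 1);
        } while (!state.compareAndSet(c, nc)); // retry if another thread changed state
    }

    // A worker goes idle: only the active count drops.
    void workerIdle() {
        long c, nc;
        do {
            c = state.get();
            nc = pack(active(c) - 1, total(c));
        } while (!state.compareAndSet(c, nc));
    }

    long snapshot() { return state.get(); }

    // Each thread starts `starts` workers and idles `idles` of them, concurrently.
    static long simulate(int threads, int starts, int idles) {
        PackedCounters pc = new PackedCounters();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < starts; k++) pc.workerStarted();
                for (int k = 0; k < idles; k++) pc.workerIdle();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return pc.snapshot();
    }

    public static void main(String[] args) {
        long s = simulate(4, 1000, 500);
        System.out.println(active(s) + " active, " + total(s) + " total"); // 2000 active, 4000 total
    }
}
```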
4.4.2. Work stealing
Next, we will introduce the workflow of the entire thread pool.
Each worker thread calls runWorker:
```java
final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    for (int r = w.hint; scan(w, r) == 0; ) {
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}
```
The scan() method looks for a task to run.
r is a pseudo-random number, updated with xorshift on every iteration, which determines the queue scan looks at first.
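The three shift-and-XOR steps in runWorker are Marsaglia's 32-bit xorshift generator with shifts 13, 17 and 5. The sketch below (class and method names are hypothetical) pulls them out so the sequence can be inspected, and masks each value with m, as scan does, to get a queue index. Note that xorshift maps 0 to 0, so the generator only produces a useful sequence from a nonzero seed.

```java
class XorShiftProbe {
    // The same three steps as in runWorker.
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Queue indices for a table of size m + 1 (a power of two), masked as in ws[(r - j) & m].
    static int[] probeIndices(int seed, int m, int count) {
        int[] out = new int[count];
        int r = seed;
        for (int i = 0; i < count; i++) {
            r = next(r);
            out[i] = r & m; // always in [0, m], even when r is negative
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(next(1)); // 270369
        System.out.println(java.util.Arrays.toString(probeIndices(1, 15, 8)));
    }
}
```

Using xorshift keeps victim selection cheap: a few shifts and XORs per step, with no shared random-number state between threads.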
```java
private final int scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    long c = ctl;                            // for consistency check
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
        for (int j = m + m + 1, ec = w.eventCount;;) {
            WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            if ((q = ws[(r - j) & m]) != null &&
                (b = q.base) - q.top < 0 && (a = q.array) != null) {
                long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                if ((t = ((ForkJoinTask<?>)
                          U.getObjectVolatile(a, i))) != null) {
                    if (ec < 0)
                        helpRelease(c, ws, w, q, b);
                    else if (q.base == b &&
                             U.compareAndSwapObject(a, i, t, null)) {
                        U.putOrderedInt(q, QBASE, b + 1);
                        if ((b + 1) - q.top < 0)
                            signalWork(ws, q);
                        w.runTask(t);
                    }
                }
                break;
            }
            else if (--j < 0) {
                if ((ec | (e = (int)c)) < 0) // inactive or terminating
                    return awaitWork(w, c, ec);
                else if (ctl == c) {         // try to inactivate and enqueue
                    long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                    w.nextWait = e;
                    w.eventCount = ec | INT_SIGN;
                    if (!U.compareAndSwapLong(this, CTL, c, nc))
                        w.eventCount = ec;   // back out
                }
                break;
            }
        }
    }
    return 0;
}
```
Let's take a look at the scan method. Its first parameter is a WorkQueue: as mentioned above, each worker thread has a WorkQueue, and the WorkQueues of all threads are stored in the pool's workQueues array. r is a random number; scan uses it to pick a WorkQueue (at index (r - j) & m) and checks whether that queue has tasks to run.
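Having found a non-empty queue q, scan reads its base and computes the memory offset of the array slot at base: ((a.length - 1) & b) wraps the index into the array, << ASHIFT scales it by the element size, and ABASE adds the array's header offset.
```java
b = q.base;
// ...
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// ...
```
It then reads the task at that offset, which is the oldest task in q, claims it by using a CAS to set the slot to null, advances base, and runs the task:
```java
t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i));
// ...
w.runTask(t);
// ...
```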
This rough analysis shows that when a thread calls scan, it does not start with the tasks in its own WorkQueue; instead it uses the random number r to pick a WorkQueue, normally another thread's, and takes a task from it. This is one of the main mechanisms of ForkJoinPool.
A thread therefore does not focus only on its own tasks; it helps with other threads' tasks first. This prevents starvation: tasks do not pile up behind a thread that is stuck or slow, and it avoids situations where one thread has a large backlog while the other threads have nothing to do.
Then let's take a look at the runTask method
```java
final void runTask(ForkJoinTask<?> task) {
    if ((currentSteal = task) != null) {
        ForkJoinWorkerThread thread;
        task.doExec();
        ForkJoinTask<?>[] a = array;
        int md = mode;
        ++nsteals;
        currentSteal = null;
        if (md != 0)
            pollAndExecAll();
        else if (a != null) {
            int s, m = a.length - 1;
            ForkJoinTask<?> t;
            while ((s = top - 1) - base >= 0 &&
                   (t = (ForkJoinTask<?>)U.getAndSetObject
                    (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
                top = s;
                t.doExec();
            }
        }
        if ((thread = owner) != null) // no need to do in finally clause
            thread.afterTopLevelExec();
    }
}
```
Note the telling field name currentSteal: it holds the stolen task, which is exactly the mechanism just described.
First the stolen task is executed with task.doExec().
After finishing another thread's task, the worker runs its own tasks. In the default LIFO mode (mode == 0) it takes them from the top of its own queue, so the most recently pushed task runs first:
```java
while ((s = top - 1) - base >= 0 &&
       (t = (ForkJoinTask<?>)U.getAndSetObject
        (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
    top = s;
    t.doExec();
}
```
The following figure summarizes the process described so far.
For example, suppose there are two threads, T1 and T2. T1 takes a task from the base of T2's queue (in reality the victim queue is chosen at random through r), and T1 runs its own tasks from the top of its own queue. T2 does the same in reverse.
Tasks taken from other threads come from base, while a thread's own tasks come from top. Because the owner and the thieves work at opposite ends of a queue, they rarely contend with each other.
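The owner/thief split can be illustrated with a deliberately simplified, lock-based deque. This hypothetical ToyWorkQueue is not how WorkQueue is implemented (the JDK uses an array, top and base indices, and CAS without locks); it only shows which end each party uses: the owner pushes and pops at the top (newest first), while thieves steal from the base (oldest first).

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ToyWorkQueue<T> {
    private final Deque<T> tasks = new ArrayDeque<>();

    // Owner pushes new subtasks at the top.
    synchronized void push(T task) { tasks.addLast(task); }

    // Owner takes its own work from the top (LIFO: newest first); null if empty.
    synchronized T pop() { return tasks.pollLast(); }

    // Thieves take from the base (FIFO: oldest first); null if empty.
    synchronized T steal() { return tasks.pollFirst(); }

    public static void main(String[] args) {
        ToyWorkQueue<String> q = new ToyWorkQueue<>();
        q.push("t1");
        q.push("t2");
        q.push("t3");
        System.out.println(q.steal()); // t1 (a thief gets the oldest task)
        System.out.println(q.pop());   // t3 (the owner gets the newest task)
    }
}
```

Stealing the oldest task also tends to suit divide-and-conquer workloads, because older tasks were split off earlier and usually represent larger chunks of work.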
If scan finds no task in any queue, it takes this branch:
```java
else if (--j < 0) {
    if ((ec | (e = (int)c)) < 0) // inactive or terminating
        return awaitWork(w, c, ec);
    else if (ctl == c) {         // try to inactivate and enqueue
        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
        w.nextWait = e;
        w.eventCount = ec | INT_SIGN;
        if (!U.compareAndSwapLong(this, CTL, c, nc))
            w.eventCount = ec;   // back out
    }
    break;
}
```
First, a new ctl value nc is computed from the current one and installed with a CAS. At the same time, the worker records the previous top waiter in nextWait and marks itself inactive by setting the sign bit of its eventCount; if the CAS fails, it backs out that change. On a later scan that still finds no work, the now inactive worker calls awaitWork() and enters the waiting state (using Unsafe's park method, mentioned in earlier articles in this series).
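The change to ctl needs some explanation. The first step is AC - 1. AC occupies the top 16 bits of ctl, so we cannot simply subtract 1 from ctl; instead the code subtracts AC_UNIT (1L << 48, i.e. 0x0001000000000000L), whose only set bit is the lowest bit of the AC field. This decrements the top 16 bits by one. The result is then masked with AC_MASK|TC_MASK to keep AC and TC, and ORed with ec, which makes this worker's eventCount the new low 32 bits (ST, EC and ID) of ctl, that is, the new top of the waiting stack.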
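The arithmetic can be checked in isolation. The constants below mirror the field positions described above (AC in bits 48 to 63, TC in bits 32 to 47); the helper methods are hypothetical. Since AC_UNIT has no bits below position 48, subtracting it cannot disturb TC or the low 32 bits, and because AC is often negative (active threads minus parallelism), two's-complement arithmetic simply turns, say, -2 into -3 within the field.

```java
class CtlArithmetic {
    static final int  AC_SHIFT = 48;
    static final int  TC_SHIFT = 32;
    static final long AC_UNIT  = 1L << AC_SHIFT;     // lowest bit of the AC field
    static final long AC_MASK  = 0xffffL << AC_SHIFT;
    static final long TC_MASK  = 0xffffL << TC_SHIFT;

    // Builds a ctl value from signed 16-bit AC and TC fields plus the low 32 bits.
    static long ctl(int ac, int tc, int low32) {
        return (((long) ac << AC_SHIFT) & AC_MASK)
             | (((long) tc << TC_SHIFT) & TC_MASK)
             | (low32 & 0xffffffffL);
    }

    static int ac(long c) { return (short) (c >>> AC_SHIFT); } // sign-extend the 16-bit field
    static int tc(long c) { return (short) (c >>> TC_SHIFT); }

    // Same expression as in scan(): AC - 1, keep TC, low 32 bits := ec (ec is non-negative here).
    static long inactivate(long c, int ec) {
        return (long) ec | ((c - AC_UNIT) & (AC_MASK | TC_MASK));
    }

    public static void main(String[] args) {
        long c = ctl(-2, -1, 7);
        long nc = inactivate(c, 5);
        System.out.println(ac(nc) + " " + tc(nc) + " " + (int) nc); // -3 -1 5
    }
}
```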
As mentioned earlier, eventCount contains poolIndex. Starting from the ID stored in ctl and following poolIndex and nextWait in each WorkQueue, you can traverse all waiting threads.
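A single-threaded toy model of this waiting stack (a Treiber stack) is sketched below. It simplifies things under stated assumptions: it stores plain indices where the JDK stores whole eventCount values, keeps the top in an ordinary field instead of the low bits of ctl, and uses no CAS. The class WaitStackSketch is hypothetical. Pushing a worker stores the old top in its nextWait; walking nextWait from the top visits every waiting worker, newest first.

```java
import java.util.ArrayList;
import java.util.List;

class WaitStackSketch {
    static final int EMPTY = -1;
    final int[] nextWait;   // nextWait[i] belongs to the worker whose poolIndex is i
    int top = EMPTY;        // stands in for the ID field in the low bits of ctl

    WaitStackSketch(int workers) {
        nextWait = new int[workers];
    }

    // A worker goes idle: it links to the old top and becomes the new top (a push).
    void park(int poolIndex) {
        nextWait[poolIndex] = top;
        top = poolIndex;
    }

    // Walk the chain from the top to list all waiting workers.
    List<Integer> waiters() {
        List<Integer> out = new ArrayList<>();
        for (int i = top; i != EMPTY; i = nextWait[i]) {
            out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        WaitStackSketch s = new WaitStackSketch(4);
        s.park(2);
        s.park(0);
        s.park(3);
        System.out.println(s.waiters()); // [3, 0, 2]
    }
}
```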