Technical background of thread pool
In object-oriented programming, creating and destroying objects takes time, because each new object needs memory and possibly other resources. This is especially true in Java, where the virtual machine tracks every object so that it can be garbage collected once it is no longer in use.
Therefore, one way to improve the efficiency of a service program is to create and destroy as few objects as possible, especially objects that are expensive to create and destroy. The key question is how to serve requests with objects that already exist, and this is exactly why the various "resource pooling" techniques came about.
For example, many components commonly seen in Android are built around the concept of a "pool", such as the various image loading libraries and network request libraries. Even Message in Android's messaging mechanism takes its objects from a Message pool when you call Message.obtain(), so the concept is very important. The thread pool technique introduced in this article follows the same idea.
Advantages of thread pool:
1. Threads in the pool are reused, which reduces the performance overhead of creating and destroying threads;
2. The maximum number of concurrent threads can be controlled effectively, which improves the utilization of system resources and avoids the blocking caused by excessive competition for resources;
3. Threads can be managed in a simple, uniform way, which makes them simple and efficient to use.
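The first advantage can be observed directly. The following is a minimal sketch, not taken from the original article: the class name ThreadReuseDemo and its method are made up for illustration. It submits many small tasks to a pool of a few threads and counts how many distinct threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts the distinct threads that execute a batch of tasks.
class ThreadReuseDemo {
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each task only records the name of the thread it runs on.
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // already submitted tasks still run
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 20 tasks, but never more than 2 threads are created.
        System.out.println("distinct threads: " + distinctThreadsUsed(2, 20));
    }
}
```

However many tasks are submitted, the number of distinct threads never exceeds the pool size, which is the reuse described in point 1.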
Thread Pool Framework Executor
The thread pool in Java is implemented through the Executor framework, which includes Executor, Executors, ExecutorService, ThreadPoolExecutor, Callable, Future, FutureTask and so on.
Executor: the top-level interface of all thread pools. It has only one method.
public interface Executor {
    void execute(Runnable command);
}
ExecutorService: extends Executor with additional behavior, and is the interface most directly implemented by the Executor implementation classes.
Executors: provides a series of factory methods for creating thread pools; the returned thread pools all implement the ExecutorService interface.
ThreadPoolExecutor: The specific implementation class of thread pools. The various thread pools generally used are implemented based on this class. The construction method is as follows:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
corePoolSize: the number of core threads in the thread pool. By default core threads stay alive even when they are idle, so an idle pool does not shrink below corePoolSize. If allowCoreThreadTimeOut is set to true, core threads time out as well, the pool can shrink to 0 threads, and keepAliveTime controls the timeout of all threads;
maximumPoolSize: the maximum number of threads allowed in the thread pool;
keepAliveTime: how long an idle thread waits for new work before it is terminated;
unit: an enum (TimeUnit) that gives the unit of keepAliveTime;
workQueue: the BlockingQueue<Runnable> that stores the tasks waiting to be executed.
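As a small sketch of how these parameters fit together (the class name PoolConfigDemo and the concrete numbers are illustrative assumptions, not from the original), the pool below is built with the five-argument constructor and then queried through its getters:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: builds a pool by hand and reads its configuration back.
class PoolConfigDemo {
    static String describe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                              // corePoolSize
                4,                              // maximumPoolSize
                30L, TimeUnit.SECONDS,          // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10));  // bounded workQueue
        // Let idle core threads time out too (requires keepAliveTime > 0).
        pool.allowCoreThreadTimeOut(true);
        try {
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                    + " coreTimeout=" + pool.allowsCoreThreadTimeOut()
                    + " threads=" + pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Note that getPoolSize() is 0 here: no task has been submitted yet, so no thread has been created.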
BlockingQueue: a tool in java.util.concurrent that is mainly used to coordinate threads. If the BlockingQueue is empty, an operation that takes an element from it blocks until an element is put into the queue. Similarly, if the BlockingQueue is full, an operation that tries to put an element into it blocks until there is space in the queue again. Blocking queues are often used in producer-consumer scenarios: producers are threads that add elements to the queue, and consumers are threads that take elements from it. The blocking queue is the container into which producers put elements and from which consumers take them. Concrete implementations include LinkedBlockingQueue, ArrayBlockingQueue, etc. Internally, blocking and wake-up are generally implemented with Lock and Condition (see the article on learning and using explicit locks and Condition).
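A minimal producer-consumer sketch may help here. The class name ProducerConsumerDemo and the capacity of 2 are assumptions chosen for illustration; put() and take() are the blocking operations described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer thread, the calling thread acts as consumer.
class ProducerConsumerDemo {
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
    }
}
```

Although the queue holds at most two elements, all five values arrive in order, because the producer simply waits whenever the queue is full.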
The working process of thread pool is as follows:
When the thread pool has just been created, it contains no threads. The task queue is passed in as a constructor parameter. However, even if the queue already contains tasks, the thread pool does not start executing them immediately.
When a task is added by calling the execute() method, the thread pool makes the following decisions:
If the number of running threads is less than corePoolSize, a thread is created to run the task immediately;
If the number of running threads is greater than or equal to corePoolSize, the task is put in the queue;
If the queue is full and the number of running threads is less than maximumPoolSize, a non-core thread is created to run the task immediately;
If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the task is rejected; with the default handler the thread pool throws a RejectedExecutionException.
When a thread completes a task, it takes the next task from the queue and executes it.
When a thread has had nothing to do for a certain period of time (keepAliveTime), the thread pool checks whether the number of running threads is greater than corePoolSize, and if so, the thread is stopped. So after all tasks have been completed, the pool eventually shrinks back to corePoolSize.
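The four decisions above can be traced with a deliberately tiny pool. This is a sketch under assumed values (corePoolSize 1, maximumPoolSize 2, queue capacity 1); the class name PoolGrowthDemo is made up. Every task blocks on a latch, so the pool cannot finish anything while the four tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records pool size and queue length after each execute().
class PoolGrowthDemo {
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the trace is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    out.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    out.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected because both the queue and the pool are full.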
Creation and use of thread pools
Thread pools are usually created with the static factory methods of the utility class Executors. The following are several common thread pools.
SingleThreadExecutor: Single background thread (the buffer queue is unbounded)
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
Creates a single-threaded pool. This pool has only one core thread, which is equivalent to a single thread executing all tasks serially. If this thread ends because of an exception, a new thread replaces it. This pool guarantees that tasks are executed in the order in which they were submitted.
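The ordering guarantee is easy to check. In this sketch (SerialOrderDemo is an illustrative name, not part of any library), each task appends its own number to a list, and the list ends up in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records the order in which a single-thread pool runs tasks.
class SerialOrderDemo {
    static List<Integer> executionOrder(int count) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(5));
    }
}
```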
FixedThreadPool: a fixed-size thread pool that has core threads only (the buffer queue is unbounded).
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Creates a fixed-size thread pool. A thread is created each time a task is submitted, until the number of threads reaches the maximum size of the pool. Once the pool has reached its maximum size, its size stays the same. If a thread ends because of an exception during execution, the pool adds a new thread to replace it.
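The replacement of a failed thread can be sketched as follows. WorkerReplacementDemo and its quiet thread factory are assumptions made for this illustration; the factory only exists to keep the simulated exception from printing a stack trace. Note that this applies to tasks passed to execute(), where an uncaught exception ends the worker thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a task that throws kills its worker, and the pool replaces it.
class WorkerReplacementDemo {
    static boolean failedWorkerIsReplaced() {
        // Illustrative factory: plain threads that swallow the uncaught exception.
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, quiet);
        List<Thread> seen = new CopyOnWriteArrayList<>();
        pool.execute(() -> {
            seen.add(Thread.currentThread());
            throw new IllegalStateException("simulated failure");
        });
        pool.execute(() -> seen.add(Thread.currentThread()));
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Two tasks ran, on two different Thread objects, in a pool of size 1.
        return seen.size() == 2 && seen.get(0) != seen.get(1);
    }

    public static void main(String[] args) {
        System.out.println("replaced: " + failedWorkerIsReplaced());
    }
}
```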
CachedThreadPool: Unbounded thread pool, can perform automatic thread recycling.
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
If the pool has more threads than are needed to process the tasks, the idle threads (those that have not executed a task for 60 seconds) are recycled. When the number of tasks increases, the pool adds new threads to process them. This pool does not limit its own size; the size depends entirely on the maximum number of threads that the operating system (or JVM) can create. SynchronousQueue is a blocking queue with no internal capacity: each insert must wait for a matching remove by another thread, so a task is handed directly to a thread instead of being buffered.
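Both properties can be seen in a short sketch. SynchronousQueueDemo is an illustrative name, and the pool is built by hand with the same constructor arguments as newCachedThreadPool() so that getPoolSize() is available.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that SynchronousQueue buffers nothing,
// so a cached-style pool starts one thread per task that cannot be handed off.
class SynchronousQueueDemo {
    static boolean offerWithoutConsumer() {
        // No thread is waiting in take(), so the non-blocking offer fails.
        return new SynchronousQueue<String>().offer("task");
    }

    static int threadsForBlockedTasks(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("offer accepted: " + offerWithoutConsumer());
        System.out.println("threads for 3 blocked tasks: " + threadsForBlockedTasks(3));
    }
}
```

Because every existing worker is busy and the queue cannot hold a task, each new task forces a new thread, which is why this kind of pool suits many short-lived tasks rather than long blocking ones.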
ScheduledThreadPool: a thread pool with a fixed number of core threads and a maximumPoolSize of Integer.MAX_VALUE. It supports running tasks after a delay and running tasks periodically.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());
}
Creates a thread pool that runs tasks after a delay or periodically. Non-core threads that stay idle for DEFAULT_KEEPALIVE_MILLIS are recycled.
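A short usage sketch of the two scheduling styles follows. ScheduledPoolDemo and its method names are made up for illustration; schedule() and scheduleAtFixedRate() are the ScheduledExecutorService methods used.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one delayed task and one periodic task.
class ScheduledPoolDemo {
    // Returns true if the task ran no earlier than the requested delay.
    static boolean delayedTaskWaits(long delayMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        try {
            Callable<Long> stamp = System::nanoTime;
            ScheduledFuture<Long> ranAt = pool.schedule(stamp, delayMillis, TimeUnit.MILLISECONDS);
            return ranAt.get() - start >= TimeUnit.MILLISECONDS.toNanos(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Runs a task every 20 ms until it has run at least `times` times.
    static int runAtFixedRate(int times) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        pool.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            done.countDown();
        }, 0, 20, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // a periodic task only stops when cancelled or shut down
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("delay respected: " + delayedTaskWaits(50));
        System.out.println("periodic runs: " + runAtFixedRate(3));
    }
}
```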
There are two commonly used ways of submitting tasks to a thread pool:
execute:
ExecutorService.execute(Runnable runnable);
submit:
Future<?> task = ExecutorService.submit(Runnable runnable);
Future<T> task = ExecutorService.submit(Runnable runnable, T result);
Future<T> task = ExecutorService.submit(Callable<T> callable);
submit(Callable callable) is implemented as follows, and submit(Runnable runnable) works the same way:
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    FutureTask<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
It can be seen that submit is meant for tasks that return a result: it returns a Future (in fact a FutureTask object), so the result can be obtained through the get() method. In the end submit also calls execute(Runnable runnable); it only wraps the Callable or Runnable in a FutureTask object. Because FutureTask is a Runnable, it can be passed to execute. For how Callable and Runnable objects are wrapped into FutureTask objects, see the article on the use of Callable, Future and FutureTask.
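The three submit variants can be compared side by side. This is a minimal sketch; SubmitDemo and the values used are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: what get() returns for each submit() overload.
class SubmitDemo {
    static String results() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> sum = () -> 1 + 2;
            Runnable noop = () -> { };

            Future<Integer> fromCallable = pool.submit(sum);      // result of call()
            Future<String> withResult = pool.submit(noop, "done"); // the given result
            Future<?> fromRunnable = pool.submit(noop);            // null on completion

            // get() blocks until the corresponding task has finished.
            return fromCallable.get() + "," + withResult.get() + "," + fromRunnable.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(results());
    }
}
```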
The principle of thread pool implementation
If this article only covered how to use thread pools, it would be of little value; at best it would be a way of getting familiar with the Executor-related APIs. The thread pool implementation does not use the synchronized keyword; it uses volatile, Lock, synchronous (blocking) queues, the Atomic classes, FutureTask and so on, because these perform better. Working through the source code is a good way to learn its approach to concurrency control.
The advantages of thread pooling mentioned at the beginning are summarized as follows:
Thread reuse
Control the maximum number of concurrencies
Manage threads
1. Thread reuse process
To understand the principle of thread reuse, you should first understand the thread life cycle.
During its life cycle, a thread goes through five states: New, Runnable, Running, Blocked and Dead.
A Thread is created through new. This step only initializes some thread information, such as the thread name, the id and the group the thread belongs to, so at this point it can be regarded as just an ordinary object. After Thread's start() is called, the Java virtual machine creates a method call stack and a program counter for it and sets hasBeenStarted to true; calling start() again after that throws an exception.
A thread in the Runnable state has not started running yet; the state only means that the thread can run. When it actually starts running depends on the scheduler. When the thread obtains the CPU, its run() method is called. Do not call Thread's run() method yourself. The thread then switches between ready, running and blocked according to CPU scheduling, until the run() method ends or the thread is stopped in some other way and enters the dead state.
Therefore, the principle behind thread reuse is to keep the thread alive (ready, running or blocked). Next, let's take a look at how ThreadPoolExecutor implements thread reuse.
The Worker class in ThreadPoolExecutor is what mainly controls thread reuse. Here is a simplified version of the Worker class, which is easier to understand:
private final class Worker implements Runnable {
    final Thread thread;
    Runnable firstTask;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        while (task != null || (task = getTask()) != null) {
            task.run();
            task = null; // clear it so that the next iteration calls getTask()
        }
    }
}
Worker is a Runnable and also holds a thread, which is the thread that will be started. When a new Worker object is created, a new Thread object is created at the same time, and the Worker itself is passed to the Thread as its Runnable. So when the Thread's start() method is called, it is actually the Worker's run() method that runs. In runWorker() there is a while loop that keeps getting Runnable objects from getTask() and executes them one by one. How does getTask() get a Runnable object?
Again, the simplified code:
private Runnable getTask() {
    // In some special cases, return null so that the worker exits:
    // if (some special cases) { return null; }
    Runnable r = workQueue.take();
    return r;
}
This workQueue is the BlockingQueue that was passed in when the ThreadPoolExecutor was initialized, and it stores the Runnable tasks waiting to be executed. Because BlockingQueue is a blocking queue, when the queue is empty BlockingQueue.take() puts the calling thread into a waiting state until a new object is added to the queue and wakes the blocked thread up. Therefore, in general, the Thread's run() method does not end; it keeps executing Runnable tasks from workQueue, and this is how thread reuse is achieved.
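The same idea can be reproduced outside the JDK in a few lines. The following TinyPool is a deliberately small sketch written for this explanation, not the ThreadPoolExecutor implementation: the class, its STOP marker task and its methods are all hypothetical, and it leaves out everything except the worker loop around BlockingQueue.take().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal pool: long-lived workers looping over a shared blocking queue.
class TinyPool {
    // Illustrative marker task that tells a worker to leave its loop.
    private static final Runnable STOP = new Runnable() {
        @Override public void run() { }
    };

    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    TinyPool(int threads) {
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(this::workerLoop);
            workers.add(t);
            t.start();
        }
    }

    // The thread stays alive because run() never returns while tasks may still arrive.
    private void workerLoop() {
        try {
            while (true) {
                Runnable task = workQueue.take(); // blocks while the queue is empty
                if (task == STOP) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void execute(Runnable task) {
        workQueue.add(task);
    }

    void shutdownAndWait() {
        for (int i = 0; i < workers.size(); i++) {
            workQueue.add(STOP); // one marker per worker, queued after all real tasks
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns {tasks run, distinct threads used}.
    static int[] demo() {
        TinyPool pool = new TinyPool(2);
        AtomicInteger ran = new AtomicInteger();
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                ran.incrementAndGet();
                threads.add(Thread.currentThread());
            });
        }
        pool.shutdownAndWait();
        return new int[] { ran.get(), threads.size() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("tasks run: " + r[0] + ", threads used: " + r[1]);
    }
}
```

Ten tasks are executed by at most two threads, and no thread is created per task: each worker simply goes back to take() when it finishes.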
2. Control the maximum number of concurrencies
So when is a Runnable put into workQueue? When is a Worker created? When is start() called on the Thread in a Worker so that a new thread runs the Worker's run() method? The analysis above shows that runWorker() in Worker executes tasks one after another, serially, so where does concurrency come in?
It is natural to guess that these things happen inside execute(Runnable runnable). Let's see how execute does it.
execute:
Simplified code
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Current number of threads < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // Directly start a new thread.
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Number of active threads >= corePoolSize
    // runState is RUNNING && the queue is not full
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Check again whether the pool is still RUNNING.
        // If it is not, remove the task from workQueue and reject it.
        if (!isRunning(recheck) && remove(command))
            reject(command);
    // Use the strategy specified by the thread pool to reject the task.
    // Two cases:
    // 1. A pool that is not RUNNING rejects new tasks
    // 2. The queue is full and starting a new thread failed (workerCount >= maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}
addWorker:
Simplified code
private boolean addWorker(Runnable firstTask, boolean core) {
    int c = ctl.get();
    int wc = workerCountOf(c);
    if (wc >= (core ? corePoolSize : maximumPoolSize)) {
        return false;
    }
    Worker w = new Worker(firstTask);
    final Thread t = w.thread;
    t.start();
    return true;
}
With this code in mind, let's look again at the cases described earlier for adding a task while the thread pool is working:
* If the number of running threads is less than corePoolSize, create a thread to run this task immediately;
* If the number of running threads is greater than or equal to corePoolSize, then put this task in the queue;
* If the queue is full at this time and the number of running threads is less than maximumPoolSize, then you still need to create a non-core thread to run the task immediately;
* If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the task is rejected and, with the default handler, the thread pool throws a RejectedExecutionException.
This is why Android's AsyncTask, when executed in parallel, throws RejectedExecutionException once the maximum number of tasks is exceeded. For details, see the articles on the latest AsyncTask source code and on the dark side of AsyncTask.
If addWorker successfully creates a new thread, it calls start() on it, and firstTask becomes the first task executed in that worker's run().
Although each worker processes its tasks serially, when several workers are created they share one workQueue, so tasks are processed in parallel.
Therefore, the maximum concurrency number is controlled according to corePoolSize and maximumPoolSize. The general process can be represented by the figure below.
Together with the explanation above, the figure should make the process easy to understand.
If you do Android development and are familiar with how Handler works, this figure may look quite familiar. Part of the process is very similar to how Handler, Looper and Message are used. Handler.sendMessage(Message) corresponds to execute(Runnable). The Message queue maintained in Looper corresponds to the BlockingQueue, except that the Message queue has to be maintained with explicit synchronization. The loop() function in Looper keeps taking Messages from the Message queue, just as runWorker() in Worker keeps getting Runnables from the BlockingQueue.
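That several workers sharing one queue really run tasks at the same time can be checked with a small experiment. This is only a sketch with a made-up class name (ParallelWorkersDemo) and an assumed timeout of one second: two tasks each wait for the other to start, which can only succeed if two workers run them concurrently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: detects whether two tasks were running at the same time.
class ParallelWorkersDemo {
    static boolean tasksOverlap(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch bothStarted = new CountDownLatch(2);
        AtomicInteger sawTheOther = new AtomicInteger();
        Runnable task = () -> {
            bothStarted.countDown();
            try {
                // Succeeds only if the other task starts while this one is still running.
                if (bothStarted.await(1, TimeUnit.SECONDS)) {
                    sawTheOther.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(task);
        pool.execute(task);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawTheOther.get() == 2;
    }

    public static void main(String[] args) {
        System.out.println("2 workers overlap: " + tasksOverlap(2));
        System.out.println("1 worker overlaps: " + tasksOverlap(1));
    }
}
```

With a single worker the first task times out before the second one can start, so the tasks are strictly serial; with two workers both tasks see each other.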
3. Manage threads
Through the thread pool we can manage thread reuse, control the concurrency level, and handle thread destruction. Thread reuse and concurrency control have been discussed above, and thread management was interspersed in that discussion, so it should be easy to understand.
ThreadPoolExecutor has an AtomicInteger variable named ctl, which stores two pieces of information:
the number of worker threads and the run state of the pool. The lower 29 bits store the number of threads and the upper 3 bits store runState. Each value is extracted through bit operations.
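The packing can be tried out in isolation. The class CtlDemo below is a standalone sketch written for this article; its constants and helpers mirror the names used in ThreadPoolExecutor, but it is not the JDK class itself.

```java
// Hypothetical standalone class: packs a run state and a worker count into one int.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask for the lower 29 bits

    // Run states live in the upper 3 bits.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    static int runStateOf(int c) {
        return c & ~CAPACITY; // keep only the upper 3 bits
    }

    static int workerCountOf(int c) {
        return c & CAPACITY;  // keep only the lower 29 bits
    }

    static boolean isRunning(int c) {
        return c < SHUTDOWN;  // RUNNING is the only negative state
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("workers=" + workerCountOf(c) + " running=" + isRunning(c));
    }
}
```

Keeping both values in one int lets the pool read and update the state and the count together with a single atomic operation.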
The corresponding code in ThreadPoolExecutor is:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Get the run state of the thread pool
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Get the number of Workers
private static int workerCountOf(int c) { return c & CAPACITY; }
// Determine whether the thread pool is running
private static boolean isRunning(int c) { return c < SHUTDOWN; }
Here the shutdown process of the thread pool is analyzed mainly through shutdown() and shutdownNow(). First of all, the thread pool has five states that control how tasks are added and executed. The three main ones are introduced below:
RUNNING status: the thread pool is running normally; it accepts new tasks and processes the tasks in the queue;
SHUTDOWN status: no new tasks are accepted, but the tasks in the queue will still be executed;
STOP status: no new tasks are accepted, and the tasks in the queue are not processed.
The shutdown method sets runState to SHUTDOWN and terminates all idle threads, while threads that are still working are not affected, so the tasks in the queue will still be executed.
The shutdownNow method sets runState to STOP. Unlike shutdown, it tries to terminate all threads, including those that are working, so the tasks in the queue will not be executed.
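The difference between the two methods can be sketched with one running task and three queued tasks. ShutdownDemo is an illustrative name; the sketch relies on shutdownNow() interrupting the running worker and returning the tasks that never started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: compares shutdown() and shutdownNow().
class ShutdownDemo {
    // Returns {tasks completed, queued tasks handed back unexecuted}.
    static int[] run(boolean useShutdownNow) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(): this task does not complete.
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(completed::incrementAndGet); // these wait in the queue
        }
        int handedBack = 0;
        try {
            started.await(); // make sure the first task is really running
            if (useShutdownNow) {
                handedBack = pool.shutdownNow().size();
            } else {
                pool.shutdown();
            }
            release.countDown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { completed.get(), handedBack };
    }

    public static void main(String[] args) {
        int[] a = run(false);
        int[] b = run(true);
        System.out.println("shutdown:    completed=" + a[0] + " handedBack=" + a[1]);
        System.out.println("shutdownNow: completed=" + b[0] + " handedBack=" + b[1]);
    }
}
```

After shutdown() all four tasks finish, while after shutdownNow() the running task is interrupted and the three queued tasks are returned to the caller instead of being run.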
Summary: by analyzing the ThreadPoolExecutor source code, we have gained a general understanding of how a thread pool is created, how tasks are added to it, and how it executes them. Once you are familiar with these processes, thread pools become easier to use.
The concurrency control techniques learned along the way, and the use of the producer-consumer model for task processing, will be of great help in understanding or solving related problems in the future. For example, in Android's Handler mechanism, the Message queue in Looper could also be handled with a BlockingQueue. This is what you gain from reading the source code.