Preface
In multi-threaded programming, creating a new thread for every task is unrealistic, because thread creation is expensive in both time and resources. Thread pools were introduced to address this and have become a powerful tool for managing threads. Through the Executor interface, Java provides a standard way to decouple task submission from task execution, with Runnable representing the task.
Next, let’s analyze the implementation of the Java thread pool framework ThreadPoolExecutor.
The following analysis is based on JDK 1.7.
Life cycle
In ThreadPoolExecutor, the upper 3 bits of the ctl field represent the run state (the lower 29 bits hold the worker count). The five states are:
1. RUNNING: Accepts new tasks and processes tasks in the task queue
2. SHUTDOWN: Does not accept new tasks, but still processes tasks already in the queue
3. STOP: Does not accept new tasks, does not process queued tasks, and interrupts all tasks in progress
4. TIDYING: All tasks have terminated and the number of worker threads is 0. On reaching this state, the terminated() hook is executed.
5. TERMINATED: terminated() has completed
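The run state itself is not public, but part of the life cycle can be observed from outside. The following is a minimal sketch, assuming a single-threaded pool and a hypothetical helper class LifecycleDemo; it overrides terminated() and checks isShutdown() and isTerminated() around a call to shutdown().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class LifecycleDemo {
    // Returns "isShutdown before, isShutdown after, awaitTermination, isTerminated, hook ran".
    static String observe() {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                // Invoked once while the pool is in the TIDYING state.
                hookRan.set(true);
            }
        };
        pool.execute(new Runnable() {
            @Override
            public void run() { /* trivial task */ }
        });
        boolean before = pool.isShutdown();   // still RUNNING
        pool.shutdown();                      // RUNNING -> SHUTDOWN
        boolean after = pool.isShutdown();
        boolean done;
        try {
            // Returns true once the pool has reached TERMINATED.
            done = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done = false;
        }
        return before + "," + after + "," + done + ","
                + pool.isTerminated() + "," + hookRan.get();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Because terminated() runs before the state is set to TERMINATED, the hook flag is already set by the time awaitTermination returns true.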
State transition diagram
ThreadPoolExecutor keeps the state bits and the worker count together in a single AtomicInteger:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
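To make the packing concrete, here is a small standalone sketch of how a run state and a worker count can share one int. The constant and method names follow the private members of ThreadPoolExecutor, but the class CtlDemo is only an illustration and is not how application code would access the pool state.

```java
// Hypothetical demo class that mirrors the bit layout of ctl.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 2^29 - 1

    // Run states live in the upper 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Pack a run state and a worker count into one int.
    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }

    // Unpack the two fields again.
    static int runStateOf(int c)    { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // state survives the round trip
        System.out.println(workerCountOf(c));         // worker count survives too
        System.out.println(CAPACITY);
    }
}
```

With this layout the states are ordered numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), so a state check can be a simple integer comparison.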
Thread pool model
Core parameters
corePoolSize: The minimum number of worker threads kept alive (if allowCoreThreadTimeOut is set, the effective minimum is 0)
maximumPoolSize: The maximum number of threads, bounded by CAPACITY
keepAliveTime: How long an idle thread waits for a task before terminating; the unit is given by a TimeUnit
workQueue: The work queue, which holds tasks waiting to be executed
RejectedExecutionHandler: The rejection policy, triggered when the thread pool is saturated
Maximum capacity of the thread pool: the upper three bits of ctl are used as state flags, so the maximum number of worker threads is (2^29)-1
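As a rough illustration of how these parameters map onto the constructor, the sketch below builds a pool with arbitrary example values (2 core threads, 4 maximum, a 30 second keep-alive and a bounded queue of 8). The class name PoolConfigDemo and the chosen numbers are assumptions for the example only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the values are arbitrary.
class PoolConfigDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L, TimeUnit.SECONDS,                // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(8),  // workQueue (bounded)
                new ThreadPoolExecutor.AbortPolicy()  // RejectedExecutionHandler
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " core, "
                + pool.getMaximumPoolSize() + " max, keep-alive "
                + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```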
Four models
CachedThreadPool: A cacheable thread pool. If the pool currently has more threads than the workload needs, idle threads are reclaimed; when demand increases, new threads are added. The pool size is effectively unbounded (maximumPoolSize is Integer.MAX_VALUE).
FixedThreadPool: A fixed-size thread pool. A thread is created for each submitted task until the pool reaches its configured size, after which the size no longer changes.
SingleThreadExecutor: A single-threaded pool with only one worker thread. It guarantees that tasks run serially in queue order. If the thread ends abnormally, a new thread is created to run subsequent tasks.
ScheduledThreadPool: A fixed-size thread pool that runs tasks after a delay or periodically, similar to Timer.
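The four models correspond to factory methods on Executors. The sketch below creates one of each and inspects the sizes of the cached and fixed pools. Note that casting the returned ExecutorService to ThreadPoolExecutor relies on an implementation detail of the JDK and is done here only for inspection; FactoryDemo is a hypothetical class name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class.
class FactoryDemo {
    static String describe() {
        ExecutorService cached = Executors.newCachedThreadPool();
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ExecutorService single = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
        try {
            // Implementation detail: these two factories return a plain ThreadPoolExecutor.
            ThreadPoolExecutor c = (ThreadPoolExecutor) cached;
            ThreadPoolExecutor f = (ThreadPoolExecutor) fixed;
            return "cached=" + c.getCorePoolSize() + "/" + c.getMaximumPoolSize()
                    + " fixed=" + f.getCorePoolSize() + "/" + f.getMaximumPoolSize();
        } finally {
            cached.shutdown();
            fixed.shutdown();
            single.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // core/max sizes of the cached and fixed pools
    }
}
```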
Task execution: execute
Core logic:
1. If the current number of threads < corePoolSize, start a new core thread to execute the task: addWorker(command, true)
2. If the current number of threads >= corePoolSize and the task is successfully added to the work queue:
1). Recheck whether the thread pool is still in the RUNNING state
2). If not, remove the task from the queue and reject it
3). If so, check whether the current number of threads is 0. If it is 0, add a worker thread.
3. If the task cannot be queued, start a non-core thread to execute it: addWorker(command, false). If that fails, reject the task.
From the above analysis, we can summarize the four stages of thread pool operation:
1). poolSize < corePoolSize and the queue is empty: a new thread is created to handle the submitted task.
2). poolSize == corePoolSize: submitted tasks enter the work queue, and worker threads take tasks from the queue to execute. At this stage the queue is neither empty nor full.
3). corePoolSize <= poolSize < maximumPoolSize and the queue is full: a new thread is created to handle the submitted task.
4). poolSize == maximumPoolSize and the queue is full: the rejection policy is triggered.
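The four stages can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and submits tasks that block on a latch so that no worker becomes free in between. StagesDemo and its helper names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class StagesDemo {
    // Formats "poolSize/queueSize" for the given pool.
    static String snap(ThreadPoolExecutor p) {
        return p.getPoolSize() + "/" + p.getQueue().size();
    }

    static String run() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);               // stage 1: new core thread
            log.append(snap(pool));
            pool.execute(blocker);               // stage 2: task is queued
            log.append(' ').append(snap(pool));
            pool.execute(blocker);               // stage 3: queue full, new non-core thread
            log.append(' ').append(snap(pool));
            try {
                pool.execute(blocker);           // stage 4: saturated, default AbortPolicy
                log.append(" accepted");
            } catch (RejectedExecutionException e) {
                log.append(" rejected");
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```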
Rejection policy
As mentioned earlier, a task that cannot be executed is rejected. RejectedExecutionHandler is the interface for handling rejected tasks. There are four built-in rejection policies:
AbortPolicy: The default policy. Rejects the task and throws RejectedExecutionException
CallerRunsPolicy: Runs the task on the caller's thread and does not throw an exception
DiscardPolicy: Silently discards the task and does not throw an exception
DiscardOldestPolicy: Discards the oldest task in the queue, then retries submitting the current task; does not throw an exception
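As one example of these policies in action, the sketch below saturates a pool configured with CallerRunsPolicy and records which thread ends up running the overflow task. The pool sizes and the class name CallerRunsDemo are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class.
class CallerRunsDemo {
    // Returns true if the rejected task ran on the submitting thread.
    static boolean overflowRanInCaller() {
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicReference<Thread> ranOn = new AtomicReference<Thread>();
        Runnable blocker = new Runnable() {
            @Override
            public void run() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            pool.execute(blocker);   // occupies the only worker
            pool.execute(blocker);   // fills the queue
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    ranOn.set(Thread.currentThread());
                }
            });                      // rejected, so it runs synchronously right here
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(overflowRanInCaller());
    }
}
```

A side effect worth keeping in mind is that the submitting thread is busy while it runs the task, which naturally slows down further submissions.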
Worker in thread pool
Worker extends AbstractQueuedSynchronizer and implements Runnable. The former gives the worker its lock functionality; the latter runs the worker thread's main loop, runWorker(Worker w), which takes tasks from the task queue and executes them. Worker references are stored in the workers set, which is protected by mainLock.
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
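The lock that Worker builds on AbstractQueuedSynchronizer is a simple non-reentrant mutex. The following standalone sketch shows the general shape of such a lock; SimpleWorkerLock is a hypothetical class written for illustration, not the JDK's Worker itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex in the style of Worker's lock.
class SimpleWorkerLock extends AbstractQueuedSynchronizer {
    // State 0 means unlocked, 1 means locked.
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // already held, even if held by the current thread
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return getState() != 0; }

    public static void main(String[] args) {
        SimpleWorkerLock lock = new SimpleWorkerLock();
        lock.lock();
        System.out.println(lock.tryLock()); // second acquisition fails: not reentrant
        lock.unlock();
        System.out.println(lock.tryLock()); // succeeds once released
        lock.unlock();
    }
}
```

Being non-reentrant means a failed tryLock reliably indicates that the worker is in the middle of running a task, which is how the pool can tell idle workers from busy ones.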
Core function runWorker
The following is the simplified logic. Note that the run method of every worker thread executes this function.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = false;  // true only if a task threw (try/finally omitted)
    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);
        Throwable thrown = null;        // set if task.run() throws (handling omitted)
        task.run();
        afterExecute(task, thrown);
        task = null;                    // so the next iteration calls getTask()
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
1. Get the task from getTask()
2. Lock the worker
3. Execute beforeExecute(wt, task), which is the extension method provided by ThreadPoolExecutor to subclasses
4. Run the task. If the worker was created with a first task, that task is executed first, and only once.
5. Execute afterExecute(task, thrown);
6. Unlock the worker
7. If getTask() returns null, the loop ends and the worker exits via processWorkerExit
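Steps 3 and 5 are the extension points available to subclasses. As a minimal sketch, the hypothetical CountingPool below overrides beforeExecute and afterExecute to count how many tasks were started and finished.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to illustrate the two hooks.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs on the worker thread, before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        finished.incrementAndGet();  // runs on the worker thread, after task.run()
    }

    // Runs n trivial tasks and returns {started, finished}.
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() { /* no-op */ }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(3);
        System.out.println(counts[0] + " started, " + counts[1] + " finished");
    }
}
```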
Getting a task: getTask
The task queue inside the thread pool is a blocking queue, which is passed in during construction.
private final BlockingQueue<Runnable> workQueue;
getTask() takes a task from the task queue, either blocking indefinitely or waiting with a timeout. Four situations cause it to return null, after which the worker exits:
1. The number of existing threads exceeds the maximum number of threads
2. The thread pool is in STOP state
3. The thread pool is in SHUTDOWN state and the work queue is empty
4. The thread timed out waiting for a task, and timed-out workers are subject to termination (allowCoreThreadTimeOut is set, or the number of threads exceeds corePoolSize)
Core logic: wait for a task on the blocking queue, either with a timeout or indefinitely. If the wait times out, the worker thread will exit.
timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
The wait uses a timeout in two cases:
1. Core threads are allowed to time out, that is, allowCoreThreadTimeOut(true) was called
2. The current thread is a non-core thread, that is, wc > corePoolSize
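The first case can be observed from outside the pool. The sketch below assumes a pool with one core thread and a 50 ms keep-alive, runs a single task, and then polls getPoolSize() for up to two seconds. KeepAliveDemo and the timing values are assumptions chosen for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class KeepAliveDemo {
    // Runs one task, then reports the pool size after idle threads had time to expire.
    static int idlePoolSize(boolean coreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(coreTimeout);
        pool.execute(new Runnable() {
            @Override
            public void run() { /* no-op */ }
        });
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            // Poll until the worker has exited or the deadline passes.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return pool.getPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("core timeout on:  " + idlePoolSize(true));
        System.out.println("core timeout off: " + idlePoolSize(false));
    }
}
```

With the timeout enabled the idle core thread uses the timed poll and exits; with it disabled the thread blocks in take() and stays in the pool until shutdown.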
The work queue is a BlockingQueue, which I won’t expand on here; I will write a detailed analysis later.
Summary
ThreadPoolExecutor is based on the producer-consumer model: the code that submits tasks acts as the producer, and the threads that execute tasks act as consumers.
Executors provides four factory methods that build thread pool models on top of ThreadPoolExecutor. In addition, we can extend ThreadPoolExecutor directly and override the beforeExecute and afterExecute methods to customize how the pool executes tasks.
Whether to use a bounded or an unbounded queue depends on the specific situation, and the size of the work queue and the number of threads also need careful consideration.
CallerRunsPolicy is the recommended rejection policy: it neither discards the task nor throws an exception, but falls back to running the task on the caller's thread.