2. Introduction
Multithreading mainly addresses the problem of executing multiple threads on a processing unit: it can significantly reduce the processor's idle time and increase its throughput. However, creating threads frequently carries a large overhead. To reduce that overhead, consider using a thread pool. A thread pool is a container of threads that runs only a fixed number of threads at a time, and it is responsible for managing those threads.
3. Class structure of the thread pool framework
The hierarchy runs from the Executor interface through ExecutorService and AbstractExecutorService down to ThreadPoolExecutor (ScheduledThreadPoolExecutor, which implements ScheduledExecutorService, extends ThreadPoolExecutor). Among them, the main one for us to use is the ThreadPoolExecutor class.
4. How to create a thread pool
We generally have the following methods to create thread pools:
1. Use the Executors factory class
Executors mainly provides the following methods to create thread pools: newFixedThreadPool, newSingleThreadExecutor, newScheduledThreadPool, and newCachedThreadPool.
Let's look at a usage example of each below:
1) newFixedThreadPool (fixed thread pool)
public class FixedThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5); // create a thread pool with a fixed size of 5
        for (int i = 0; i < 10; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is executing...");
    }
}

The test results are as follows:
pool-1-thread-1 is executing...
pool-1-thread-2 is executing...
pool-1-thread-3 is executing...
pool-1-thread-2 is executing...
pool-1-thread-3 is executing...
pool-1-thread-2 is executing...
pool-1-thread-2 is executing...
pool-1-thread-3 is executing...
pool-1-thread-5 is executing...
pool-1-thread-4 is executing...
Fixed-size thread pool: a new thread is created each time a task is submitted, until the number of threads reaches the pool's fixed size. Once it reaches that size, the pool size stays constant. If a thread terminates because of an execution exception, the pool replaces it with a new thread.
2) newSingleThreadExecutor (single thread pool)
public class SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // create a single-thread pool
        for (int i = 0; i < 100; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}

The test results are as follows:
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
pool-1-thread-1 is executing...
Single-threaded thread pool: this pool has only one worker thread, so all tasks are executed serially by a single thread. If that unique thread terminates because of an exception, a new thread replaces it. This pool guarantees that tasks are executed in the order in which they were submitted.
3) newScheduledThreadPool
public class ScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(6);
        for (int i = 0; i < 10000; i++) {
            pool.submit(new MyThread());
        }
        pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);
        pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);
        pool.shutdown();
    }
}

The test results are as follows:
pool-1-thread-1 is executing...
pool-1-thread-6 is executing...
pool-1-thread-5 is executing...
pool-1-thread-4 is executing...
pool-1-thread-2 is executing...
pool-1-thread-3 is executing...
pool-1-thread-4 is executing...
pool-1-thread-5 is executing...
pool-1-thread-6 is executing...
pool-1-thread-1 is executing...
... (many more lines omitted) ...
pool-1-thread-4 is executing...
pool-1-thread-1 is executing...
The last two tasks in the test results only start executing after a 1-second delay. This thread pool supports scheduling tasks to run after a delay as well as periodically.
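As a complement to the one-shot schedule() calls above, here is a small sketch of periodic execution; the class name PeriodicThreadPool and the delay/period values are purely illustrative, and MyThread is the task class defined earlier:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        // Run the task after an initial delay of 1 second, then every 3 seconds thereafter.
        pool.scheduleAtFixedRate(new MyThread(), 1, 3, TimeUnit.SECONDS);
        // scheduleWithFixedDelay is similar, but measures the 3-second gap from the end of
        // one run to the start of the next.
        // The pool is not shut down here, so the periodic task keeps running.
    }
}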
4) newCachedThreadPool (cacheable thread pool)
public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}

The test results are as follows:
pool-1-thread-5 is executing...
pool-1-thread-7 is executing...
pool-1-thread-5 is executing...
pool-1-thread-16 is executing...
pool-1-thread-17 is executing...
pool-1-thread-16 is executing...
pool-1-thread-5 is executing...
pool-1-thread-7 is executing...
pool-1-thread-16 is executing...
pool-1-thread-18 is executing...
pool-1-thread-10 is executing...
Cacheable thread pool: if the pool grows larger than is needed to process the current tasks, idle threads (those with no task to execute for 60 seconds) are reclaimed. When the number of tasks increases, the pool adds new threads as needed. This pool does not limit its own size; the only limit is the maximum number of threads the operating system (or JVM) can create.
The JDK documentation suggests that programmers use the more convenient Executors factory methods: Executors.newCachedThreadPool() (an unbounded thread pool with automatic thread reclamation), Executors.newFixedThreadPool(int) (a fixed-size thread pool), and Executors.newSingleThreadExecutor() (a single background thread). Their default configurations are predefined for the most common usage scenarios.
2. Construct a ThreadPoolExecutor directly (or subclass ThreadPoolExecutor and call the parent class's constructor).
Before introducing this method, let's analyze the underlying code behind the factory methods used above.
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
    }
}

From the underlying code of the Executors factory class, we can see that its factory methods for creating thread pools are all implemented by constructing a ThreadPoolExecutor. The ThreadPoolExecutor constructor is as follows:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

This constructor takes the following parameters:
corePoolSize -- the number of threads kept in the pool, including idle threads.
maximumPoolSize -- the maximum number of threads allowed in the pool.
keepAliveTime -- when the number of threads is greater than corePoolSize, the maximum time an excess idle thread will wait for a new task before terminating.
unit -- the time unit of the keepAliveTime parameter.
workQueue -- the queue used to hold tasks before they are executed. This queue holds only the Runnable tasks submitted by the execute method.
threadFactory -- the factory the executor uses to create new threads.
handler -- the handler used when execution is blocked because the thread bounds and queue capacities are reached.
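To make these parameters concrete, here is a minimal sketch of calling the constructor directly; the class name CustomThreadPool and all of the sizes are illustrative only, and MyThread is the task class defined earlier:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPool {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),  // workQueue (bounded, capacity 10)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler (reject by throwing)
        pool.submit(new MyThread());
        pool.shutdown();
    }
}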
Next, let's talk about the relationship between these parameters. When a thread pool is first created, it contains no threads (note: threads are not created the moment the pool is created). When the execute() method is called to add a task, the pool makes the following decisions:
1) If the number of threads currently running is less than corePoolSize, then create a new thread immediately to perform this task.
2) If the number of threads currently running is greater than or equal to corePoolSize, then this task will be put into the queue.
3) If the thread pool queue is full, but the number of running threads is less than maximumPoolSize, a new thread will still be created to perform this task.
4) If the queue is full and the number of currently running threads is greater than or equal to maximumPoolSize, the thread pool will handle the current task based on the rejection policy.
5) When a task completes, the thread takes the next task from the queue to execute; if the queue is empty, the thread becomes idle. If an idle thread survives longer than keepAliveTime, it is reclaimed by the pool (note: reclamation is conditional — an idle thread is destroyed only if the number of current threads is greater than corePoolSize; otherwise it is kept, so the pool always maintains corePoolSize threads). Why is a thread not reclaimed the moment it becomes idle, but only after keepAliveTime has elapsed? The reason is simple: creating and destroying threads is expensive and should not happen frequently, so a thread is destroyed only once it has gone unused for keepAliveTime. A worked sketch of this decision flow appears after the source analysis below. Here, unit is the time unit of keepAliveTime, defined as follows:
public enum TimeUnit {
    NANOSECONDS,   // keepAliveTime in nanoseconds
    MICROSECONDS,  // keepAliveTime in microseconds
    MILLISECONDS,  // keepAliveTime in milliseconds
    SECONDS,       // keepAliveTime in seconds
    MINUTES,       // keepAliveTime in minutes
    HOURS,         // keepAliveTime in hours
    DAYS;          // keepAliveTime in days
}

Let's analyze the source code below. For the situations above, the main pieces of source code involved are the following:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

This code is straightforward: if the current pool size is smaller than corePoolSize, a new thread is created to handle the task.
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

Similarly, this code shows that if the current pool size is less than maximumPoolSize, a new thread is created to execute the task.
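To see the decision flow described in steps 1) to 4) above in action, here is a hedged sketch; the class name PoolFlowDemo and the pool sizes (core 2, maximum 4, queue capacity 2) are chosen purely for illustration. With seven long-running tasks, two occupy the core threads, two wait in the queue, two more force extra threads up to the maximum, and the seventh is rejected under the default AbortPolicy:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolFlowDemo {
    public static void main(String[] args) {
        // core = 2, max = 4, queue capacity = 2 (illustrative numbers)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        for (int i = 1; i <= 7; i++) {
            final int id = i;
            try {
                pool.execute(new Runnable() {
                    public void run() {
                        System.out.println("task " + id + " on " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(2000); // keep the thread busy so the pool fills up
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // Tasks 1-2 occupy the core threads, tasks 3-4 wait in the queue,
                // tasks 5-6 force extra threads up to maximumPoolSize, and task 7 is rejected.
                System.out.println("task " + id + " rejected");
            }
        }
        pool.shutdown();
    }
}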
5. The thread pool's work queue
There are three general strategies for queuing tasks:
Direct handoff: a good default choice for a work queue is SynchronousQueue, which hands tasks directly to threads without otherwise holding them. If no thread is immediately available to run a task, the attempt to queue it fails, so a new thread is constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require an unbounded maximumPoolSize to avoid rejecting newly submitted tasks, which in turn admits the possibility of unbounded thread growth when commands keep arriving on average faster than they can be processed.
Unbounded queue: using an unbounded queue (for example, a LinkedBlockingQueue without a predefined capacity) causes new tasks to wait in the queue whenever all corePoolSize threads are busy, so no more than corePoolSize threads are ever created (and the value of maximumPoolSize has no effect). This is appropriate when each task is completely independent of the others, so tasks cannot affect each other's execution; for example, in a web page server. This style of queuing can be useful in smoothing out transient bursts of requests, but it admits the possibility of unbounded work-queue growth when commands keep arriving on average faster than they can be processed.
Bounded queue: a bounded queue (such as an ArrayBlockingQueue) used with a finite maximumPoolSize helps prevent resource exhaustion, but can be more difficult to tune and control. Queue size and maximum pool size may need to be traded off against each other: using large queues and small pools minimizes CPU usage, operating-system resources, and context-switching overhead, but can artificially reduce throughput. If tasks frequently block (for example, if they are I/O bound), the system may be able to schedule time for more threads than you would otherwise allow. Using small queues generally requires larger pool sizes, which keeps the CPUs busier but may incur unacceptable scheduling overhead, which also decreases throughput.
Let's look at the concrete queue implementations below. SynchronousQueue, LinkedBlockingQueue, and ArrayBlockingQueue all implement the BlockingQueue interface.
1) SynchronousQueue
This queue corresponds to the direct-handoff strategy above. A SynchronousQueue has no internal capacity: it does not actually store elements, so each insert operation must wait for a corresponding remove by another thread before the next element can be added.
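Below is a sketch of the direct-handoff configuration; this mirrors how Executors.newCachedThreadPool() builds its pool, though the class name DirectHandoffPool is just for illustration and MyThread is the task class from earlier:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectHandoffPool {
    public static void main(String[] args) {
        // No core threads, an effectively unbounded maximum, a 60-second keep-alive,
        // and a SynchronousQueue that holds no tasks itself; each submitted task is
        // handed directly to a free thread, or a new thread is created for it.
        ExecutorService pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}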
2) LinkedBlockingQueue
This queue corresponds to the unbounded-queue strategy above. A LinkedBlockingQueue created without a capacity argument defaults to a capacity of Integer.MAX_VALUE, so it is effectively unbounded.
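A small sketch of the effect described above, reusing the MyThread task from earlier; the class name UnboundedQueuePool and the pool sizes are illustrative. Because the queue never reports "full", maximumPoolSize never comes into play:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueuePool {
    public static void main(String[] args) {
        // The unbounded LinkedBlockingQueue never fills up, so maximumPoolSize (4 here)
        // is never reached: at most 2 threads are created and the remaining tasks wait
        // in the queue.
        ExecutorService pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 20; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}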
3) ArrayBlockingQueue
This queue corresponds to the bounded-queue strategy above. ArrayBlockingQueue has the following three constructors:
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    if (capacity < c.size())
        throw new IllegalArgumentException();
    for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
        add(it.next());
}

Let's focus on the fair parameter. It controls the access policy for threads that are blocked trying to insert into or remove from the queue. When fair is true, blocked threads are granted access in FIFO order. When it is false (the default), the ordering is unspecified, so a thread that arrives later may "cut the queue" and be served before threads that have been waiting longer. Fairness avoids starvation but generally reduces throughput.
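Below is a hedged sketch of plugging a fair, bounded ArrayBlockingQueue into a pool; the class name BoundedQueuePool, the capacity of 10, and the pool sizes are illustrative only, and MyThread is the task class from earlier:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueuePool {
    public static void main(String[] args) {
        // fair = true: threads blocked on insertion or removal are served in FIFO order;
        // with the default fair = false, a newly arriving thread may "cut the queue".
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10, true);
        ExecutorService pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 10; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }
}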
6. Thread pool rejection policies
When the number of threads has reached the maximum and the work queue is full, but tasks keep arriving, the pool has no choice but to reject new tasks.
ThreadPoolExecutor lets you customize what happens when adding a task fails: call the pool's setRejectedExecutionHandler() method and replace the existing policy with your own RejectedExecutionHandler object. The default policy provided by ThreadPoolExecutor is to discard the task and throw an exception. ThreadPoolExecutor ships with four ready-made policies:
ThreadPoolExecutor.AbortPolicy: Indicates that the task is rejected and an exception is thrown. The source code is as follows:
public static class AbortPolicy implements RejectedExecutionHandler {
    /** Creates an AbortPolicy. */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException(); // reject by throwing the exception
    }
}

ThreadPoolExecutor.DiscardPolicy: the task is rejected, but nothing else is done. The source code is as follows:
public static class DiscardPolicy implements RejectedExecutionHandler {
    /** Creates a DiscardPolicy. */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // reject silently: do nothing
    }
}

ThreadPoolExecutor.CallerRunsPolicy: the rejected task is executed directly in the caller's thread. The source code is as follows:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /** Creates a CallerRunsPolicy. */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run(); // run the task directly in the calling thread
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy: the oldest task at the head of the queue is discarded, and execution of the new task is then retried. The source code is as follows:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /** Creates a DiscardOldestPolicy for the given executor. */
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll(); // discard the task at the head of the queue
            e.execute(r);        // then execute the new task
        }
    }
}

In other words, when the pool is saturated, the oldest task is polled from the head of the queue and the new task is then executed via execute().
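To wrap up, here is a small sketch of choosing a rejection policy; the class name RejectionPolicyDemo and the pool/queue sizes are illustrative, and MyThread is the task class from earlier. CallerRunsPolicy is passed to the constructor so that rejected tasks run in the submitting thread, and setRejectedExecutionHandler() shows how the policy can be swapped afterwards:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionPolicyDemo {
    public static void main(String[] args) {
        // A single-thread pool with a one-slot queue saturates quickly; CallerRunsPolicy
        // makes rejected tasks run in the submitting (main) thread instead of failing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 5; i++) {
            pool.execute(new MyThread());
        }
        // The policy can also be replaced on an existing pool:
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        pool.shutdown();
    }
}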
Summary
The above is a detailed walkthrough of the JDK's built-in thread pools. I hope it is helpful; if you have any questions, please leave a comment and I will reply as soon as I can.