2. Введение
Технология многопоточности в основном решает проблему выполнения нескольких потоков в процессоре. Это может значительно сократить время холостого хода процессора и увеличить возможность пропускной способности процессора. Тем не менее, накладные расходы частых созданий резьбы очень велики. Итак, как уменьшить эту часть накладных расходов, вам нужно рассмотреть возможность использования пула потоков. Пул потоков - это контейнер потока, который выполняет только номинальное количество потоков за раз. Пул потоков используется для управления этим номинальным количеством потоков.
3. Схема структуры класса с участием пула потоков
Среди них основным для нас является класс ThreadPoolexecutor.
4. Как создать пул потоков
Как правило, у нас есть следующие методы для создания пулов потоков:
1. Используйте фабричный класс исполнителей
Исполнители в основном предоставляют следующие методы для создания пулов потоков:
Давайте посмотрим на пример использования ниже:
1) newfixedThreadpool (пул с фиксированной резьбой)
public class fixdthreadpool {public static void main (string [] args) {executorerservice pool = executors.newfixedthreadpool (5); // Создать пул потоков с фиксированным размером 5 для (int i = 0; i <10; i ++) {pool.submit (new mythread ()); } pool.shutdown (); }} открытый класс Mythread Extends Thread {@Override public void run () {System.out.println (thread.currentThread (). getName () + "выполнение ..."); }}Результаты теста следующие:
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-2 выполняется. Полем Полем
Pool-1-Thread-3 выполняется. Полем Полем
Pool-1-Thread-2 выполняется. Полем Полем
Pool-1-Thread-3 выполняется. Полем Полем
Pool-1-Thread-2 выполняется. Полем Полем
Pool-1-Thread-2 выполняется. Полем Полем
Pool-1-Thread-3 выполняется. Полем Полем
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-4 выполняется. Полем Полем
Пул потоков с фиксированным размером: создавайте поток каждый раз, когда отправляется задача, пока поток не достигнет максимального размера пула потоков. Размер пула потоков останется неизменным, как только он достигнет своего максимального значения. Если поток заканчивается из -за исключения выполнения, пул потоков добавит новый поток.
2) NewsingLethreadExeCutor (пул отдельных потоков)
открытый класс SingleThreadPool {public static void main (string [] args) {executorerservice pool = executors.newsingleThreadExeCutor (); // Создать один пул потоков для (int i = 0; i <100; i ++) {pool.submit (new mythread ()); } pool.shutdown (); }}Результаты теста следующие:
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Однопоточный пул потоков: в этом пуле потоков работает только один поток, что означает, что однополосное последовательное выполнение всех задач. Если этот уникальный поток заканчивается из -за исключения, то будет новый поток, чтобы заменить его. Этот пул потоков гарантирует, что порядок выполнения всех задач выполняется в порядке подачи задачи.
3) GewschedThreadpool
public Class preseedthreadpool {public static void main (string [] args) {warededExecutorservice pool = experators.newscheduledThreadpool (6); for (int i = 0; i <10000; i ++) {pool.submit (new mythread ()); } pool.schedule (new Mythread (), 1000, TimeUnit.milliseconds); pool.schedule (new Mythread (), 1000, timeUnit.milliseconds); pool.shutdown (); }}Результаты теста следующие:
Pool-1-Thread-1 выполняется. Полем Полем
Pool-1-Thread-6 выполняется. Полем Полем
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-4 выполняется. Полем Полем
Pool-1-Thread-2 выполняется. Полем Полем
Pool-1-Thread-3 выполняется. Полем Полем
Pool-1-Thread-4 выполняется. Полем Полем
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-6 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
…………………………………………………………………………………………………………………………………………………………………………………… ……………………………………………………………………………………………………………………………………………………………………………………
Pool-1-Thread-4 выполняется. Полем Полем
Pool-1-Thread-1 выполняется. Полем Полем
Последние два потока результата теста начинают выполнять только после задержки 1s. Этот пул потоков поддерживает требование времени и периодического выполнения задач
4) NewcachedThreadpool (бассейн кэшированных потоков)
открытый класс CachedThreadpool {public static void main (string [] args) {receectorService pool = executors.newcachedthreadpool (); for (int i = 0; i <100; i ++) {pool.submit (new mythread ()); } pool.shutdown (); }}Результаты теста следующие:
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-7 выполняется. Полем Полем
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-16 в исполнении. Полем Полем
Pool-1-Thread-17 выполняется. Полем Полем
Pool-1-Thread-16 в исполнении. Полем Полем
Pool-1-Thread-5 выполняется. Полем Полем
Pool-1-Thread-7 выполняется. Полем Полем
Pool-1-Thread-16 в исполнении. Полем Полем
Pool-1-Thread-18 выполняется. Полем Полем
Pool-1-Thread-10 выполняется. Полем Полем
Кэш -пул потоков: если размер пула потоков превышает поток, необходимый для обработки задачи, некоторые потоки холостого хода (без выполнения задачи за 60 секунд) будут переработаны. Когда количество задач увеличивается, этот пул потоков может разумно добавлять новые потоки для выполнения задачи. Этот пул потоков не ограничивает размер пула потоков, который полностью зависит от максимального размера потока, который может создать операционная система (или JVM).
Чиновник предполагает, что программисты используют более удобные заводские методы исполнителей. Newcachedthreadpool () (безграничный пул потоков, который может выполнять автоматическую переработку потока), experators.newfixedThreadpool (int) (пул потока фиксированного размера). Nextors.newsingleThrexeCutor () (единый фоновый поток). Эти пулы потоков предопределены по конфигурации по умолчанию для большинства сценариев использования.
2. Унаследовать класс ThreadPoolexeCutor и скопируйте метод конструктора родительского класса.
Прежде чем ввести этот метод, давайте проанализируем предыдущие несколько базовых кодов для создания пулов потоков?
Исполнители открытого класса {public static executorservice newfixedthreadpool (int nthreads) {return new ThreadPoolexeCutor (nthreads, nThreads, 0l, TimeUnit.milliseconds, new LinkedBlockingqueue <Runnable> ()); } public static executorservice newsingleThreadExeCutor () {return new FinalizedElegatedExeCutorService (New ThreadPoolexeCutor (1, 1, 0l, TimeUnit.Milliseconds, New LinkedBlockingqueue <Runnable> ())); }}Из базового кода завода исполнителей мы видим, что методы, предоставляемые заводским классом для создания пулов потоков, фактически реализованы путем построения ThreadPoolexeCutor. Код метода конструктора ThreadPoolexeCutor выглядит следующим образом:
public threadpoolexecutor (int corepoolsize, int maximumpoolsize, long keepalivetime, время TimeUnit, BlockQueue <Runnable> Workqueue, Threadfactory ThreatFactory, DecuceedExecutionHandler Handler) {if (corepoolsize <0 || maximumpoolsize <= 0 || Maximumpize <CorePoolSize || AllosalargumentException (); if (workqueue == null || thinkfactory == null || handler == null) бросить новый nullpointerexception (); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; это. this.keepaliveTime = Unit.tonanos (KeepAliveTime); this.threadfactory = treadfactory; this.handler = обработчик; }Затем давайте поговорим о методе конструктора ThreadPoolexeCutor. В этом методе строительства существуют в основном следующие параметры:
CorePoolsize-количество потоков, сохраненных в бассейне, включая бесплатные потоки.
Maximumpoolsize - максимальное количество потоков, разрешенных в бассейне.
KeepAliveTime-Когда количество потоков больше, чем CorePoolsize, это самое длительное время для поток холостого хода, чтобы ждать новой задачи.
Блок- единица параметров PAMALIVETIME.
Workqueue-очередь, используемая для выполнения задач перед выполнением. В этой очереди поддерживается только выполняемые задачи, представленные методом выполнения.
ThreatFactory-фабрика, используемая исполнителями для создания новых потоков.
Хэндлер-обработчик, используемый при выполнении, блокируется из-за прицела потока и емкости очереди.
Далее, давайте поговорим о отношениях между этими параметрами. Когда пул потоков просто создан, в пуле потоков нет потоков (обратите внимание, что не то, что определенное количество потоков создается, как только создается пул потоков). Когда метод execute () вызван для добавления задачи, пул потоков вынесет следующее суждение:
1) Если количество работающих в настоящее время потоков меньше, чем CorePoolsize, то немедленно создайте новый поток для выполнения этой задачи.
2) Если количество работающих в настоящее время потока больше или равно или равно, что эта задача будет помещена в очередь.
3) Если очередь пула потоков заполнена, но количество запущенных потоков меньше, чем MaximumPoolsize, новая поток все еще будет создан для выполнения этой задачи.
4) Если очередь заполнена, а количество запускаемых в настоящее время потоков больше или равно maximumpoolsize, пул потоков будет обрабатывать текущую задачу на основе политики отклонения.
5) Когда выполняется задача, поток возьмет следующую задачу из очереди для выполнения. Если в очереди нет задачи, которая будет выполнена, то поток будет простальным. Если время выживания превышает время, то нить будет переработана пулом потоков (примечание: потоки утилизации являются условными. Если количество, работающих в настоящее время, больше, чем CorePoolsize, поток будет уничтожено. Если он не будет больше, чем CorePoolSize, поток не будет разрушено. Количество потоков должно быть сохранено в количестве CorePoolSize). Почему не то, чтобы поток переработана, как только она будет холодной, но она должна ждать, пока она не превысит время healwAliveTeme до переработки поток? Причина очень проста: потому что создание и разрушение потоков потребляют много, и его нельзя часто создавать и уничтожать. После превышения KeepaliveTime обнаружено, что этот поток действительно не используется, и она будет уничтожена. В этом случае единица представляет собой временную единицу KeepaliveTime, а определение единицы заключается в следующем:
public enum TimeUnit { NANOSECONDS { // keepAliveTime in nanoseconds}, MICROSECONDS { // keepAliveTime in microseconds}, MILLISECONDS { // keepAliveTime in milliseconds}, SECONDS { // keepAliveTime in seconds}, MINUTES { // keepAliveTime in minutes}, HOURS { // KeepAliveTime в часы}, дни {// KeepALiveTime в дни}; Давайте проанализируем исходный код ниже. Для приведенных выше ситуаций исходные коды в основном задействованы следующие:
Private Boolean AddifunderCorePoolsize (Runnable FirstTask) {Thread T = null; окончательный reentrantlock mainlock = this.mainlock; mainlock.lock (); try {if (poolsize <corepoolsize && runstate == running) t = addthread (firsttask); } наконец {mainlock.unlock (); } if (t == null) вернуть false; t.start (); вернуть истину; } На самом деле, этот код очень прост. В основном в нем описывается, что если текущий пул потоков меньше, чем corepoolsize, создается новый поток для выполнения задачи.
Private Boolean Addifundermaximumpoolsize (Runnable FirstTask) {Thread T = null; окончательный reentrantlock mainlock = this.mainlock; mainlock.lock (); try {if (poolsize <maximumpoolsize && runstate == running) t = addthread (firsttask); } наконец {mainlock.unlock (); } if (t == null) вернуть false; t.start (); вернуть истину; }Приведенный выше код описывает, что, если количество текущих пулов потоков меньше, чем максимально, будет создано поток для выполнения задачи.
5. очередь пула ниток
Есть 3 типа очередей за пулом потоков:
Прямой коммит: опция по умолчанию для рабочих очередей является SynchronousQueue, которая подчиняется задачам непосредственно в потоки, не сохраняя их. Здесь, если нет доступного потока, чтобы немедленно запустить задачу, попытка стоять в очереди задача потерпит неудачу, создавая тем самым новый поток. Эта политика избегает замков при обработке наборов запросов, которые могут иметь внутренние зависимости. Прямые представления обычно требуют неограниченных максимализаций, чтобы избежать отклонения вновь представленных задач. Эта стратегия позволяет неограниченным потокам иметь возможность роста, когда команды непрерывно поступают со средним значением, с которым может обрабатывать очередь.
Основанная очередь: использование неограниченной очереди (например, LinkedBlockingqueue без предопределенной емкости) приведет к тому, что новые задачи ожидают в очереди, когда все потоки CorePoolSize будут заняты. Таким образом, созданный поток не будет превышать CorePoolsize. (Значение Maximumpoolsize является недействительным.) Когда каждая задача полностью не зависит от других задач, то есть выполнение задачи не влияет друг на друга, она подходит для неограниченных очередей; Например, на сервере веб -страницы. Эта очередь может быть использована для обработки запросов на переходные взрыва, и эта стратегия позволяет неограниченным потокам иметь возможность роста, когда команды поступают непрерывно превышать среднее значение, которое может обрабатывать очередь.
Ограниченная очередь: при использовании ограниченных максимализаций, ограниченные очереди (такие как ArrayBlockingQueue) помогают предотвратить истощение ресурсов, но могут быть трудно регулировать и контролировать. Размер очереди и максимальный размер пула может потребоваться обменять друг друга: использование больших очередей и небольших пулов может минимизировать использование ЦП, ресурсы операционной системы и переключение контекста, но может привести к ручным снижению пропускной способности. Если задачи часто блокируются (например, если они являются границами ввода -вывода), система может запланировать время для большего количества потоков, чем вы позволяете. Использование небольших очередей обычно требует большего размера бассейна и имеет более высокое использование ЦП, но может столкнуться с неприемлемыми накладными расписаниями, что также может снизить пропускную способность.
Давайте поговорим о очереди бассейна резьбы ниже, диаграмма структуры класса выглядит следующим образом:
1) Synchronousqueue
Очередь соответствует прямому подчинению, упомянутому выше. Прежде всего, Synchronousqueue не ограничена, что означает, что его способность хранить числа не ограничена. Однако из -за характеристик самой очереди, после добавления элементов, вы должны ждать, пока другие потоки заберут их, прежде чем вы сможете продолжить добавлять их.
2) LinkedBlockingqueue
Очередь соответствует неограниченной очереди выше.
3) Arrayblockingqueue
Очередь соответствует ограниченной очереди выше. ArrayBlockingQueue имеет следующие 3 конструктора:
public arrayblockingqueue (int емкость) {this (емкость, false); } public arrayblockquequeue (int емкость, логическая ярмарка) {if (емкость <= 0) бросить новый allosalargumentException (); this.items = (e []) новый объект [емкость]; lock = new Reentrantlock (Fair); notempty = lock.newCondition (); natufull = lock.newCondition (); } public ArrayBlockingQueue (int емкость, логическая ярмарка, коллекция <? Extends e> c) {this (емкость, ярмарка); if (емкость <c.size ()) бросить new allogalargumentException (); for (iterator <? Extends e> it = c.iterator (); it.hasnext ();) add (it.next ()); }Давайте сосредоточимся на этой ярмарке. Ярмарка представляет конкурентную стратегию потоков доступа к очереди. Когда это правда, очереди вставки задач соответствуют правилам FIFO. Если ложь, вы можете «разрезать очередь». Например, если сейчас есть много задач очереди, поток выполнил задачу, и появится новая задача. Если это ложь, эту задачу не должна быть в очереди в очереди. Вы можете напрямую вырезать очередь, а затем выполнить ее. Как показано на рисунке ниже:
6. Стратегия выполнения отказа
Когда количество потоков достигает максимального значения, в настоящее время задачи все еще идут, и в настоящее время я должен отказаться принять задачи.
ThreadPoolexeCutor позволяет настраивать политики выполнения при сбое задачи. Вы можете вызвать метод SetRectedExecutionHandler () пула потоков и заменить существующую политику настраиваемым объектом DecuceedExecutionHandler. Стратегия обработки по умолчанию, предоставленная ThreadPoolexeCutor, заключается в том, чтобы напрямую отбросить и бросить информацию об исключении одновременно. ThreadPoolexeCutor предоставляет 4 существующих политика, а именно:
ThreadPoolexeCutor.Abortpolicy: указывает, что задача отклоняется и исключено. Исходный код заключается в следующем:
Общественный статический класс Abortpolicy Reframents DecuceedExecutionHandler { /*** Создает <tt> Abortpolicy < /tt>. * / public abortpolicy () {} / *** Всегда бросает DecueedExecutionException. * @param r Задача задача, запрошенная для выполнения * @param e Исполнитель, пытающийся выполнить эту задачу * @throws reuceededexecutionexception всегда. */ public void decuceedExecution (runnable r, threadpoolexecutor e) {бросить новый decueedExecutionexception (); // бросание исключений}}ThreadPoolexeCutor.discardPolicy: это означает, что задача отклоняется, но никаких действий не предпринимается. Исходный код заключается в следующем:
Общественный статический класс DiscardPolicy Refrments DecuceedExecutionHandler { /*** Создает <tt> DiscardPolicy < /tt>. * / public discardpolicy () {} / *** ничего не делает, что приводит к отброшению задачи r. * @param r Заполнимая задача, запрошенная для выполнения * @param e Исполнитель, пытающийся выполнить эту задачу */ public void drecectedexecution (runnable r, threadpoolexecutor e) {} // отвергайте напрямую, но ничего не делайте}}}}ThreadPoolexeCutor.callerrunSpolicy: указывает, что задача отклоняется, и задача напрямую выполняется в потоке вызывающего абонента. Исходный код заключается в следующем:
Общественный статический класс Callerrunspolicy Refrances DecuceedExecutionHandler { /*** Создает <tt> callerrunspolicy < /tt>. * / public callerrunspolicy () {} / ** * выполняет задачу R в потоке вызывающего абонента, если исполнитель * не был выключен, и в этом случае задача отброшена. * @param r Задача задача, запрошенная для выполнения * @param e Исполнитель, пытающийся выполнить эту задачу */ public void decueedExecution (runnable r, threadpoolexecutor e) {if (! eisshutdown ()) {r.run (); // выполнять задачи напрямую}}}ThreadPoolexeCutor.discardoldestpolicy: это означает, что первая задача в очереди задачи сначала отбрасывается, а затем задача добавляется в очередь. Исходный код заключается в следующем:
Публичный статический класс DecardestPolicy реализует DecuceedExecutionHandler { /*** Создает <tt> discordEldestpolicy < /tt> для данного исполнителя. */ public discordestPolicy () {} public void decueedExecution (runnable r, threadpoolexecutor e) {if (! e.isshutdown ()) {e.getqueue (). opl (); // отказ от первой задачи в очереди execute (r); // выполнить новую задачу}}}Когда задачи будут приходить непрерывно, из очереди будет выполнено задача, а затем будет выполнена новая задача.
Суммировать
Выше приведено подробное объяснение собственного пула потоков JDK, представленного редактором. Я надеюсь, что это будет полезно для всех. Если у вас есть какие -либо вопросы, пожалуйста, оставьте мне сообщение, и редактор ответит всем вовремя. Большое спасибо за вашу поддержку сайту wulin.com!