1. Основное использование пулов потоков
1.1. Зачем вам бассейн потоков?
В ежедневном бизнесе, если мы хотим использовать многопоточное, мы создадим потоки до начала бизнеса, и уничтожим потоки после заканчивания бизнеса. Тем не менее, для бизнеса создание и уничтожение нитей не имеют ничего общего с самим бизнесом, и заботится только о задачах, выполняемых потоком. Поэтому я надеюсь использовать как можно больше процессоров для выполнения задач, а не для создания и уничтожения потоков, которые не связаны с бизнесом. Пул потоков решает эту проблему. Функция пула потоков заключается в повторном использовании потоков.
1.2. Какая поддержка предоставляет нам JDK
Связанные классовые диаграммы в JDK показаны на рисунке выше.
Несколько специальных категорий, которые будут упомянуты.
Класс CALLABLE похож на класс -запуска, но разница в том, что Callable имеет возвратное значение.
ThreadPoolexeCutor является важной реализацией пулов потоков.
Исполнители - фабричный класс.
1.3. Использование бассейнов потоков
1.3.1. Типы бассейнов потоков
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolexeCutor (1, 1, 0l, TimeUnit.milliseconds, New LinkedBlockqueue <Runnable> ()));} public Static executorService newcachedThreadpool () {return newpoolexecutor (0, integer.max_value, 60l, timeunit.seconds, newger.max_value, 60l, timeunit. Synchronousqueue <Nrunnable> ());}С точки зрения метода, очевидно, что FixedThreadPool, SingleThreadExeCutor и CachedThreadPool являются разными экземплярами ThreadPoolexeCutor, но параметры разные.
public ThreadPoolexeCutor (int corePoolsize, int maximumpoolsize, Long KeepaliveTime, TimeUnit Unit, Blockquequeue <Runnable> workqueue) {this (corepoolsize, maximumpoolsize, keepalivetime, Unit, rabqueue, executors.defaultepactory (), Defulthandler); Давайте кратко опишем значение параметров в конструкторе ThreadPoolexeCutor.
Таким образом, глядя на фиксированный sthreadpool, упомянутый выше, количество ядер и максимальное количество потоков одинаково, так что потоки не будут созданы и уничтожены во время работы. Когда количество задач велико, а потоки в пуле резьбы не могут быть выполнены, задача будет сохранена в LinkedBlockingqueue, а размер LinkedBlockingQueue - это integer.max_value. Это означает, что непрерывное добавление задач заставит память потреблять все больше и больше.
CachedThreadpool отличается. Его номер основного потока составляет 0, максимальное количество хранилища INTEGER.MAX_VALUE, а его очередь блокировки - синхронная раза, которая является специальной очередью, а ее размер - 0. Поскольку количество основных потоков составляет 0, необходимо добавить задачу в Synchronouseue. Эта очередь может добиться успеха только тогда, когда один поток добавляет из него данные, а другой поток получает данные из него. Добавление данных в эту очередь только вернет сбой. Когда возврат не удается, пул резьбы начинает расширять поток, поэтому количество потоков в CachedThreadpool не зафиксировано. Когда поток не используется в течение 60 -х годов, нить уничтожается.
1.4. Небольшие примеры использования бассейна резьбы
1.4.1. Простой бассейн потоков
Import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class threadpooldemo {public Static Class MyTask реализует Runnable {@Override public void run () {System.out.println (System.CurrentTimeMillis () + ». try {thread.sleep (1000); } catch (Exception e) {e.printstackTrace (); }}} public static void main (string [] args) {mytask mytask = new mytask (); Executorservice es = executors.newfixedthreadpool (5); for (int i = 0; i <10; i ++) {es.submit (mytask); }}} Поскольку используется NewFixedThreadpool (5), но запускается 10 потоков, 5 выполняются одновременно, и очевидно, что повторное использование потока наблюдается. ThreadID повторяется, то есть первые 5 задач, а последние 5 задач выполняются одной и той же партией потоков.
Что здесь используется
es.submit (mytask);
Есть также способ представить:
es.execute (mytask);
Разница в том, что представление вернет будущий объект, который будет представлен позже.
1.4.2.scheduledThreadpool
import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPoolDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); // Если предыдущая задача не была выполнена, диспетчера не запустится. ses.schedulewithfixeddelay (new Runnable () {@override public void run () {try {thread.sleep (1000); System.out.println (System.currentTimeMillis ()/1000);} Catch (Exception e) {// todo: excection}}}}, 0, 2, 2,/время. Секунды, а затем выполнять один раз каждые 2 секунды в цикле}}Выход:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Поскольку выполнение задачи занимает 1 секунду, планирование задач должно ждать выполнения предыдущей задачи. То есть каждые 2 секунды здесь означает, что новая задача будет начата через 2 секунды после выполнения предыдущей задачи.
2. Распространение и усиление пула потоков
2.1. Интерфейс обратного вызова
В пуле потоков есть некоторые API -обратного вызова, чтобы предоставить нам расширенные операции.
Executorservice es = new ThreadPoolexeCutor (5, 5, 0l, TimeUnit.seconds, New LinkedBlockQueue <Runnable> ()) {@Override Protected void передэксек (поток t, runnable r) {System.out.println («Подготовка к выполнению»); } @Override Protected void AfterExecute (Runnable R, Throwable T) {System.out.println ("выполнение завершено"); } @Override protected void urdinated () {System.out.println ("Пул потоков"); }};Мы можем реализовать методы RetherExecute, Fehtexecute и Amersed of ThreadPoolexeCutor для реализации управления журналами или других операций до и после выполнения потока, выход пула потоков.
2.2. Стратегия отклонения
Иногда задачи очень тяжелые, что приводит к слишком большой нагрузке на систему. Как упомянуто выше, когда количество задач увеличивается, все задачи будут размещены в очереди блокировки Fixedthreadpool, что приведет к слишком большому потреблению памяти и в конечном итоге переполнено памяти. Таких ситуаций следует избегать. Поэтому, когда мы обнаруживаем, что количество потоков превышает максимальное количество потоков, мы должны отказаться от некоторых задач. При отбросе мы должны записать задачу вместо того, чтобы выбрасывать ее напрямую.
В ThreadPoolexeCutor есть еще один конструктор.
public threadpoolexecutor (int corepoolsize, int maximumpoolsize, long keepalivetime, время TimeUnit, BlockQueue <Runnable> Workqueue, Threadfactory ThreatFactory, DecuceedExecutionHandler Handler) {if (corepoolsize <0 || maximumpoolsize <= 0 || Maximumpize <CorePoolSize || AllosalargumentException (); if (workqueue == null || thinkfactory == null || handler == null) бросить новый nullpointerexception (); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; это. this.keepaliveTime = Unit.tonanos (KeepAliveTime); this.threadfactory = treadfactory; this.handler = обработчик; } Мы представим ThreadFactory позже.
Обработчик отвергает реализацию политики, которая расскажет нам, что делать, если задача не может быть выполнена.
Есть 4 стратегии выше.
Abortpolicy: если задача не может быть принята, исключение брошено.
Callerrunspolicy: Если задача не может быть принята, пусть вызовая потока завершит.
DESCARDOLDESTPOLICIY: Если задача не может быть принята, самая старая задача будет отброшена и поддерживается очередью.
DiscardPolicy: если задача не может быть принята, задача будет отброшена.
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is discard"); } }); Конечно, мы также можем самостоятельно реализовать интерфейс DrecueedExecutionHandler, чтобы сами определить политику отказа.
2.3. Настроить Treadfactory
Я только что видел, что Threadfactory может быть указан в конструкторе ThreadPoolexeCutor.
Потоки в пуле потоков создаются на заводе потока, и мы можем настроить фабрику потока.
Фабрика потока по умолчанию:
Статический класс defaultThreadFactory реализует ThreadFactory {Private Static Final AtomicInteger PoolNumber = New AtomicInteger (1); частная группа Final ThreadGroup; Частный финальный AtomicInteger Threadnumber = новый AtomicInteger (1); частная финальная строка NamePrefix; DefaultThreadFactory () {SecurityManager S = System.GetSecurityManager (); Group = (S! = NULL)? S.GetThreadGroup (): Thread.CurrentThread (). getThreadGroup (); nameprefix = "pool-" + poolnumber.getandincrement () + "-Thread-"; } public Thread NewThread (Runnable R) {Thread T = новый поток (группа, r, nameprefix + threadnumber.getandincrement (), 0); if (t.isdaemon ()) t.setdaemon (false); if (t.getPriority ()! = Thread.norm_priority) t.setPriority (Thread.norm_priority); возврат t; }}3. Форкжин
3.1. Мысли
Это идея деления и завоевания.
Вилка/соединение аналогична алгоритму MapReduce. Разница между ними: fork/gin делятся на небольшие задачи только при необходимости, например, если задача очень большая, в то время как MapReduce всегда начинает выполнять первый шаг для сегментации. Похоже, что вилка/соединение более подходит для уровня потока внутри JVM, в то время как MapReduce подходит для распределенных систем.
4.2. Использование интерфейса
Рецирсививация: нет возврата
Recurrusivetask: есть возвратная стоимость
4.3. Простой пример
Импорт java.util.arraylist; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkjointask; import java.util.concurrent.recursivetask; public countast venders recursivatask <long> {private int final int threshold = 10000; частный длинный старт; частный длинный конец; public counttask (Long Start, Long End) {super (); this.start = start; this.end = end; } @Override защищен Long Compute () {long Sum = 0; Boolean cancompute = (end - start) <порог; if (cancompute) {for (long i = start; i <= end; i ++) {sum = sum+i; }} else {// разделить на 100 небольших задач Long Step = (start + end)/100; ArrayList <coundTask> subtasks = new ArrayList <CountTask> (); длинный pos = start; for (int i = 0; i <100; i ++) {long lotone = pos+step; if (lastone> end) {lastone = end; } Counttask subtask = new Counttask (pos, lastone); pos + = step + 1; subtasks.add (subtask); subtask.fork (); // Нажмите подзадачи в пул потока} для (counttask t: subtasks) {sum += t.join (); // ждать всех подзадач, чтобы закончить}} return sum; } public static void main (string [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); Counttask task = new CountSk (0, 200000L); Forkjointask <long> result = forkjoinpool.submit (задача); try {long res = result.get (); System.out.println ("sum =" + res); } catch (Exception e) {// todo: обрабатывать исключение explyStstackTrace (); }}} Приведенный выше пример описывает задачу суммирования. Разделите накопленные задачи на 100 задач, каждая задача выполняет только сумму чисел, а после окончательного соединения накапливается сумма, рассчитанная по каждой задаче.
4.4. Элементы реализации
4.4.1.WorkQueue и CTL
В каждом потоке будет очередь
Статический финальный класс Workqueue
В очереди в работе будет ряд полей, которые управляют потоками.
volatile int eventCount; // кодированный счет инактивации; <0, если неактивно
int nextwait; // закодированная запись официанта следующего события
int gnirows; // количество сталей
int подсказка; // Подсказка стального индекса
короткий пуллиндекс; // Индекс этой очереди в бассейне
окончательный короткий режим; // 0: lifo,> 0: fifo, <0: общий
нестабильный int qlock; // 1: заблокирован, -1: прекратить; еще 0
нестабильная база Int; // Индекс следующего слота для опроса
int top; // Индекс следующего слота для толчка
Forkjointask <?> [] Массив; // элементы (изначально не раскрываются)
Окончательный пул по поводу форкюинпуля; // содержащий бассейн (может быть нулевой)
Окончательный владелец ForkjoinworkerThread; // Владение потоком или нулевым, если общий
нестабильная нить Паркер; // == Владелец во время вызова в парк; еще нуль
volatile forkjointask <?> currentjoin; // задача присоединяется к Awaitjoin
Forkjointask <?> CurrentSteal; // текущая нелокальная задача выполняется
Здесь следует отметить, что существует большая разница между JDK7 и JDK8 в реализации Forkjoin. То, что мы представляем здесь, это от JDK8. В пуле потоков, иногда не все потоки выполняются, некоторые потоки будут приостановлены, и эти подвесные потоки будут храниться в стеке. Он представлен внутри связанного списка.
Nextwait укажет на следующую резьбу для ожидания.
Индекс индекса индекса в пуле потоков Boolindex.
EventCount При инициализировании, EventCount связан с Poolindex. В общей сложности 32 бита, первый бит показывает, активируется ли он, и 15 бит указывают на количество раз, когда он был приостановлен
EventCount, остальное представляет Boolindex. Используйте одно поле, чтобы представлять несколько значений.
Workqueue Workqueue представлена Forkjointask <?> [] Array. Верх и база представляют оба конца очереди, и данные между этими двумя.
Поддерживать CTL (64-битный длинный тип) в Forkjoinpool
нестабильный длинный CTL;
* Полевое CTL давно упаковано:
* AC: Количество активных работников, за исключением целевого параллелизма (16 бит)
* TC: количество общих работников за вычетом целевого параллелизма (16 бит)
* ST: True, если пул прекращен (1 бит)
* EC: количество ожидания в верхней потоке ожидания (15 бит)
* ID: Poolindex вершины стека официантов (16 бит)
AC представляет собой активное количество резьбов за вычетом степени параллелизма (вероятно, количество процессоров)
TC означает общее количество потоков минус параллелизм
ST указывает, активируется ли сам бассейн резьбов
EC представляет количество приостановленных потоков в верхнее время ожидания
Идентификатор показывает, что Poolindex ожидает потока в верхней части
Очевидно, что ST+EC+ID - это то, что мы только что назвали EventCount.
Так почему вы должны синтезировать переменную с 5 переменными? Фактически, емкость занимает то же самое с 5 переменными.
Читаемость использования кода переменной будет намного хуже.
Так зачем использовать переменную? На самом деле, это самая умная вещь, потому что эти 5 переменных являются целым. В многопользовании, если используются 5 переменных, то при изменении одной из переменных, как обеспечить целостность 5 переменных. Затем использование переменной решит эту проблему. Если решить с помощью замков, производительность будет ухудшена.
Использование переменной обеспечивает согласованность и атомичность данных.
Изменения в эскадрильи FORKJOIN CTL выполняются с использованием операций CAS. Как упоминалось в предыдущей серии статей, CAS является операцией без блокировки и имеет хорошую производительность.
Поскольку операции CAS могут ориентироваться только на одну переменную, эта конструкция является оптимальной.
4.4.2. Работа воровство
Далее мы представим рабочий процесс всего пула потоков.
Каждая потока вызывает бегущего работника
Final void Runworker (Workqueue w) {w.growarray (); // выделять очередь для (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} Функция Scan () состоит в том, чтобы сканировать для выполнения задач.
r - относительно случайное число.
Частный финальный Scan (Workqueue w, int r) {workqueue [] ws; int m; длинный c = ctl; // Для проверки согласованности if ((ws = rabqueues)! = null && (m = ws.length - 1)> = 0 && w! = null) {for (int j = m + m + 1, ec = w.eventcount ;;) {workqueue q; int b, e; Forkjointask <?> [] A; Forkjointask <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = Q.Array)! = null) {long i = ((a.length - 1) & b) << ashift) + abase; if ((t = ((forkjointask <?>) u.getObjectvolatile (a, i)))! = null) {if (ec <0) helprelease (c, ws, w, q, b); else if (q.base == b && u.compareandswapobject (a, i, t, null)) {u.putorderedint (q, qbase, b + 1); if ((b + 1) - q.top <0) сигнальная обработка (ws, q); w.runtask (t); } } перерыв; } else if (-j <0) {if (ec | (e = (int) c)) <0) // неактивное или завершающее возврат awaitWork (w, c, ec); иначе if (ctl == c) {// Попробуйте инактивировать и включать длинный nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; W.EventCount = EC | Int_sign; if (! u.compareAndswaplong (this, ctl, c, nc)) w.eventcount = ec; // назад} Break; }}} return 0; } Давайте посмотрим на метод сканирования. Одним из параметров сканирования является рабочее. Как упомянуто выше, в каждой поток будет рабочая, а рабочая работа будет сохранена в рабочих раз. r - случайное число. Используйте R, чтобы найти рабочую, и выполнять задачи, которые должны выполняться в рабочей силе.
Затем, через рабочую основу, получите смещение базы.
b = Q.Base
..
длинный i = (((a.length - 1) & b) << ashift) + abase;
..
Затем получите последнюю задачу через смещение и запустите эту задачу
t = ((forkjointask <?>) u.getObjectVolatile (a, i))
..
w.runtask (t);
..
Благодаря этому грубому анализу мы обнаружили, что после того, как текущий поток вызовет метод сканирования, он не выполнит задачи в текущем рабочем месте, но получит другие задачи Workfeeue через случайное число r. Это один из основных механизмов Forkjoinpool.
Текущий поток будет не только сосредоточен на своих собственных задачах, но и будет расставлять приоритеты в других задачах. Это предотвращает продление голода. Это мешает некоторым потокам не может выполнять задачи во времени по застенчивым или другим причинам, или нить имеет большое количество задач, но другие нити не имеют ничего общего.
Тогда давайте посмотрим на метод Runtask
final void runtask (forkjointask <?> task) {if ((currentSteal = task)! = null) {forkjoinworkerThread Thread; task.doexec (); Forkjointask <?> [] A = массив; int md = mode; ++ nsteals; currentSteal = null; if (md! = 0) pollandexecall (); иначе if (a! = null) {int s, m = a.length - 1; Forkjointask <?> T; while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandSetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doExec (); }} if ((think = hover)! = null) // Нет необходимости делать в Think Thread.AfterTopleVexec (); }}Есть интересное имя: CurrentSteal, украденная задача - это действительно то, что я только что объяснил.
task.doexec ();
Эта задача будет выполнена.
После выполнения задач других людей вы выполните свои собственные задачи.
Получите первую задачу, получив верх
while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandSetObject (a, ((m & s) << ashift) + abase, null))! = null) {top = s; t.doexec ();}Затем используйте график, чтобы суммировать процесс пула потоков.
Например, есть два потока T1 и T2. T1 получит последнюю задачу T2 через базу T2 (конечно, это на самом деле последняя задача потока через случайное число R), а T1 также выполнит свою первую задачу через свою собственную вершину. Напротив, T2 сделает то же самое.
Задачи, которые вы выполняете для других тем, начинаются с базы, и задачи, которые вы выполняете для себя, начинаются сверху. Это уменьшает конфликт
Если другие задачи не найдены
иначе if (-j <0) {if ((ec | (e = (int) c)) <0) // неактивная или завершающая возврат a aightwork (w, c, ec); иначе if (ctl == c) {// Попробуйте инактивировать и включать длинный nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; W.EventCount = EC | Int_sign; if (! u.compareAndswaplong (this, ctl, c, nc)) w.eventcount = ec; // назад} Break; } Затем сначала, значение CTL будет изменено через серию прогонов, NC будет получено, и тогда новое значение будет назначено с CAS. Затем позвоните awaitwork (), чтобы войти в состояние ожидания (называемый метод парка Unfee, упомянутый в предыдущей серии статей).
Что нам нужно объяснить здесь, так это изменить значение CTL. Здесь, во -первых, AC -1 в CTL, а AC занимает 16 лучших битов CTL, поэтому он не может быть напрямую -1, но вместо этого достигает эффекта, создания 16 лучших бит CTL -1 через AC_UNIT (0x10000000000000000) первых 16 бит CTL.
Как упоминалось ранее, EventCount сохраняет Boolindex, а через Poolindex и Nextwait в Workfeeue вы можете пройти все резкие потоки.