В многопоточном фонде [высокой параллелистики Java II] мы первоначально упомянули базовые операции синхронизации потоков. На этот раз мы хотим упомянуть инструмент управления синхронизацией в одновременном пакете.
1. Использование различных инструментов управления синхронизацией
1.1 Reentrantlock
Reentrantlock ощущается как улучшенная версия синхронизации. Особенность синхронизации заключается в том, что он прост в использовании, и все остается для JVM для обработки, но его функции относительно слабые. Перед JDK1.5 производительность Reentrantlock была лучше, чем синхронизированная. Из -за оптимизации JVM производительность двух в текущей версии JDK сопоставима. Если это простая реализация, не сознательно используйте Reentrantlock.
По сравнению с синхронизированным, Reentrantlock более функционально богат, и он имеет характеристики повторного, прерываемого, ограниченного времени и справедливой блокировки.
Во -первых, давайте воспользуемся примером, чтобы проиллюстрировать первоначальное использование reentrantlock:
Пакет Test; import java.util.concurrent.locks.reentrantlock; Общедоступный тест класса реализует {public static reentrantlock lock = new Reentrantlock (); Public Static int i = 0; @Override public void run () {for (int j = 0; j <10000000; j ++) {lock.lock (); попробуйте {i ++; } наконец {lock.unlock (); }}} public static void main (string [] args) бросает прерывание. Потока T1 = новый поток (тест); Потока T2 = новый поток (тест); t1.start (); t2.start (); t1.join (); t2.join (); System.out.println (i); }}Есть два потока, которые выполняют операции ++ на i. Чтобы обеспечить безопасность потока, используется повторный интенсивность. Из использования мы видим, что по сравнению с синхронизированным, Reentrantlock немного сложнее. Поскольку операция разблокировки должна быть выполнена, наконец, если она не будет, наконец, разблокирована, возможно, что код имеет исключение, а блокировка не выпускается, а Synchronized выпускается JVM.
Так каковы отличные характеристики Reentrantlock?
1.1.1 Повторное вход
Одиночный поток может быть неоднократно вводить, но должен быть неоднократно выходить
lock.lock (); lock.lock (); try {i ++; } наконец {lock.unlock (); lock.unlock ();}Поскольку Reentrantlock является блокировкой повторного завода, вы можете неоднократно получать тот же замок, который имеет счетчик сбора, связанного с замком. Если поток, который владеет блокировкой, снова получает блокировку, счетчик сбора увеличивается на 1, и замок должен быть выпущен дважды, чтобы получить реальный релиз (reentrant lock). Это имитирует семантику синхронизации; Если поток входит в синхронизированный блок, защищенный монитором, который уже имеет поток, поток разрешено продолжать. Когда поток выходит из второго (или последующего) синхронизированного блока, блокировка не высвобождается. Замок выпускается только тогда, когда поток выходит из первого синхронизированного блока, защищенного монитором, который он входит.
Public Class Child расширяет отцом реализует Runnable {Final Static Child child = new Child (); //, чтобы убедиться, что блокирует уникальную публичную статическую void main (string [] args) {for (int i = 0; i <50; i ++) {new Thread (Child) .start (); }} public synchronized void dosomething () {System.out.println ("1child.dosomething ()"); doanotherthing (); // вызовать другие синхронизированные методы в своем собственном классе} Частный синхронизированный void doanotherthing () {super.dosomething (); // вызовуте синхронизированный метод системы родительского класса. } @Override public void run () {child.dosomething (); }} класс Отец {public Synchronized void dosomething () {System.out.println ("2father.dosomething ()"); }}Мы видим, что поток входит в другой синхронизированный метод и не будет выпускать замки, полученные ранее. Таким образом, выход все еще последовательно. Поэтому синхронизированный также является блокировкой повторной
Выход:
1child.dosomething ()
2father.dosomething ()
3child.doanotherthing ()
1child.dosomething ()
2father.dosomething ()
3child.doanotherthing ()
1child.dosomething ()
2father.dosomething ()
3child.doanotherthing ()
...
1.1.2. Прерывим
В отличие от синхронизации, Reentrantlock реагирует на прерывания. Связанный с собой представление о знаниях [высокая параллельная java 2] Основы многопоточного чтения
Обычный lock.lock () не может ответить на прерывания, lock.lockEntertifle () может отвечать на прерывания.
Мы имитируем сцену тупика, а затем используем прерывания, чтобы справиться с тупиком
Пакет -тест; импорт java.lang.management.managemagefactory; импорт java.lang.management.threadinfo; импорт java.lang.management.threadmxbean; импорт java.util.concurrent.locks.reentrantlock; общедоступные тестовые импровизии Runnable {public static reentrantlock lock1 = new Reentrantlock (););););););););); Public Static Reentrantlock Lock2 = new Reentrantlock (); int lock; public Test (int lock) {this.lock = lock; } @Override public void run () {try {if (lock == 1) {lock1.lockEntertible (); try {thread.sleep (500); } catch (Exception e) {// todo: обработка exception} lock2.lockEntertifle (); } else {lock2.lockEntertible (); try {thread.sleep (500); } catch (Exception e) {// todo: обработка exception} lock1.lockEntertible (); }} catch (Exception e) {// toDo: обрабатывать исключение} наконец {if (lock1.isheldbycurrentThread ()) {lock1.unlock (); } if (lock2.isheldbycurrentthread ()) {lock2.unlock (); } System.out.println (thread.currentThread (). GetId () + ": exit Thread"); }} public static void main (string [] args) бросает прерванные Тест T2 = новый тест (2); Thread Thread1 = новый поток (T1); Thread Think2 = новый поток (T2); Thread1.start (); Thread2.start (); Thread.sleep (1000); //Deadlockchecker.check (); } static class DeadlockChecker {private final Static ThreadMxbean mbean = ManagementFactory .getThreadMxbean (); Окончательный статический runnable deadlockchecker = new Runnable () {@Override public void run () {// todo Автогенерированный метод заглушка, в то время как (true) {long [] DeadlockedThreadids = mbean.finddeadlockedthreads (); if (deadlockedthreadids! = null) {ThreadInfo [] threadInfos = mbean.getThreadInfo (DeadlockedThreadids); for (Thread T: Thread.getAllStackTraces (). keySet ()) {for (int i = 0; i <threadInfos.length; i ++) {if (t.getId () == ThreadInfos [i] .getThreadId ()) {t.interrupt (); }}}}} try {thread.sleep (5000); } catch (Exception e) {// todo: обрабатывать исключение}}}}}; public static void check () {Thread T = новый поток (DeadlockChecker); T.SetDaemon (True); t.start (); }}}Приведенный выше код может вызвать тупики, поток 1 получает Lock1, поток 2 получает Lock2, а затем друг друга хотят получить блокировки друг друга.
Мы используем JStack для просмотра ситуации после запуска вышеуказанного кода
Тупика действительно была обнаружена.
DeadlockChecker.Check (); Метод используется для обнаружения тупиков, а затем прервать поток тупика. После прерывания нить выходит нормально.
1.1.3. Ограниченное время
Если тайм -аут не может получить блокировку, он вернет ложь и не будет ждать навсегда, чтобы сформировать мертвую блокировку.
Используйте Lock.trylock (Long Timeout, TimeUnit Unit), чтобы реализовать блокировки, ограниченные временем, причем параметры являются временем и единицами.
Позвольте мне привести вам пример, чтобы проиллюстрировать, что время может быть ограничено:
Тест пакета; импорт java.util.concurrent.timeUnit; import java.util.concurrent.locks.reentrantlock; Общедоступный тест класса реализует выполняется {public static reentrantlock lock = new Reentrantlock (); @Override public void run () {try {if (lock.trylock (5, timeunit.seconds)) {thread.sleep (6000); } else {System.out.println ("Get Lock Faile"); }} catch (Exception e) {} наконец {if (lock.isheldbycurrentthread ()) {lock.unlock (); }}} public static void main (string [] args) {test t = new Test (); Потока T1 = новый поток (t); Потока T2 = новый поток (t); t1.start (); t2.start (); }}Используйте два потока, чтобы конкурировать за блокировку. Когда нить получает замок, спит 6 секунд, и каждая нить пытается получить блокировку только в течение 5 секунд.
Таким образом, должен быть поток, который не может получить блокировку. Если вы не можете получить его, вы будете напрямую выйти.
Выход:
Получить сбой блокировки
1.1.4. Справедливый замок
Как использовать:
Public Reentrantlock (логическая ярмарка)
Public Static Reentrantlock Fairlock = new Reentrantlock (True);
Замки в целом несправедливы. Не обязательно возможно, что поток, который появляется первым, может сначала получить блокировку, но поток, который появится позже, получит блокировку позже. Несправедливые замки могут вызвать голод.
Справедливая блокировка означает, что эта блокировка может гарантировать, что поток поступает на первом месте, и сначала получает блокировку. Хотя справедливые замки не приведут к голоду, производительность справедливых замков будет намного хуже, чем у не-Fair.
1.2 Условие
Соотношение между условием и повторным делом аналогична синхронизированному и объекту. Wation ()/signal ()
Метод wait () заставит текущий поток подождать и отпустить текущую блокировку. Когда сигнал () используется в других потоках или методе Signalall (), поток восстановит блокировку и продолжит выполнять. Или когда поток прерывается, вы также можете выпрыгнуть из ожидания. Это очень похоже на метод Object.Wait ().
Метод wiateUninterbulal () в основном такой же, как метод wait (), но он не будет ждать прерывания ответа во время процесса. Метод Singal () используется для разбуждения в ожидании потока. Относительный метод Singalall () разбудит все, ожидающие все потоки. Это очень похоже на метод objct.notify ().
Я не буду представить его подробно здесь. Позвольте мне привести пример, чтобы проиллюстрировать:
Пакет Test; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.reentrantlock; Общедоступный тест класса реализует {public static reentrantlock lock = new Reentrantlock (); Состояние общественного статического условия = lock.newCondition (); @Override public void run () {try {lock.lock (); Condition.Await (); System.out.println («Поток продолжается»); } catch (Exception e) {e.printstackTrace (); } наконец {lock.unlock (); }} public static void main (string [] args) бросает прерывание. Потоковая потока = новый поток (t); Thread.Start (); Thread.sleep (2000); lock.lock (); Condition.Signal (); lock.unlock (); }}Приведенный выше пример очень прост. Пусть нить ожидает и позвольте главной нити разбудить его. Condition.Await ()/Signal может использоваться только после получения блокировки.
1.3.Semaphore
Для замков это взаимоисключающе. Это означает, что до тех пор, пока я получаю замок, никто не может получить его снова.
Для семафора он позволяет нескольким потокам входить в критический раздел одновременно. Его можно рассматривать как общий замок, но общий лимит ограничен. После того, как предел будет изготовлен, другие потоки, которые не получили предел, все равно будут блокировать за пределами критической области. Когда сумма составляет 1, она эквивалентна блокировке
Вот пример:
Пакет Test; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.semaphore; тест открытого класса реализует выполняемый {окончательный семифор Semaphore = новый Semaphore (5); @Override public void run () {try {semaphore.acquire (); Thread.sleep (2000); System.out.println (Thread.currentThread (). GetId () + "end"); } catch (Exception e) {e.printstackTrace (); } наконец {semaphore.release (); }} public static void main (string [] args) бросает прерывание {receectorservice executorservice = executors.newfixedthreadpool (20); Окончательный тест t = new Test (); for (int i = 0; i <20; i ++) {rececusterservice.submit (t); }}}Существует пул потоков с 20 потоками, и каждая поток переходит к лицензии Semaphore. Есть только 5 лицензий на семафор. После запуска вы можете видеть, что 5 выводят партии, партии выводятся.
Конечно, один поток также может подать заявку на несколько лицензий одновременно
public void приобретать (int разрешения) бросает прерванное
1.4 ReadWritelock
ReadWritelock - это блокировка, которая отличает функции. Чтение и написание-это две разные функции: чтение чтения не является взаимоисключающим, чтение-записка является взаимоисключающей, а запись записи является взаимоисключающей.
Этот дизайн увеличивает параллелизм и обеспечивает безопасность данных.
Как использовать:
Частный static reentertreadwritelock readwritelock = new ReenterTreadWriteLock ();
частная статическая блокировка readlock = readwritelock.readlock ();
Private Static Lock WriteLock = readWritelock.writelock ();
Для получения подробных примеров, вы можете просмотреть Java -реализацию проблем с производителями и потребителями, а также проблемы с читателем и писателями, и я не буду расширять ее здесь.
1.5 Countdownlatch
Типичным сценарием для таймера обратного отсчета является запуск ракету. Перед запуском ракеты, чтобы все было надежным, часто проводятся инспекции различного оборудования и инструментов. Двигатель может зажечь только после того, как все проверки завершены. Этот сценарий очень подходит для Countdownlatch. Это может заставить поток зажигания подождать, пока все проверки будут завершены перед его выполнением
Как использовать:
Статический окончательный отсчетный отсчетный счет end = new Countdownlatch (10);
end.countdown ();
end.await ();
Схематическая диаграмма:
Простой пример:
Пакет -тест; импорт java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; тест открытого класса внедряет выполнение {статическое окончательное countdownlatch countdownlatch = new countdownlatch (10); Статический окончательный тест t = новый тест (); @Override public void run () {try {thread.sleep (2000); System.out.println ("overse"); countdownlatch.countdown (); } catch (Exception e) {e.printstackTrace (); }} public static void main (string [] args) бросает прерывание {receectorservice executorservice = executors.newfixedthreadpool (10); for (int i = 0; i <10; i ++) {rececusterservice.execute (t); } countdownlatch.await (); System.out.println ("end"); executorservice.shutdown (); }}Основной поток должен ждать, пока все 10 потоков выполнят, прежде чем вывести «конец».
1.6 Cyclicbarrier
Подобно Countdownlatch, он также ждет завершения некоторых потоков, прежде чем их выполнить. Разница с CountdownLatch заключается в том, что этот счетчик может использоваться неоднократно. Например, предположим, что мы установили счетчик на 10. Затем после сбора первой партии из 10 потоков счетчик вернется в ноль, а затем собирает следующую партию из 10 потоков.
Как использовать:
Public Cyclicbarrier (Int Party, Runnable Barrieraction)
Барриерация - это действие, которое система выполнит, когда счетчик отсчитает один раз.
жду ()
Схематическая диаграмма:
Вот пример:
Пакет Test; import java.util.concurrent.cyclicbarrier; Общедоступный тест класса реализует Runnable {Private String Soldier; Частный конечный циклический циклический; Публичный тест (String Soldier, Cyclicbarrier Cyclic) {this.soldier = Soldier; this.cyclic = циклический; } @Override public void run () {try {// ждать, пока все солдаты прибудут cyclic.await (); Dowork (); // ждать, пока все солдаты завершит свою работу циклическим. } catch (Exception e) {// todo автоматически сгенерированный блок e.printstacktrace (); }} private void dowork () {// todo Автогенерированный метод заглушка try {thread.sleep (3000); } catch (Exception e) {// todo: обработка excection} System.out.println (Soldier + ": Dode"); } public Static Class Barrierrun реализует Runnable {Boolean Flag; int n; public Barrierrun (логический флаг, int n) {super (); this.flag = flag; this.n = n; } @Override public void run () {if (flag) {System.out.println (n + "Завершение задачи"); } else {System.out.println (n + "Установите завершение"); flag = true; }}} public static void main (string [] args) {final int n = 10; Thread [] Threads = новый поток [n]; логический флаг = false; Cyclicbarrier Barrier = новый Cyclicbarrier (n, New Barrierrun (Flag, N)); System.out.println ("set"); for (int i = 0; i <n; i ++) {System.out.println (i+"report"); потоки [i] = новый поток (новый тест ("Солдат" + I, Barrier)); Threads [i] .start (); }}}Результат печати:
собирать
0 отчетов
1 отчет
2 отчеты
3 отчеты
4 отчеты
5 отчетов
6 отчетов
7 отчетов
8 отчетов
9 отчетов
10 комплектов полного солдата 5: сделано
Солдат 7: Сделано
Солдат 8: Сделано
Солдат 3: Сделано
Солдат 4: Сделано
Солдат 1: Сделано
Солдат 6: Сделано
Солдат 2: Сделано
Солдат 0: Сделано
Солдат 9: Сделано
10 задач выполнены
1.7 С ЛОКАПОРТА
Обеспечить блокировку нити примитив
Похоже на приостановку
Locksupport.park ();
Locksupport.unpark (T1);
По сравнению с подвеской нелегко вызвать замораживание потока.
Идея слесаря несколько похожа на семафор. У него есть внутренняя лицензия. Это убирает эту лицензию при припарковании и подает заявку на эту лицензию, когда вы не являетесь. Следовательно, если unpark находится перед парком, замораживание ниток не произойдет.
Следующий код является приостановлением примерного кода в многопользовательском фонде [высокая параллелизации Java 2]. Приостановка приостановлена приостановлен.
Тест пакета; импорт java.util.concurrent.locks.locksupport; открытый тест класса {статический объект u = new Object (); staticessuspendthread t1 = new testsuspendthread ("t1"); staticessuspendthread t2 = new testsuspendthread ("t2"); Public Static Class TestSuspendThread Extends Thread {public TestSuspendThread (String name) {setName (name); } @Override public void run () {synchronized (u) {System.out.println ("in" + getName ()); //Thread.currentthread (). Supply (); Locksupport.park (); }}} public static void main (string [] args) бросает прерывание. Thread.sleep (100); t2.start (); // t1.resume (); // t2.resume (); Locksupport.unpark (T1); Locksupport.unpark (T2); t1.join (); t2.join (); }}Тем не менее, использование LockSupport не вызовет тупики.
кроме того
Park () может реагировать на прерывания, но не бросает исключения. Результатом ответа прерывания является то, что возврат функции Park () может получить флаг прерывания из потока. Interrupten ().
В JDK есть много мест, которые используют Park, конечно, реализация LockSupport также реализуется с использованием UnfeA.park ().
Public Static Void Park () {
небезопасно.park (false, 0l);
}
1.8 Реинтллок реализация
Давайте представим реализацию Reentrantlock. Реализация повторного урегулирования в основном состоит из трех частей:
Родительский класс Reentrantlock будет иметь переменную состояния для представления синхронного состояния.
/*** Состояние синхронизации. */ частное летучие состояния;
Установите состояние, чтобы приобрести блокировку через работу CAS. Если установить 1, держатель блокировки передается текущему потоку
final void lock () {if (compareandState (0, 1)) setExclusiveHarthread (thread.currentThread ()); иначе приобрести (1); }Если блокировка не успешной, будет сделана заявка
public final void приобретен (int arg) {if (! tryAcquire (arg) && accirequed (addWaiter (node.exclusive), arg)) selfEntruption (); }Во -первых, попробуйте TryAcquire после подачи заявления, потому что еще одна ветка, возможно, выпустила блокировку.
Если вы все еще не подали заявку на замок, добавьте официанта, что означает добавление себя в очередь ожидания
Private Node AddWaiter (режим узла) {Node node = new Node (Thread.currentThread (), Mode); // попробуйте быстрый путь ENQ; Резервное копирование на полное ENQ на узел сбоя Pred = Tail; if (pred! = null) {node.prev = pred; if (CompareandStatail (pred, node)) {pred.next = node; вернуть узел; }} enq (node); вернуть узел; }В течение этого периода будет много попыток подать заявку на замок, и если вы все еще не можете подать заявку, вы будете повешены.
Частный финальный логический балансовый return Thread.Erenprupten (); }
Точно так же, если блокировка выпускается, а затем не обсуждается подробно здесь.
2. Одновременный контейнер и типичный анализ исходного кода
2.1 concurrenthashmap
Мы знаем, что HashMap не является защитным потоком контейнера. Самый простой способ сделать хэшмап-нить-это использование
Collections.synchronizedMap, это обертка для HashMap
Общественная статическая карта m = collections.synchronizedmap (new hashmap ());
Точно так же для списка SET также предоставляет аналогичные методы.
Тем не менее, этот метод подходит только для тех случаев, когда сумма параллелизма относительно невелика.
Давайте посмотрим на реализацию SynchronizedMap
Частная финальная карта <K, V> M; // поддержка карты конечного объекта Mutex; // объект для синхронизации SynchronizedMap (map <k, v> m) {if (m == null) бросить новый NullPointerException (); this.m = m; mutex = это; } SynchronizedMap (map <k, v> m, объект mutex) {this.m = m; this.mutex = mutex; } public int size () {synchronized (mutex) {return m.size ();}} public boolean isempty () {synchronized (mutex) {return m.isempty ();}} public boolean containskey (объект) {synchronized (mutex) {return m.containse keemaine); Synchronized (mutex) {return m.containsvalue (value);}} public v get (объект ключа) {synchronized (mutex) {return m.get (key);}} public v (k -ключ, v value) {synchronized (mutex) {return m.put (key, value);} public ved (объект) {synch) {synch) {return) {return) {return) M.Remove (key);}} public void putall (map <? Extends k ,? extends v> map) {synchronized (mutex) {m.putall (map);}} public void clear () {synchronized (mutex) {m.clear ();}}Он завершает хэшмап и затем синхронизировал каждую операцию HashMap.
Поскольку каждый метод приобретает один и тот же блокировка (Mutex), это означает, что такие операции, как Put and Relement, являются взаимоисключающими, что значительно сокращает количество параллелизма.
Посмотрим, как реализован concurrenthashmap
public v put (k key, v value) {сегмент <k, v> s; if (value == null) бросить новый NullPointerException (); int hash = hash (ключ); int j = (хэш >>> segmentShift) и сегментмак; if ((s = (сегмент <k, v>) unceabe.getObject // нелетучий; перепроверка (сегменты, (j << sshift) + sbase)) == null) // in ensuresegment s = evuresegment (j); вернуть S.Put (ключ, хэш, значение, false); }В пределах concurrenthashmap есть сегмент сегмента, который делит большой хэшмап на несколько сегментов (небольшая хэшмапа), а затем хэшируют данные на каждом сегменте. Таким образом, операции хеш-нескольких потоков в разных сегментах должны быть защищены потоком, поэтому вам необходимо синхронизировать потоки только в одном и том же сегменте, что осознает разделение замков и значительно увеличивает параллелизм.
При использовании comurrenthashmap.size будет более хлопотно, потому что он должен подсчитать сумма данных каждого сегмента. В настоящее время вам нужно добавить блокировки в каждый сегмент, а затем сделать статистику данных. Это небольшой недостаток после разделения блокировки, но метод размера не должен вызывать на высокой частоте.
С точки зрения реализации, мы не используем синхронизированный и блокировка, но пробуйте как можно больше. В то же время мы также сделали некоторые оптимизации в реализации HashMap. Я не буду упомянуть об этом здесь.
2.2 Blockqueue
BlockingQueue не является высокопроизводительным контейнером. Но это очень хороший контейнер для обмена данными. Это типичная реализация производителей и потребителей.
Схематическая диаграмма:
Для получения подробной информации вы можете проверить реализацию Java проблем с производителями и потребителями, а также задач читателя и писателей.