Na [alta concorrência Java II] Fundação multi-threading, mencionamos inicialmente as operações básicas de sincronização de encadeamentos. O que queremos mencionar desta vez é a ferramenta de controle de sincronização no pacote simultâneo.
1. Uso de várias ferramentas de controle de sincronização
1.1 Reentrantlock
O ReentrantLock parece uma versão aprimorada do sincronizado. A característica do sincronizada é que é simples de usar e tudo é deixado para a JVM para processamento, mas suas funções são relativamente fracas. Antes do JDK1.5, o desempenho de Reentrantlock foi melhor que o sincronizado. Devido à otimização da JVM, o desempenho dos dois na versão JDK atual é comparável. Se for uma implementação simples, não use deliberadamente o ReentrantLock.
Comparado ao sincronizado, o Reentrantlock é mais rico funcionalmente e possui as características do tempo reentrante, interrompível, limitado e de bloqueio justo.
Primeiro, vamos usar um exemplo para ilustrar o uso inicial do ReentrantLock:
teste do pacote; importar java.util.concurrent.locks.reentrantlock; o teste de classe pública implementa o runnable {public static reentrantlock bloqueio = new reentrantlock (); public static int i = 0; @Override public void run () {for (int j = 0; j <10000000; j ++) {Lock.lock (); tente {i ++; } finalmente {Lock.unlock (); }}} public static void main (string [] args) lança interruptedException {test test = new test (); Thread t1 = novo thread (teste); Tópico T2 = novo thread (teste); t1.start (); t2.start (); t1.Join (); t2.Join (); System.out.println (i); }}Existem dois threads que executam operações ++ em i. Para garantir a segurança do thread, o reentrantlock é usado. A partir do uso, podemos ver que, em comparação com o sincronizado, o ReentrantLock é um pouco mais complicado. Como a operação de desbloqueio deve ser executada finalmente, se não for finalmente desbloqueado, é possível que o código tenha uma exceção e o bloqueio não seja liberado e o sincronizado é liberado pela JVM.
Então, quais são as excelentes características do Reentrantlock?
1.1.1 Reentrada
Um único thread pode ser inserido repetidamente, mas deve ser excitado repetidamente
Lock.lock (); Lock.lock (); tente {i ++; } finalmente {Lock.unlock (); Lock.unlock ();}Como o ReentrantLock é um bloqueio reentrante, você pode obter a mesma trava repetidamente, que possui um contador de aquisição relacionado à bloqueio. Se um thread que possui a trava receber o bloqueio novamente, o contador de aquisição será aumentado em 1 e a trava precisará ser liberada duas vezes para obter a liberação real (trava reentrante). Isso imita a semântica da sincronizada; Se o thread entrar em um bloco sincronizado protegido pelo monitor que o thread já possui, o encadeamento poderá continuar. Quando o thread sai do segundo (ou subsequente) bloco sincronizado, o bloqueio não é liberado. O bloqueio é liberado apenas quando o thread sai do primeiro bloco sincronizado protegido pelo monitor que ele entra.
A classe pública Child estende o pai implementa Runnable {Final estático filho da criança = New Child (); // para garantir que o bloqueio público estático exclusivo estático principal principal (string [] args) {for (int i = 0; i <50; i ++) {new Thread (Child) .start (); }} public sincronizado void Dosomething () {System.out.println ("1Child.Dosomething ()"); DoanOtherthing (); // Ligue para outros métodos sincronizados em sua própria classe} private sincronizado void DoanOtherthing () {super.dosomething (); // Ligue para o método sincronizado da classe pai System.out.println ("3Child.doanOtherthing ()"); } @Override public void run () {Child.Dosomething (); }} classe pai {public sincronizado void Dosomething () {System.out.println ("2father.dosomething ()"); }}Podemos ver que um encadeamento entra em um método sincronizado diferente e não liberará os bloqueios obtidos antes. Portanto, a saída ainda é sequencialmente. Portanto, sincronizado também é uma trava reentrante
Saída:
1Child.Dosomething ()
2father.Dosomething ()
3Child.doanOthingthing ()
1Child.Dosomething ()
2father.Dosomething ()
3Child.doanOthingthing ()
1Child.Dosomething ()
2father.Dosomething ()
3Child.doanOthingthing ()
...
1.1.2. Interrompível
Ao contrário do sincronizado, o Reentrantlock responde às interrupções. Visão de conhecimento relacionado a interromper [alta concorrência Java 2] Basics multithreading
Ordinary Lock.lock () não pode responder às interrupções, Lock.lockInterruptível () pode responder às interrupções.
Simulamos uma cena de impasse e depois usamos interrupções para lidar com o impasse
teste de pacote; importar java.lang.management.ManagementFactory; importar java.lang.management.threadinfo; importar java.lang.management.threadMxBean; importar java.util.concurrent.locks.reentrantlock; public class Runnable {public satics; public static reentrantlock Lock2 = new reentrantlock (); int bloqueio; Teste público (int bloqueado) {this.lock = Lock; } @Override public void run () {try {if (Lock == 1) {Lock1.lockInterruptível (); tente {thread.sleep (500); } Catch (Exceção e) {// TODO: HODE EXCEÇÃO} LOCK2.LOCKINTERRUPBILÍVEL (); } else {Lock2.lockInterruptly (); tente {thread.sleep (500); } Catch (Exceção e) {// TODO: HODE EXCEÇÃO} LOCK1.LOCKINTERRUPTIAL (); }} Catch (Exceção e) {// TODO: HODE EXCEÇÃO} finalmente {if (Lock1.isheldByCurrentThread ()) {Lock1.unlock (); } if (Lock2.isheldByCurrentThread ()) {Lock2.Unlock (); } System.out.println (thread.currentThread (). GetId () + ": exit de thread"); }} public static void main (string [] args) lança interruptedException {teste t1 = novo teste (1); Teste t2 = novo teste (2); Thread Thread1 = novo thread (t1); Thread Thread2 = novo thread (T2); Thread1.start (); Thread2.start (); Thread.sleep (1000); //Deadlockchecker.check (); } classe estática deadlockChecker {private final estático threadMxBean mBean = gerenciamento de fábrica .getThreadMxBean (); final estático executável deadlockChecker = new Runnable () {@Override public void run () {// TODO Auto-Gerated Method Stub While (true) {Long [] DeadlockedThreadids = Mbean.FindDeadlockedthreads (); if (deadlockedThreadIds! = null) {threadInfo [] threadInfos = mbean.getThreadInfo (deadlockedThreadIds); para (thread t: thread.getAllStacktraces (). KeySet ()) {for (int i = 0; i <threadInfos.length; i ++) {if (t.getId () == ThreadInfos [i] .getThReadId ()) {t.interrupt (); }}}}} tente {thread.sleep (5000); } catch (Exceção e) {// TODO: lidar com exceção}}}}}; public static void check () {thread t = novo thread (deadlockChecker); t.SetDaemon (true); t.start (); }}}O código acima pode causar impasse, o thread 1 recebe o Lock1, o Thread 2 obtém o Lock2 e, em seguida, o outro quer obter as bloqueios um do outro.
Usamos o Jstack para ver a situação depois de executar o código acima
Um impasse foi realmente descoberto.
O deadlockchecker.check (); O método é usado para detectar deadlocks e interromper o tópico de impasse. Após a interrupção, o thread sai normalmente.
1.1.3. Tempo limitado
Se o tempo limite não puder obter o bloqueio, ele retornará falso e não esperará permanentemente para formar uma trava morta.
Use Lock.TryLock (tempo limite longo, unidade de unidade de tempo) para implementar bloqueios de tempo e tempo, com parâmetros sendo tempo e unidades.
Deixe -me dar um exemplo para ilustrar que o tempo pode ser limitado:
teste do pacote; importar java.util.concurrent.timeunit; importar java.util.concurrent.locks.reentrantlock; public class Test implementa Runnable {public static reentrantlock Lock = new reentrantlock (); @Override public void run () {try {if (lock.trylock (5, timeUnit.seconds)) {thread.sleep (6000); } else {System.out.println ("Get Lock Falt"); }} catch (Exceção e) {} finalmente {if (lock.isheldbycurrentThread ()) {Lock.unlock (); }}} public static void main (string [] args) {test t = new test (); Thread t1 = novo thread (t); Tópico T2 = novo thread (t); t1.start (); t2.start (); }}Use dois threads para competir para um bloqueio. Quando um thread adquire a trava, durma 6 segundos e cada thread tenta apenas obter a trava por 5 segundos.
Portanto, deve haver um thread que não possa obter o bloqueio. Se você não conseguir obtê -lo, sairá diretamente.
Saída:
Falha na bloqueio
1.1.4. Bloqueio justo
Como usar:
Public Reentrantlock (Boolean Fair)
public static reentrantlock Fairlock = novo reentrantlock (true);
As fechaduras em geral são injustas. Não é necessariamente possível que o thread que vem primeiro possa obter o bloqueio primeiro, mas o thread que vem mais tarde receberá o bloqueio mais tarde. Bloqueios injustos podem causar fome.
Uma trava justa significa que esse bloqueio pode garantir que o encadeamento chegue primeiro e receba a trava primeiro. Embora os bloqueios justos não causem fome, o desempenho de bloqueios justos será muito pior do que o de bloqueios que não são de faixas.
1.2 Condição
A relação entre condição e reentrantlock é semelhante à sincronizada e objeto.wait ()/signal ()
O método Await () fará com que o thread atual aguarde e solte o bloqueio atual. Quando o sinal () é usado em outros threads ou o método SignalAll (), o thread recuperará o bloqueio e continua a executar. Ou quando o tópico é interrompido, você também pode pular da espera. Isso é muito semelhante ao método objeto.wait ().
O método AwaitUnInterruptly () é basicamente o mesmo que o método Wait (), mas não aguarda a interrupção da resposta durante o processo. O método singal () é usado para acordar um fio esperando. O método relativo singalall () acordará todos os threads esperando. Isso é muito semelhante ao método objct.Notify ().
Não vou apresentá -lo em detalhes aqui. Deixe -me dar um exemplo para ilustrar:
teste de pacote; importar java.util.concurrent.locks.condition; importar java.util.concurrent.locks.reentrantlock; public classe teste implementa Runnable {public static reentrantlock bloqueio = new reentrantlock (); condição estática pública condição = bloqueio.newcondition (); @Override public void run () {try {Lock.lock (); condicional.await (); System.out.println ("Thread está acontecendo"); } catch (Exceção e) {e.printStackTrace (); } finalmente {Lock.unlock (); }} public static void main (string [] args) lança interruptedException {test t = new test (); Thread Thread = novo thread (t); thread.start (); Thread.sleep (2000); Lock.lock (); condicional.Signal (); Lock.Unlock (); }}O exemplo acima é muito simples. Deixe um tópico aguardar e deixe o fio principal acordar. condition.await ()/sinal só pode ser usado após a obtenção da trava.
1.3.Semáforo
Para bloqueios, é mutuamente exclusivo. Isso significa que, enquanto eu conseguir a fechadura, ninguém poderá obtê -lo novamente.
Para o semáforo, ele permite que vários threads entrem na seção crítica ao mesmo tempo. Pode ser considerado um bloqueio compartilhado, mas o limite compartilhado é limitado. Depois que o limite é usado, outros threads que não obtiveram o limite ainda bloquearão fora da área crítica. Quando a quantidade é 1, é equivalente a travar
Aqui está um exemplo:
teste do pacote; importar java.util.concurrent.executorService; importar java.util.concurrent.executores; importar java.util.concurrent.semaphore; teste público, implementa executáveis {final Semaphore Semaphore = New Semaphore (5); @Override public void run () {try {semaphore.acquire (); Thread.sleep (2000); System.out.println (thread.currentThread (). GetId () + "done"); } catch (Exceção e) {e.printStackTrace (); } finalmente {semaphore.release (); }} public static void main (string [] args) lança interruptedException {ExecutSorService ExecorService = executores.newfixedThreadpool (20); teste final t = new test (); for (int i = 0; i <20; i ++) {executorService.submit (t); }}}Há um pool de threads com 20 threads e cada thread vai para a licença do Semaphore. Existem apenas 5 licenças para semáforo. Após a execução, você pode ver que 5 são emitidos em lotes, os lotes são emitidos.
Obviamente, um tópico também pode solicitar várias licenças de uma só vez
public void adquirir (int lices) lança interruptedException
1.4 ReadWritelock
O ReadWritelock é um bloqueio que distingue as funções. A leitura e a escrita são duas funções diferentes: a leitura de leitura não é mutuamente exclusiva, a leitura de leitura é mutuamente exclusiva e a gravação de gravação é mutuamente exclusiva.
Esse design aumenta a simultaneidade e garante a segurança dos dados.
Como usar:
private estático reentrantreadwritelock readWritelock = new reentrantreadWritelock ();
Readlock de bloqueio estático privado = readWritelock.readlock ();
WriteLock de trava estática privada = readWritelock.Writelock ();
Para exemplos detalhados, você pode visualizar a implementação Java dos problemas de produtor e do consumidor e problemas de leitor e escritor, e não o expandirei aqui.
1.5 Countdownlatch
Um cenário típico para um cronômetro de contagem regressiva é um lançamento de foguete. Antes do lançamento do foguete, a fim de garantir que tudo seja infalível, as inspeções de vários equipamentos e instrumentos são frequentemente realizados. O motor só pode acender depois que todas as inspeções forem concluídas. Esse cenário é muito adequado para o Countdownlatch. Pode fazer com que o tópico de ignição aguarde todos os threads de verificação para concluir antes de executá -lo
Como usar:
estático final contingdownlatch end = new Countdownlatch (10);
end.countdown ();
end.await ();
Diagrama esquemático:
Um exemplo simples:
teste do pacote; importar java.util.concurrent.countdownlatch; importar java.util.concurrent.executorService; importar java.util.concurrent.executores; o teste de classe pública implementa executáveis {Static Final CountdownLatchllatchllatlatch = New CountDownLatch (10); teste final estático t = new test (); @Override public void run () {try {thread.sleep (2000); System.out.println ("Complete"); CountdownLatch.CountDown (); } catch (Exceção e) {e.printStackTrace (); }} public static void main (string [] args) lança interruptedException {ExecutSorService ExecorService = executores.newfixedThreadpool (10); for (int i = 0; i <10; i ++) {executorService.execute (t); } Countdownlatch.await (); System.out.println ("end"); executorService.shutdown (); }}O thread principal deve esperar que todos os 10 threads sejam executados antes de produzir "End".
1.6 CyclicBarrier
Semelhante ao CountdownLatch, ele também está aguardando alguns threads antes de executá -los. A diferença com o CountdownLatch é que esse contador pode ser usado repetidamente. Por exemplo, suponha que definimos o contador para 10. Depois de reunir o primeiro lote de 10 threads, o contador retornará a zero e depois coleta o próximo lote de 10 threads
Como usar:
Public CyclicBarrier (Int Parties, Runnable BarrierAction)
BarrierAction é a ação que o sistema será executado quando o contador contar uma vez.
aguarda ()
Diagrama esquemático:
Aqui está um exemplo:
teste de pacote; importar java.util.concurrent.cyclicBarrier; public class Test implementa Runnable {private string soldado; CiclicBarrier final privado Ciclic; Teste público (String Soldier, CyclicBarrier Cyclic) {this.soldier = soldado; this.cyclic = cyclic; } @Override public void run () {try {// Aguarde que todos os soldados cheguem Cyclic.await (); Dowork (); // Aguarde que todos os soldados completem seu trabalho ciclic.await (); } Catch (Exceção e) {// TODO BLOCO DE CAPAGEM AUTOMENTADO E.PRINTSTACKTRACE (); }} private void Dowork () {// TODO Método Gerado Auto-Generado Stub Try {Thread.sleep (3000); } Catch (Exceção e) {// TODO: HOLANDE Exception} System.out.println (Soldier + ": Done"); } classe estática pública Barrierrun implementa Runnable {Bandeira Booleana; int n; public Barrierrun (bandeira booleana, int n) {super (); this.flag = sinalizador; this.n = n; } @Override public void run () {if (flag) {System.out.println (n + "conclusão da tarefa"); } else {System.out.println (n + "set conclusão"); bandeira = true; }}} public static void main (string [] args) {final int n = 10; Thread [] threads = novo thread [n]; bandeira booleana = false; CiclicBarrier Barreira = novo CyclicBarrier (n, New Barrierrun (Flag, N)); System.out.println ("set"); for (int i = 0; i <n; i ++) {System.out.println (i+"relatário"); threads [i] = novo thread (novo teste ("soldado" + i, barreira)); threads [i] .start (); }}}Resultado de impressão:
juntar
0 relatórios
1 Relatório
2 relatórios
3 relatórios
4 relatórios
5 relatórios
6 relatórios
7 relatórios
8 relatórios
9 relatórios
10 conjuntos soldados completos 5: Concluído
Soldado 7: feito
Soldado 8: feito
Soldado 3: feito
Soldado 4: feito
Soldado 1: feito
Soldado 6: feito
Soldado 2: feito
Soldado 0: feito
Soldado 9: feito
10 tarefas concluídas
1.7 Locksupport
Forneça o Primitivo de Bloqueio de Tópico
Semelhante a suspender
LockSupport.park ();
Locksupport.unpark (T1);
Comparado com o suspensão, não é fácil causar congelamento de threads.
A idéia de Locksupport é um pouco semelhante ao semáforo. Tem uma licença interna. Ele retira esta licença quando estacionado e solicita esta licença quando desmarcada. Portanto, se o UMPARK for antes do parque, o congelamento do thread não ocorrerá.
O código a seguir é o código de amostra de suspensão na fundação de múltiplos threading [alta concorrência Java 2]. Um impasse ocorre ao usar suspensão.
teste de pacote; importar java.util.concurrent.locks.locksupport; Public class Test {objeto estático u = new Object (); testsussuspendThread t1 = novo testsuspendThread ("t1"); testsussuspendThread T2 = novo testsuspendThread ("t2"); classe estática public static testsUsEndThread estende o thread {public testSUSSPENDTHREAD (nome da string) {setName (nome); } @Override public void run () {Synchronized (u) {System.out.println ("em" + getName ()); //Thread.currentThread (). Suspender (); LockSupport.park (); }}} public static void main (string [] args) lança interruptedException {t1.start (); Thread.sleep (100); t2.start (); // t1.resume (); // t2.resume (); Locksupport.unpark (T1); Locksupport.unpark (T2); t1.Join (); t2.Join (); }}No entanto, o uso do LockSupport não causará impasse.
além disso
Park () pode responder a interrupções, mas não joga exceções. O resultado da resposta de interrupção é que o retorno da função Park () pode obter o sinalizador de interrupção do Thread.Interrupted ().
Existem muitos lugares no JDK que usam o Park, é claro, a implementação do LockSupport também é implementada usando o inseguro.park ().
Public Static Void Park () {
insegu.park (false, 0l);
}
1.8 Implementação do Reentrantlock
Vamos apresentar a implementação do Reentrantlock. A implementação do Reentrantlock é composta principalmente por três partes:
A classe pai do Reentrantlock terá uma variável de estado para representar o estado síncrono.
/*** O estado de sincronização. */ Estado volátil privado int;
Defina o estado para adquirir o bloqueio através da operação do CAS. Se definido como 1, o suporte de trava é dado ao tópico atual
Final Void Lock () {if (comparaandStState (0, 1)) setexclusivewnerthread (thread.currentThread ()); caso contrário, adquirir (1); }Se o bloqueio não for bem -sucedido, um aplicativo será feito
public final void adquirir (int arg) {if (! TryAcquire (arg) && adquirequeed (addwaiter (node.exclusive), arg)) selfInterrupt (); }Primeiro, tente TryAcquire após a aplicação, porque outro thread pode ter liberado o bloqueio.
Se você ainda não se inscreveu na fechadura, adicione o garçom, o que significa adicionar -se à fila de espera
nó privado addwaiter (modo nó) {nó nó = novo nó (thread.currentThread (), modo); // tente o caminho rápido do ENQ; backup para enq completo no nó de falha pred = cauda; if (pred! = null) {node.prev = pred; if (comparaAndStetTail (pred, nó)) {pred.Next = node; Nó de retorno; }} enq (nó); Nó de retorno; }Durante esse período, haverá muitas tentativas de se candidatar a uma fechadura e, se você ainda não puder se inscrever, estará pendurado.
private final boolean parkandcheckinterrupt () {LockSupport.park (this); retornar thread.Interrupted (); }Da mesma forma, se o bloqueio for liberado e a USPARK não for discutida em detalhes aqui.
2. Contêiner simultâneo e análise típica do código -fonte
2.1 Concurrenthashmap
Sabemos que o Hashmap não é um contêiner seguro para roscas. A maneira mais fácil de fazer o hashmap segura
Coleções.synchronizedmap, é um invólucro para hashmap
mapa estático público m = coleções.SynchronizedMap (new Hashmap ());
Da mesma forma, para a lista, o SET também fornece métodos semelhantes.
No entanto, esse método é adequado apenas para casos em que a quantidade de simultaneidade é relativamente pequena.
Vamos dar uma olhada na implementação do SynchronizedMap
mapa final privado <k, v> m; // mapa de apoio objeto final mutex; // objeto no qual sincronizar synchronizedmap (map <k, v> m) {if (m == null) lança new nullPointerException (); this.m = m; mutex = this; } SincronizadaMap (map <k, v> m, objeto mutex) {this.m = m; this.mutex = mutex; } public int size () {synchronized (mutex) {return m.size ();}} public boolean isEmpty () {synchronized (mutex) {return m.isempty ();}} public boolean containsKey (objeto key) {synchronized (matex); {return M.Contains sincronizado (mutex) {return m.containsValue (value);}} public v get (chave do objeto) {synchronized (mutex) {return m.get (key);}} public v put (synch); M.Remove (key);}} public void Putall (map <? Extende k ,? estende v> map) {sincronizado (mutex) {m.putall (map);}} public void clear () {synchronized (mutex) {m.clear ();}}}Ele envolve o hashmap no interior e depois sincronizou todas as operações do hashmap.
Como cada método adquire o mesmo bloqueio (Mutex), isso significa que operações como put e remover são mutuamente exclusivas, reduzindo bastante a quantidade de simultaneidade.
Vamos ver como simultaneses é implementado
public V put (K -Key, V valor) {segmento <k, v> s; if (value == null) lança novo nullPointerException (); int hash = hash (chave); int j = (hash >>> segmentshift) e segmentMask; if ((s = (segmento <k, v>) insefe.getObject // não -volátil; recheck (segmentos, (j << sshift) + sbase)) == null) // em aleresegment s = aleresegment (j); return s.put (chave, hash, valor, false); }Há um segmento de segmento dentro do sapão concorrente, que divide o grande hashmap em vários segmentos (pequeno hashmap) e, em seguida, hash os dados em cada segmento. Dessa forma, as operações de hash de vários threads em diferentes segmentos devem ser seguros para roscas, portanto, você só precisa sincronizar os encadeamentos no mesmo segmento, o que realiza a separação de bloqueios e aumenta muito a concorrência.
Será mais problemático ao usar o concurso. No momento, você precisa adicionar bloqueios a cada segmento e, em seguida, fazer estatísticas de dados. Essa é uma pequena desvantagem após separar a trava, mas o método de tamanho não deve ser chamado em alta frequência.
Em termos de implementação, não usamos sincronizados e bloqueia, mas o TRYLOCK o máximo possível. Ao mesmo tempo, também fizemos algumas otimizações na implementação do HashMap. Não vou mencionar aqui.
2.2 BlockingQueue
BlockingQueue não é um recipiente de alto desempenho. Mas é um contêiner muito bom para compartilhar dados. É uma implementação típica de produtores e consumidores.
Diagrama esquemático:
Para detalhes, você pode verificar a implementação Java dos problemas de produtor e do consumidor e problemas de leitores e escritores.