1. Uso básico de pools de threads
1.1. Por que você precisa de um pool de threads?
Nos negócios diários, se quisermos usar o multi-threading, criaremos tópicos antes do início do negócio e destruiremos threads após o término do negócio. No entanto, para os negócios, a criação e destruição de tópicos não têm nada a ver com o próprio negócio, e apenas se importa com as tarefas executadas pelo tópico. Portanto, espero usar o maior número possível de CPUs para executar tarefas, em vez de criar e destruir threads que não estão relacionados aos negócios. O pool de threads resolve esse problema. A função do pool de threads é reutilizar threads.
1.2. Que apoio o JDK fornece para nós
Os diagramas de classe relacionados no JDK são mostrados na figura acima.
Várias categorias especiais a serem mencionadas.
A classe Callable é semelhante à classe Runable, mas a diferença é que chamável tem um valor de retorno.
ThreadpoolExecutor é uma implementação importante dos pools de threads.
Executores é uma classe de fábrica.
1.3. Uso de piscinas de threads
1.3.1. Tipos de piscinas de threads
public Static ExecorService NewFixedThreadpool (int nthreads) {Retorne novo ThreadPoolExecutor (NTHREADS, NTHREADS, 0L, TIMEUNIT.MILLISECONDS, NEW LinkEdBlockBlockBeUe <Runnable> ());} public staticsSerSseler NewsingLeTeLexExEngEue (Runnable> (); 1, 0l, timeUnit.millisEconds, novo LinkedBlockingQueue <drunnable> ()));} public static executorService newcachedthreadpool () {return threadpoolExecutor (0, Integer.max_value, 60l, timeUnit.SoConds, synchronsquue <runnenue, <nunlEble),Do ponto de vista do método, é óbvio que o FILLTHREADPOOL, o SingleThreadExecutor e o CachedThreadpool são diferentes instâncias do ThreadPoolExecutor, mas os parâmetros são diferentes.
public threadpoolExecutor (int CorePoolSize, int maximumPoolSize, Longo KeepAliveTime, Unidade Timeunit, BlockingQueue <uncrnable> WorkQueue) {this (CorePoolSize, MaximumPoolSize, KeepAlivETime, unidade, Workqueue, Executores.DefaultThreadFactory (), DefaLiVandler); Vamos descrever brevemente o significado dos parâmetros no construtor ThreadpoolExecutor.
Dessa forma, olhando para o FILLTHREADPOOL mencionado acima, o número de núcleos e o número máximo de threads é o mesmo, para que os threads não sejam criados e destruídos durante o trabalho. Quando o número de tarefas é grande e os threads no pool de threads não podem ser atendidos, a tarefa será salva no LinkedBlockingQueue e o tamanho do LinkedBlockingQueue é inteiro.max_value. Isso significa que a adição contínua de tarefas fará com que a memória consuma cada vez mais.
CachedThreadpool é diferente. Seu número de encadeamento principal é 0, o número máximo de armazenamento é inteiro.max_value e sua fila de bloqueio é síncrona, que é uma fila especial, e seu tamanho é 0. Como o número de roscas principais é 0, é necessário adicionar a tarefa ao síncrono. Esta fila só pode ter sucesso quando um thread adiciona dados e outro thread obtém dados dele. Adicionar dados a esta fila por si só retornará uma falha. Quando o retorno falha, o pool de threads começa a expandir o thread, e é por isso que o número de threads no CachedThreadpool não é corrigido. Quando o fio não é usado para os anos 60, o fio é destruído.
1.4. Pequenos exemplos de uso do pool de threads
1.4.1. Pool de threads simples
importar java.util.concurrent.executorService; importar java.util.concurrent.executores; classe pública threadpooldemo {public static class MyTask implementa runnable {@Override public void Run () {System.out.println (System.CurntIntInMillis () + " +"; tente {thread.sleep (1000); } catch (Exceção e) {e.printStackTrace (); }}} public static void main (string [] args) {myTask myTask = new MyTask (); ExecutorService ES = executores.newfixedThreadpool (5); for (int i = 0; i <10; i ++) {es.submit (myTask); }}} Como o newFixedThreadpool (5) é usado, mas 10 threads são iniciados, 5 são executados por vez e é óbvio que a reutilização do thread é vista. O ThreadID é repetido, ou seja, as 5 primeiras tarefas e as últimas 5 tarefas são executadas pelo mesmo lote de threads.
O que é usado aqui
es.submit (myTask);
Há também uma maneira de enviar:
es.execute (myTask);
A diferença é que o envio retornará um objeto futuro, que será introduzido posteriormente.
1.4.2.ScheduledThreadpool
importar java.util.Concurrent.Executores; importar java.util.concurrent.SCHEDULEDEXECONDOVERSERVICE; importar java.util.Concurrent.TimeUnit; public class ThreadPooldemo {public static void main (string [] args) {schedulEdExExExPeReReReReRSENSN = STATID Main (string [] args) {ScheduLEdExExExEdemo ssEstrice. // Se a tarefa anterior não tiver sido concluída, o despacho não será iniciado. ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()/1000); } catch (Exception e) { // TODO: handle exception } } }, 0, 2, TimeUnit.SECONDS);//Execute after starting 0 segundos e depois execute uma vez a cada 2 segundos em um ciclo}}Saída:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Como a execução da tarefa leva 1 segundo, o agendamento de tarefas deve aguardar a conclusão da tarefa anterior. Ou seja, a cada 2 segundos aqui significa que uma nova tarefa será iniciada 2 segundos após a conclusão da tarefa anterior.
2. Estender e aprimorar o pool de threads
2.1. Interface de retorno de chamada
Existem algumas APIs de retorno de chamada no pool de threads para nos fornecer operações estendidas.
ExecutorService ES = novo ThreadPoolExector (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingQueue <uncrnable> ()) {@Override Protected void antes (Thread t, Runnable r) {System.out.println ("Prepare para Execute"); } @Override Protected void depoisexecute (runnable r, throwable t) {System.out.println ("Execução concluída"); } @Override Protected void terminado () {System.out.println ("EXIT do pool de threads"); }};Podemos implementar os métodos anteriores ao Execute, AfterExecute e terminados do ThreadPoolExector para implementar o gerenciamento de logs ou outras operações antes e após a execução do thread, a saída do pool de threads.
2.2. Estratégia de rejeição
Às vezes, as tarefas são muito pesadas, resultando em muita carga no sistema. Como mencionado acima, quando o número de tarefas aumentar, todas as tarefas serão colocadas na fila de bloqueio do FILLTHREADPOOL, resultando em muito consumo de memória e eventualmente transbordamento de memória. Tais situações devem ser evitadas. Portanto, quando descobrimos que o número de threads excede o número máximo de threads, devemos desistir de algumas tarefas. Ao descartar, devemos escrever a tarefa em vez de jogá -la fora diretamente.
Há outro construtor no ThreadpoolExector.
public ThreadPoolExecutor (int CorePoolSize, Int MaximumPoolSize, Longo KeepaliveTime, Unidade TimeUnit, BlockingQueue <uncrnable> WorkQueUe, ThreadFactoryFactory, rejeição rejeitada Manipulador de Manipulador) {if (CorePoolSize <0 MaximumpIlize <= 0 Maximumpool) { IlegalargumentException (); if (workQueue == null || threadFactory == NULL || Handler == NULL) lança novo NullPointerException (); this.corePoolSize = CorePoolSize; this.MaximumPoolSize = MaximumPoolSize; this.WorkQueue = WorkQueue; this.KeepaliveTime = Unit.toNanos (KeepaliveTime); this.ThreadFactory = ThreadFactory; this.Handler = manipulador; } Introduziremos o ThreadFactory mais tarde.
O manipulador rejeita a implementação da política, que nos dirá o que fazer se a tarefa não puder ser executada.
Há um total das 4 estratégias acima.
ABORTPOLICY: Se a tarefa não puder ser aceita, uma exceção será lançada.
CallerNSPolicy: Se a tarefa não puder ser aceita, deixe o tópico de chamada completo.
DispardoldestPolicy: Se a tarefa não puder ser aceita, a tarefa mais antiga será descartada e mantida por uma fila.
DispardPolicy: Se a tarefa não puder ser aceita, a tarefa será descartada.
ExecutorService ES = novo ThreadPoolExector (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingQueue <unnable> (), new rejejatEDexecutionHandler () {@Override ISTID.PretexExecution (runnable r, threadpoolosCutor); Obviamente, também podemos implementar a rejeição de rejeição de manipulador de manutenção para definir a política de rejeição sozinha.
2.3. Personalize ThreadFactory
Acabei de ver que o ThreadFactory pode ser especificado no construtor do ThreadPoolExector.
Os threads no pool de threads são todos criados pela fábrica de threads e podemos personalizar a fábrica de threads.
Fábrica de threads padrão:
Classe estática DefaultThreadFactory Iplementos threadFactory {private estático final atomicinteger poolNumber = new AtomicInteger (1); Grupo de Thread Group privado; private final atomicinteger threadNumber = new AtomicInteger (1); String final privada Nameprefix; DefaultThreadFactory () {SecurityManager S = System.getSecurityManager (); grupo = (s! = nulo)? s.getThreadGroup (): Thread.currentThread (). getThreadGroup (); nameprefix = "Pool-" + poolNumber.GetAndIncrement () + "-Thread-"; } thread public newthread (runnable r) {thread t = new Thread (grupo, r, nameprefix + threadNumber.GetAndIncrement (), 0); if (t.isdaemon ()) t.setDaemon (false); if (t.getPriority ()! = thread.norm_priority) t.setPriority (thread.norm_priority); retornar t; }}3. Forkjoin
3.1. Pensamentos
É a ideia de dividir e conquistar.
O garfo/junção é semelhante ao algoritmo MapReduce. A diferença entre os dois é: o garfo/junção é dividido em pequenas tarefas somente quando necessário, como se a tarefa for muito grande, enquanto o MapReduce sempre começar a executar a primeira etapa para a segmentação. Parece que o garfo/junção é mais adequado para um nível de encadeamento dentro de uma JVM, enquanto o MapReduce é adequado para sistemas distribuídos.
4.2.Usando a interface
Recursiveaction: sem valor de retorno
RecursiveTask: há um valor de retorno
4.3. Exemplo simples
importar java.util.ArrayList; importar java.util.concurrent.forkjoinpool; importar java.util.concurrent.forkjointask; importar java.util.concurrent.recursivetask; classe pública CountTask estende o Recursivetask <long> {Privatic Intatholl; Private Long Start; Private Long End; public CountTask (Start Long, Long End) {super (); this.start = start; this.end = end; } @Override Protected long compute () {long sum = 0; boolean cancompute = (end - start) <limiar; if (cancompute) {for (long i = start; i <= end; i ++) {sum = sum+i; }} else {// dividido em 100 tarefas pequenas de longa etapa = (start + end)/100; ArrayList <CountTask> Subtarasks = new ArrayList <CountTask> (); long POS = Start; for (int i = 0; i <100; i ++) {long lastone = pos+etapa; if (lastone> end) {lastone = end; } CountTask SubTask = new CountTask (POS, Lastone); pos + = etapa + 1; subtarasks.add (subtarefa); SubTask.fork (); // Push subtarefas para o pool de threads} para (countTask t: Subtaras) {sum += t.join (); // aguardando todas as letases final}} retornar a soma; } public static void main (string [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); Tarefa de countTask = new CountTask (0, 200000l); Forkjointask <long> resultado = forkjoinpool.submit (tarefa); tente {long res = resultado.get (); System.out.println ("sum =" + res); } catch (Exceção e) {// TODO: lidar com a exceção e.printStackTrace (); }}} O exemplo acima descreve uma tarefa de resumir. Divida as tarefas acumuladas em 100 tarefas, cada tarefa executa apenas uma soma dos números e, após a junção final, a soma calculada por cada tarefa é acumulada.
4.4. Elementos de implementação
4.4.1.WorkQueue e CTL
Cada tópico terá uma fila de trabalho
Classe final estática de trabalho
Na fila de trabalho, haverá uma série de campos que gerenciam threads.
VOLATILE INT EventCount; // contagem de inativação codificada; <0 se inativo
int nextwait; // Registro codificado do próximo garçom de evento
int nearrows; // Número de aços
int dica; // Dica de índice de aço
Poolindex curto; // Índice desta fila na piscina
Modo curto final; // 0: LIFO,> 0: FIFO, <0: compartilhado
volátil int qlock; // 1: bloqueado, -1: termine; else 0
Base volátil int; // Índice do próximo slot para pesquisa
int top; // índice do próximo slot para push
Array de Forkjointask <?> []; // Os elementos (inicialmente não alocados)
Pool Final Forkjoinpool; // o pool contendo (pode ser nulo)
Final ForkJoinworkerThread Proprietário; // possuindo tópico ou nulo se compartilhado
Parker de thread volátil; // == proprietário durante a chamada para estacionar; else null
Volátil Forkjointask <?> CurrentJoin; // tarefa sendo unida no AwaitJoin
Forkjointask <?> CurrentSteal; // tarefa atual não local sendo executada
Deve -se notar aqui que há uma grande diferença entre JDK7 e JDK8 na implementação do Forkjoin. O que estamos apresentando aqui é do JDK8. No pool de threads, às vezes nem todos os threads estão executando, alguns threads serão suspensos e esses threads suspensos serão armazenados em uma pilha. É representado internamente por uma lista vinculada.
NextWait apontará para o próximo tópico de espera.
O índice do índice do subscrito no pool de threads PoolIndex.
EventCount Quando inicializado, o EventCount está relacionado ao PoolIndex. Um total de 32 bits, o primeiro bit indica se ele está ativado e 15 bits indica o número de vezes que foi suspenso
EventCount, o restante representa PoolIndex. Use um campo para representar vários significados.
O WorkCeUe WorkQueue é representado pela Array Forkjointask <?> []. Top e base representam as duas extremidades da fila, e os dados estão entre esses dois.
Mantenha CTL (tipo longo de 64 bits) em Forkjoinpool
CTL volátil longo;
* Campo CTL é um pouco embalado com:
* AC: Número de trabalhadores ativos menos paralelismo alvo (16 bits)
* TC: Número de trabalhadores totais menos paralelismo -alvo (16 bits)
* ST: Verdadeiro se o pool for encerrado (1 bit)
* EC: A contagem de espera do tópico de espera superior (15 bits)
* ID: PoolIndex do topo da pilha de garçons de Treiber (16 bits)
CA representa a contagem de roscas ativas menos o grau de paralelismo (provavelmente o número de CPUs)
TC significa o número total de fios menos paralelismo
ST indica se o próprio pool de threads está ativado
A CE representa o número de fios suspensos no tempo de espera superior
ID indica o poolindex esperando por thread no topo
É óbvio que o ST+EC+ID é o que acabamos de chamar de EventCount.
Então, por que você precisa sintetizar uma variável com 5 variáveis? De fato, a capacidade ocupa o mesmo com 5 variáveis.
A legibilidade do uso de um código variável será muito pior.
Então, por que usar uma variável? De fato, essa é a coisa mais inteligente, porque essas 5 variáveis são um todo. Na multi-threading, se 5 variáveis forem usadas, ao modificar uma das variáveis, como garantir a integridade das 5 variáveis. Em seguida, o uso de uma variável resolverá esse problema. Se resolvido com bloqueios, o desempenho será degradado.
O uso de uma variável garante a consistência e a atomicidade dos dados.
As alterações no Squadron CTL do Forkjoin são todas as operações do CAS. Como mencionado na série anterior de artigos, o CAS é uma operação sem bloqueio e tem um bom desempenho.
Como as operações do CAS podem segmentar apenas uma variável, esse design é ideal.
4.4.2. Roubo de trabalho
Em seguida, apresentaremos o fluxo de trabalho de todo o pool de threads.
Cada thread chama Runworker
Final Void Runworker (WorkQueue W) {W.GrowArray (); // aloca fila para (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorShift}} A função Scan () é digitalizar que as tarefas sejam realizadas.
R é um número relativamente aleatório.
Scan private final int (WorkQueue W, int r) {WorkQueue [] WS; int m; longa c = ctl; // Para consistência, verifique se ((ws = workcaues)! = null && (m = ws.length - 1)> = 0 && w! = null) {for (int j = m + m + 1, ec = w.EventCount ;;) {WorkQueue q; int b, e; Forkjointask <?> [] A; Forkjointask <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = ((A.Length - 1) & b) << Ashift) + abase; if ((t = ((forkjointask <?>) u.getObjectVolatile (a, i))! caso contrário, if (q.base == b && u.comparandswapobject (a, i, t, null)) {u.putOrderEdInt (q, qbase, b + 1); if ((b + 1) - q.top <0) sinalizador (ws, q); W.Runtask (T); } } quebrar; } else if (--j <0) {if ((ec | (e = (int) c)) <0) // inativo ou rescisão de retorno aguardar (w, c, ec); caso contrário, if (ctl == c) {// tente inativar e enquadrar NC longo = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.Nextwait = e; w.EventCount = EC | Int_sign; if (! u.compareandswaplong (this, ctl, c, nc)) w.EventCount = ec; // de volta} quebra; }}} retornar 0; } Vamos dar uma olhada no método de varredura. Um parâmetro de varredura é a estação de trabalho. Como mencionado acima, cada thread terá uma câmara de trabalho, e a cestidão de vários threads será salva em cacas de trabalho. R é um número aleatório. Use R para encontrar uma câmara de trabalho e fazer com que as tarefas sejam realizadas em trabalho.
Em seguida, através da base de trabalho, obtenha o deslocamento da base.
B = Q.Base
..
Long i = (((a.Length - 1) e b) << Ashift) + abase;
..
Em seguida, obtenha a última tarefa através do deslocamento e execute esta tarefa
t = ((forkjointask <?>) u.getObjectVolatile (a, i))
..
W.Runtask (T);
..
Através dessa análise aproximada, descobrimos que, depois que o thread atual chama o método de varredura, ele não executará as tarefas na câmara de trabalho atual, mas obterá outras tarefas de trabalho por meio de um número aleatório r. Este é um dos principais mecanismos de Forkjoinpool.
O encadeamento atual não apenas se concentrará em suas próprias tarefas, mas priorizará outras tarefas. Isso impede que a fome aconteça. Isso impede que alguns threads sejam incapazes de concluir tarefas no tempo devido a presos ou outros motivos, ou um thread tem uma grande quantidade de tarefas, mas outros threads não têm nada a fazer.
Então vamos dar uma olhada no método Rundask
Final Void Runtask (ForkJointask <?> Task) {if ((currentSteal = Task)! = NULL) {forkJoinworkerThread Thread; task.doexec (); Forkjointask <?> [] A = Array; int md = modo; ++ nsteals; CurrentSteal = NULL; if (md! = 0) Pollandexecall (); else if (a! = null) {int s, m = a.Length - 1; Forkjointask <?> T; while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.GetAndSetObject (a, ((m & s) << Ashift) + abase, null))! = null) {top = s; t.doexec (); }} if ((thread = proprietário)! = null) // Não é necessário fazer em finalmente cláusulas thread.aftertopleSexec (); }}Há um nome interessante: CurrentSteal, a tarefa roubada é realmente o que eu acabei de explicar.
task.doexec ();
Esta tarefa será concluída.
Depois de concluir as tarefas de outras pessoas, você concluirá suas próprias tarefas.
Obtenha a primeira tarefa obtendo o topo
while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.GetAndSetObject (a, ((m & s) << Ashift) + abase, null))! = null) {top = s; t.doexec ();}Em seguida, use um gráfico para resumir o processo do pool de threads agora.
Por exemplo, existem dois threads T1 e T2. O T1 obterá a última tarefa de T2 através da base de T2 (é claro, é na verdade a última tarefa de um thread através de um número aleatório R), e o T1 também executará sua primeira tarefa por meio de seu próprio topo. Pelo contrário, T2 fará o mesmo.
As tarefas que você executa para outros threads começam da base e as tarefas que você executa para começar de cima. Isso reduz o conflito
Se nenhuma outra tarefa for encontrada
caso contrário, if (--j <0) {if ((ec | (e = (int) c)) <0) // inativo ou rescisão de retorno aguardo (w, c, ec); caso contrário, if (ctl == c) {// tente inativar e enquadrar NC longo = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.Nextwait = e; w.EventCount = EC | Int_sign; if (! u.compareandswaplong (this, ctl, c, nc)) w.EventCount = ec; // de volta} quebra; } Em seguida, primeiro, o valor do CTL será alterado por meio de uma série de execuções, o NC será obtido e, em seguida, o novo valor será atribuído ao CAS. Em seguida, ligue para Waitwork () para entrar no estado de espera (chamado Método do Parque Unsefe mencionado na série anterior de artigos).
O que precisamos explicar aqui é alterar o valor da CTL. Aqui, primeiro, o AC -1 no CTL e o CA ocupa os 16 principais bits de CTL, portanto não pode ser diretamente -1, mas, em vez disso, alcança o efeito de criar os 16 melhores bits de CTL -1 através do AC_unit (0x100000000000000) dos primeiros 16 bits de CTL.
Como mencionado anteriormente, o EventCount salva o PoolIndex e, através do PoolIndex e NextWait em WorkQueue, você pode atravessar todos os threads de espera.