2. Introdução
A tecnologia multithreading resolve principalmente o problema da execução de vários threads em uma unidade de processador. Pode reduzir significativamente o tempo ocioso da unidade do processador e aumentar a capacidade de rendimento da unidade do processador. No entanto, a sobrecarga da criação frequente de roscas é muito grande. Portanto, como reduzir essa parte da sobrecarga, você precisa considerar usar um pool de threads. Um pool de threads é um contêiner de rosca, que executa apenas um número nominal de roscas por vez. O pool de threads é usado para gerenciar esse número nominal de threads.
3. Diagrama de estrutura de classe envolvendo pool de threads
Entre eles, o principal para usarmos é a classe ThreadpoolExecutor.
4. Como criar um pool de threads
Geralmente temos os seguintes métodos para criar pools de threads:
1. Use a classe de fábrica de executores
Os executores fornecem principalmente os seguintes métodos para criar pools de threads:
Vamos dar uma olhada no exemplo de uso abaixo:
1) newfixedthreadpool (pool de threads fixo)
classe pública FILLTHREADPOOL {public static void main (string [] args) {ExecutSorService pool = executores.newfixedthreadpool (5); // Crie um pool de threads com um tamanho fixo de 5 para (int i = 0; i <10; i ++) {pool.submit (novo mythread ()); } pool.shutdown (); }} classe pública mythread estende thread {@Override public void run () {System.out.println (thread.currentThread (). getName () + "execução ..."); }}Os resultados dos testes são os seguintes:
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-2 está executando. . .
Pool-1-Thread-3 está executando. . .
Pool-1-Thread-2 está executando. . .
Pool-1-Thread-3 está executando. . .
Pool-1-Thread-2 está executando. . .
Pool-1-Thread-2 está executando. . .
Pool-1-Thread-3 está executando. . .
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-4 está executando. . .
Pool de threads de tamanho fixo: Crie um encadeamento toda vez que uma tarefa for enviada, até que o encadeamento atinja o tamanho máximo do pool de threads. O tamanho do pool de encadeamentos permanecerá inalterado quando atingir seu valor máximo. Se um thread terminar devido a uma exceção de execução, o pool de threads adicionará um novo thread.
2) NewsingleThreadExecutor (pool de threads único)
classe pública singlethreadpool {public static void main (string [] args) {executorService pool = executores.newsinglethreadExecutor (); // Crie um único pool de threads para (int i = 0; i <100; i ++) {pool.subMit (new mythread ()); } pool.shutdown (); }}Os resultados dos testes são os seguintes:
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-1 está executando. . .
Pool de threads de thread único: este pool de threads tem apenas um thread funcionando, o que significa que uma execução serial de thread única de todas as tarefas. Se esse thread exclusivo terminar devido à exceção, haverá um novo thread para substituí -lo. Este pool de threads garante que a ordem de execução de todas as tarefas seja executada na ordem do envio da tarefa.
3) NewscheduledThreadpool
classe pública ScheduledThreadpool {public static void main (string [] args) {scheduledExecutorService pool = executores.newscheduledThreadpool (6); for (int i = 0; i <10000; i ++) {pool.submit (new mythread ()); } pool.schedule (new mythread (), 1000, timeUnit.millisEconds); Pool.Schedule (New Mythread (), 1000, TimeUnit.MillisEconds); pool.shutdown (); }}Os resultados dos testes são os seguintes:
Pool-1-Thread-1 está executando. . .
Pool-1-Thread-6 está executando. . .
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-4 está executando. . .
Pool-1-Thread-2 está executando. . .
Pool-1-Thread-3 está executando. . .
Pool-1-Thread-4 está executando. . .
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-6 está executando. . .
Pool-1-Thread-1 está executando. . .
……………………………………………………………………………………………………………………………………………………………… ………………………………………………………………………………………………………………………………………………………………
Pool-1-Thread-4 está executando. . .
Pool-1-Thread-1 está executando. . .
Os dois últimos threads do resultado do teste só começam a executar após o atraso de 1s. Este pool de threads suporta a exigência de tempo e execução periódica de tarefas
4) NewcachedThreadpool (piscina de threads em cache)
classe pública CACHEDTHREADPOOL {public static void main (string [] args) {executService pool = executores.newcachedthreadpool (); for (int i = 0; i <100; i ++) {pool.submit (new mythread ()); } pool.shutdown (); }}Os resultados dos testes são os seguintes:
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-7 está executando. . .
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-16 está em execução. . .
Pool-1-Thread-17 está executando. . .
Pool-1-Thread-16 está em execução. . .
Pool-1-Thread-5 está sendo executado. . .
Pool-1-Thread-7 está executando. . .
Pool-1-Thread-16 está em execução. . .
Pool-1-Thread-18 está executando. . .
Pool-1-Thread-10 está executando. . .
Pool de threads em cache: se o tamanho do pool de threads exceder o encadeamento necessário para processar a tarefa, alguns threads inativos (nenhuma execução de tarefas em 60 segundos) serão reciclados. Quando o número de tarefas aumenta, esse pool de threads pode adicionar de forma inteligente novos threads para lidar com a tarefa. Este pool de threads não limita o tamanho do pool de threads, que depende inteiramente do tamanho máximo de encadeamento que o sistema operacional (ou JVM) pode criar.
O funcionário sugere que os programadores usam os métodos de fábrica de executores mais convenientes. Esses pools de threads são predefinidos por configuração padrão para a maioria dos cenários de uso.
2. Herite a classe ThreadpoolExector e copie o método do construtor da classe pai.
Antes de apresentar esse método, vamos analisar os códigos subjacentes anteriores para criar pools de threads?
public class executores {public static executorService newfixedthreadpool (int nthreads) {return new threadpoolExecutor (NTHREADS, NTHREADS, 0L, timeUnit.millisEconds, novo LinkedBlockQueue <nnable> ()); } public Static ExecorService NewsingLethReadExecutor () {Retorne novo FinalizedElegateExecutorService (novo ThreadPoolExecutor (1, 1, 0L, timeUnit.millisEconds, novo LinkedBlockingQueue <nnable> ())); }}A partir do código subjacente da classe de fábrica de executores, podemos ver que os métodos fornecidos pela classe de fábrica para criar pools de threads são realmente implementados pela construção da ThreadPoolExecutor. O código do método do construtor ThreadpoolExecutor é o seguinte:
public ThreadPoolExecutor (int CorePoolSize, Int MaximumPoolSize, Longo KeepaliveTime, Unidade TimeUnit, BlockingQueue <uncrnable> WorkQueUe, ThreadFactoryFactory, rejeição rejeitada Manipulador de Manipulador) {if (CorePoolSize <0 MaximumpIlize <= 0 Maximumpool) { IlegalargumentException (); if (workQueue == null || threadFactory == NULL || Handler == NULL) lança novo NullPointerException (); this.corePoolSize = CorePoolSize; this.MaximumPoolSize = MaximumPoolSize; this.WorkQueue = WorkQueue; this.KeepaliveTime = Unit.toNanos (KeepaliveTime); this.ThreadFactory = ThreadFactory; this.Handler = manipulador; }Então, vamos falar sobre o método do construtor ThreadPoolExector. Neste método de construção, existem principalmente os seguintes parâmetros:
CorePoolSize-o número de threads salvos na piscina, incluindo threads gratuitos.
MaximumPoolSize - o número máximo de threads permitidos na piscina.
KeepAliveTime-Quando o número de threads é maior que o CorePoolSize, este é o tempo mais longo para o encadeamento ocioso aguardar uma nova tarefa.
Unidade- Unidade de tempo do parâmetro KeepAliveTime.
Trabalho-a fila usada para manter as tarefas antes da execução. Esta fila mantém apenas tarefas executáveis enviadas pelo método Execute.
ThreadFactory-a fábrica usada pelos executores para criar novos threads.
Manipulador-o manipulador usado quando a execução é bloqueada devido ao escopo da rosca e à capacidade da fila.
Em seguida, vamos falar sobre o relacionamento entre esses parâmetros. Quando o pool de threads é criado, não há threads no pool de threads (observe que não é que um certo número de threads seja criado assim que o pool de threads for criado). Quando o método Execute () é chamado para adicionar uma tarefa, o pool de threads fará o seguinte julgamento:
1) Se o número de threads atualmente em execução for menor que o CorePoolSize, crie um novo thread imediatamente para executar esta tarefa.
2) Se o número de threads atualmente em execução for maior ou igual ao CorePoolSize, essa tarefa será colocada na fila.
3) Se a fila do pool de threads estiver cheia, mas o número de threads em execução for menor que o MaximumPoolSize, um novo thread ainda será criado para executar essa tarefa.
4) Se a fila estiver cheia e o número de threads atualmente em execução for maior ou igual ao MaximumPoolSize, o pool de threads lidará com a tarefa atual com base na política de rejeição.
5) Quando uma tarefa é executada, o thread retira a próxima tarefa da fila para executar. Se não houver tarefa a ser executada na fila, o thread ficará ocioso. O tempo de sobrevivência do KeepAliveTime exceder, o fio será reciclado pelo pool de threads (Nota: os threads de reciclagem estarão condicionais. Se o número de threads em execução atualmente for maior que o CorePoolSize, o fio será destruído. Se ele não for mantido no núcleo do núcleo, o número não será destruído. Por que não é que o fio seja reciclado assim que estiver ocioso, mas que precisa esperar até que exceda o KeepAliveTtime antes que o tópico seja reciclado? O motivo é muito simples: porque a criação e a destruição de fios consomem muito, e não pode ser criada e destruída com frequência. Depois de exceder o KeepAliveTime, verifica -se que esse tópico realmente não é usado e será destruído. Nesse caso, a unidade representa a unidade de tempo do KeepaliveTime, e a definição de unidade é a seguinte:
public enum timeUnit {nanossegundos {// KeepAliveTime em nanossegundos}, microssegundos {// KeepAliveTime em microssegundos}, milissegundos {// miniMeTime em MillisEconds}, segundos {// não KeepaliveTime em horas}, dias {// KeepAliveTime nos dias}; Vamos analisar o código -fonte abaixo. Para as situações acima, os códigos de origem envolvidos principalmente são os seguintes:
Private Boolean AddifunderCorePoolSize (Runnable FirstTask) {Thread t = null; Final Reentrantlock mainLock = this.mainlock; mainlock.lock (); tente {if (poolsize <corePoolSize && runState == Running) t = addThread (FirstTask); } finalmente {mainlock.unlock (); } if (t == null) retorna false; t.start (); retornar true; } De fato, esse código é muito simples. Ele descreve principalmente que, se o pool de encadeamentos atual for menor que o CorePoolSize, um novo thread será criado para lidar com a tarefa.
Private Boolean AddifunderMaximumPoolSize (Runnable FirstTask) {Thread t = null; Final Reentrantlock mainLock = this.mainlock; mainlock.lock (); tente {if (poolsize <maximumPoolSize && runState == Running) t = addThread (FirstTask); } finalmente {mainlock.unlock (); } if (t == null) retorna false; t.start (); retornar true; }O código acima descreve que, se o número de pools de threads atuais for menor que o MaximumPoolSize, um thread será criado para executar a tarefa.
5. A fila do pool de threads
Existem 3 tipos de filas de pool de threads:
Comprometimento direto: a opção padrão para filas de trabalho é o síncrono, que envia tarefas diretamente aos threads sem mantê -los. Aqui, se não houver nenhum thread disponível para executar a tarefa imediatamente, tentando fila a tarefa falhará, construindo assim um novo thread. Essa política evita bloqueios ao manusear conjuntos de solicitações que podem ter dependências internas. Os envios diretos geralmente exigem o MaximumPoolsizes ilimitados para evitar rejeitar tarefas recentemente enviadas. Essa estratégia permite que os threads ilimitados tenham a possibilidade de crescimento quando os comandos chegam continuamente com uma média que a fila pode suportar.
Fila ilimitada: O uso de uma fila ilimitada (por exemplo, LinkedBlockingQueue sem capacidade predefinida) fará com que novas tarefas esperem na fila quando todos os threads do CorePoolSize estiverem ocupados. Dessa forma, o encadeamento criado não excederá o CorePoolSize. (O valor do MaximumPoolSize é inválido.) Quando cada tarefa é completamente independente de outras tarefas, ou seja, a execução da tarefa não se afeta, é adequada para filas ilimitadas; Por exemplo, em um servidor de página da web. Essa fila pode ser usada para lidar com solicitações de explosão transitórias, e essa estratégia permite que os threads ilimitados tenham a possibilidade de crescimento quando os comandos chegarem continuamente excedendo a média que a fila pode lidar.
Fila limitada: ao usar o Maximumpoolsizes limitados, as filas limitadas (como o ArrayBlockockQueue) ajudam a evitar a exaustão de recursos, mas pode ser difícil de ajustar e controlar. O tamanho da fila e o tamanho máximo da piscina podem precisar trocar um ao outro: usar filas grandes e pequenos piscinas pode minimizar o uso da CPU, os recursos do sistema operacional e a sobrecarga de comutação de contexto, mas podem resultar em redução manual na taxa de transferência. Se as tarefas estiverem frequentemente bloqueadas (por exemplo, se forem limites de E/S), o sistema poderá agendar o tempo para mais threads do que você permite. O uso de pequenas filas geralmente requer um tamanho de piscina maior e possui um uso mais alto da CPU, mas pode encontrar uma sobrecarga de agendamento inaceitável, o que também pode reduzir a taxa de transferência.
Vamos falar sobre a fila do pool de threads abaixo, o diagrama de estrutura de classes é o seguinte:
1) Síncrono
A fila corresponde ao envio direto mencionado acima. Primeiro de tudo, o síncrono é ilimitado, o que significa que sua capacidade de armazenar números é ilimitada. No entanto, devido às características da própria fila, após a adição de elementos, você deve esperar que outros threads os levassem antes que você possa continuar a adicioná -los.
2) LinkedBlockingQueue
A fila corresponde à fila ilimitada acima.
3) ArrayblockingQueue
A fila corresponde à fila limitada acima. ArrayblockingQueue tem os três construtores a seguir:
public ArrayBlockingQueue (intacabilidade int) {this (capacidade, false); } public ArrayBlockingQueue (INT Capacidade, Feira Booleana) {if (Capacidade <= 0) Lança novo ilegalArgumentException (); this.items = (e []) novo objeto [capacidade]; bloqueio = novo reentrantlock (justo); NotEmpty = Lock.NewCondition (); notfull = Lock.NewCondition (); } public ArrayBlockingQueue (INT Capacidade, Feira Booleana, Coleção <? Extende e> C) {this (Capacidade, Fair); if (capacidade <c.size ()) lançar novo ilegalargumentException (); for (iterator <? estende e> it = c.iterator (); it.hasnext ();) add (it.next ()); }Vamos nos concentrar nesta feira. A Fair representa a estratégia de competição dos tópicos de acesso à fila. Quando é verdade, as filas de inserção de tarefas cumprem as regras da FIFO. Se falso, você pode "cortar a fila". Por exemplo, se houver muitas tarefas na fila agora, um thread concluiu a tarefa e uma nova tarefa ocorre. Se for falso, essa tarefa não precisa ser filada na fila. Você pode cortar diretamente a fila e executá -la. Como mostrado na figura abaixo:
6. Estratégia de execução de rejeição do pool de threads
Quando o número de threads atinge o valor máximo, as tarefas ainda estão chegando neste momento e, neste momento, tenho que me recusar a aceitar as tarefas.
O ThreadpoolExector permite a personalização das políticas de execução ao adicionar uma tarefa falha. Você pode chamar o método setRejecteDexecutionHandler () do pool de threads e substituir a política existente pelo objeto RejejeTexecutionHandler personalizado. A estratégia de processamento padrão fornecida pelo ThreadPoolExector é descartar e lançar diretamente informações de exceção ao mesmo tempo. ThreadpoolExecutor fornece 4 políticas existentes, a saber::
ThreadpoolExecutor.abortpolicy: indica que a tarefa é rejeitada e uma exceção é lançada. O código -fonte é o seguinte:
Classe estática pública ABORTPOLICY IMPLEMENTES REJENDEDEXECUTIONHANDLER { /*** Cria um <tt> abortpolicy </tt>. * / public abortpolicy () {} / *** sempre lança rejeitEdExecutionException. * @param r A tarefa executável solicitada a ser executada * @param e o executor tentando executar esta tarefa * @THOWSE REJENTEDEXECUTIONECTIONSCECTION Sempre. */ public void rejeitEdExecution (runnable r, threadpoolExecutor e) {tiro novo rejejetexecutionException (); // Exceção jogando}}ThreadPoolExecutor.DiscardPolicy: Isso significa que a tarefa é rejeitada, mas nenhuma ação é realizada. O código -fonte é o seguinte:
Classe estática pública DispardPolicy implementos rejeitedExecutionHandler { /*** Cria um <tt> disardpolicy </tt>. * / public DispardPolicy () {} / *** não faz nada, que tem o efeito de descartar a tarefa r. * @param r A tarefa executável solicitada a ser executada * @param e o executor tentando executar esta tarefa */ public void rejejeteExecution (runnable r, threadpoolExecutor e) {} // rejeitar diretamente, mas não faça nada}ThreadPoolExecutor.CallERRUNSPOLICY: Indica que a tarefa é rejeitada e a tarefa é executada diretamente no thread do chamador. O código -fonte é o seguinte:
Classe estática pública Os implementos de CallErNSPolicy rejejetexecutionHandler { /*** cria um <tt> callerRunSpolicy </tt>. * / public CallerNSPolicy () {} / ** * Executa a tarefa r no tópico do chamador, a menos que o executor * tenha sido desligado; nesse caso, a tarefa é descartada. * @param r A tarefa executável solicitada a ser executada * @param e o executor tentando executar esta tarefa */ public void rejejeteDexecution (runnable r, threadpoolExecutor e) {if (! E.ISSHUTDOWN ()) {r.run (); // executar tarefas diretamente}}}ThreadPoolExecutor.DISCARDOLDestPolicy: significa que a primeira tarefa na fila de tarefas é descartada primeiro e, em seguida, a tarefa é adicionada à fila. O código -fonte é o seguinte:
Classe estática pública DispardoldestPolicy implementos rejeitEdExecutionHandler { /*** Cria um <tt> descartedestpolicy </tt> para o executor especificado. */ public DispardoldestPolicy () {} public void rejejeteExecution (runnable r, threadpoolExecutor e) {if (! eSishutdown ()) {e.getQueue (). poll (); // descarte a primeira tarefa na fila e.execute (r); // execute uma nova tarefa}}}Quando as tarefas chegarem continuamente, uma tarefa será pesquisada na fila e, em seguida, uma nova tarefa será executada.
Resumir
O exposto acima é uma explicação detalhada do próprio pool de threads do JDK introduzido pelo editor. Espero que seja útil para todos. Se você tiver alguma dúvida, deixe -me uma mensagem e o editor responderá a todos a tempo. Muito obrigado pelo seu apoio ao site wulin.com!