1. Utilisation de base des pools de threads
1.1. Pourquoi avez-vous besoin d'une piscine de fil?
Dans les activités quotidiennes, si nous voulons utiliser le multi-threading, nous créerons des threads avant le début de l'entreprise et détruire des threads après la fin de l'entreprise. Cependant, pour les affaires, la création et la destruction des fils n'ont rien à voir avec l'entreprise elle-même et ne se soucie que des tâches effectuées par le fil. Par conséquent, j'espère utiliser autant de processeurs que possible pour effectuer des tâches, plutôt que pour créer et détruire des threads qui ne sont pas liés aux entreprises. Le pool de threads résout ce problème. La fonction du pool de threads est de réutiliser les threads.
1.2. Quel support JDK nous fournit-il
Les diagrammes de classe associés dans JDK sont illustrés dans la figure ci-dessus.
Plusieurs catégories spéciales à mentionner.
La classe callable est similaire à la classe gérée, mais la différence est que Calable a une valeur de retour.
ThreadPoolExecutor est une implémentation importante des pools de threads.
Les exécuteurs sont une classe d'usine.
1.3. Utilisation de pools de threads
1.3.1. Types de piscines de fil
Public Static ExecutorService newFixEdThreadpool (int nThreads) {return new ThreadPoolExecutor (nthreads, nthreads, 0l, timeunit.milliseconds, New LinkedBlockqueue <runnable> ());} public static exécutorvice New ThreadPoolExecutor (1, 1, 0l, timeunit.milliseconds, new LinkedBlockerQueue <Runnable> ()));} public static exécutorService newcachedThreadpool () {return whewpoolExecutor (0, Integer.max_value, 60L, TimeUnit.Secus, newschronouseue <runnable>);Du point de vue de la méthode, il est évident que FixedThreadpool, SingleThreAdExecutor et CachedThreadpool sont des instances différentes de ThreadPoolExecutor, mais les paramètres sont différents.
Public ThreadPoolExecutor (int corepoolSize, int maximumpoolSize, long keepalivetime, timeunit Unit, BlockerQueue <cunnable> workQueue) {this (CorepoolSize, maximumpoolSize, keepalivetime, unit, workqueue, exécutors.defaultthreadfactory (), defaulthandler);} Décrivons brièvement la signification des paramètres dans le constructeur ThreadPoolExecutor.
De cette façon, en regardant le FixedThereadpool mentionné ci-dessus, le nombre de cœurs et le nombre maximal de threads sont les mêmes, de sorte que les threads ne seront pas créés et détruits pendant le travail. Lorsque le nombre de tâches est grand et que les threads dans le pool de thread ne peuvent pas être satisfaits, la tâche sera enregistrée sur LinkedBlockingQueue, et la taille de LinkedBlockingQueue est Integer.max_value. Cela signifie que l'ajout continu des tâches fera consommer la mémoire de plus en plus.
CachedThreadpool est différent. Son numéro de thread de base est 0, le nombre maximum de stockage est entier.max_value, et sa file d'attente de blocage est synchronousqueue, qui est une file d'attente spéciale, et sa taille est 0. Étant donné que le nombre de threads centraux est 0, il est nécessaire d'ajouter la tâche à la synchrone. Cette file d'attente ne peut réussir que lorsqu'un thread en ajoute des données et qu'un autre thread en tire des données. L'ajout de données à cette file d'attente renverra un échec. Lorsque le retour échoue, le pool de threads commence à étendre le thread, c'est pourquoi le nombre de threads dans CachedThreadPool n'est pas fixe. Lorsque le fil n'est pas utilisé pendant les années 60, le fil est détruit.
1.4. Petits exemples d'utilisation de la piscine de fil
1.4.1. Pool de fil simple
Importer java.util.concurrent.execcutorService; Importer java.util.concurrent.execUtors; classe publique threadpooldo {public static class mytask implémente runnable {@override public void run () {System.out.printLn (System.CurrenttimeLis () + "Thread Id:" + threadSthersead (). essayez {thread.sleep (1000); } catch (exception e) {e.printStackTrace (); }}} public static void main (String [] args) {mytask mytask = new mytask (); ExecutorService es = exécutors.newFixEdThreadPool (5); for (int i = 0; i <10; i ++) {es.submit (mytask); }}} Étant donné que NewFixEdThreadpool (5) est utilisé, mais 10 threads sont démarrés, 5 sont exécutés à la fois, et il est évident que la réutilisation du thread est vue. ThreadID est répété, c'est-à-dire les 5 premières tâches et les 5 dernières tâches sont exécutées par le même lot de threads.
Ce qui est utilisé ici
es.submit (mytask);
Il existe également un moyen de soumettre:
ES.EXECUTE (MYTASK);
La différence est que SUMET renverra un futur objet, qui sera introduit plus tard.
1.4.2.ScheduledThreadpool
Importer java.util.concurrent.executors; importer java.util.concurrent.scheduleExecutorService; import java.util.concurrent.timeunit; public class threadpooldemo {public static main main (string [] args) {scheduledexETervice Ses = exécutors.NewScheduledTheReadPool (10); // Si la tâche précédente n'est pas terminée, le répartition ne démarre pas. SES.ScheduleWithfixedDelay (new Runnable () {@Override public void run () {try {Thread.Sleep (1000); System.out.println (System.Currenttimemilis () secondes, puis exécutez une fois toutes les 2 secondes dans un cycle}}Sortir:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Étant donné que l'exécution de la tâche prend 1 seconde, la planification des tâches doit attendre que la tâche précédente se termine. Autrement dit, toutes les 2 secondes ici signifie qu'une nouvelle tâche sera lancée 2 secondes après la fin de la tâche précédente.
2. Étendre et améliorer le pool de threads
2.1. Interface de rappel
Il existe des API de rappel dans le pool de threads pour nous fournir des opérations étendues.
ExecutorService es = new ThreadPoolExecutor (5, 5, 0l, timeunit.seconds, new LinkedBlockingQueue <Nuenable> ()) {@Override Protected void AVANTEXECUTE (Thread T, runnable r) {System.out.println ("se préparer à exécuter"); } @Override Protected void AfterExECUTE (Runnable R, Throwable T) {System.out.println ("EXÉCUTION EXCELLÉ"); } @Override Protected void terminé () {System.out.println ("Thread Pool Exit"); }};Nous pouvons implémenter les méthodes AVANTEXECUTE, AfterExecute et terminées de ThreadPoolExecutor pour implémenter la gestion des journaux ou d'autres opérations avant et après l'exécution du thread, sortie de pool de threads.
2.2. Stratégie de rejet
Parfois, les tâches sont très lourdes, ce qui entraîne trop de charge sur le système. Comme mentionné ci-dessus, lorsque le nombre de tâches augmente, toutes les tâches seront placées dans la file d'attente de blocage de FixedThreadpool, ce qui entraînera trop de consommation de mémoire et éventuellement un débordement de mémoire. De telles situations doivent être évitées. Ainsi, lorsque nous constatons que le nombre de threads dépasse le nombre maximal de threads, nous devons abandonner quelques tâches. Lors de la rejet, nous devons noter la tâche au lieu de le jeter directement.
Il y a un autre constructeur dans ThreadPoolExecutor.
public threadpoolExecutor (int corepoolSize, int maximumpoolSize, long keepalivetime, timeunit unité, blocingQueue <runnable> workqueue, threadfactory threadfactory, rejectEdExecutionHandler mandepre IllégalArgumentException (); if (workQueue == null || threadfactory == null || handler == null) lance un nouveau nullpointerException (); this.CorepoolSize = corePoolSize; this.MaxiMumpoolSize = maximumpoolSize; this.workQueue = workQueue; this.keepaliveTime = unit.Tonanos (keepalivetime); this.threadfactory = threadFactory; this.handler = handler; }
Nous présenterons ThreadFactory plus tard.
Le gestionnaire rejette la mise en œuvre de la politique, qui nous dira quoi faire si la tâche ne peut pas être exécutée.
Il y a un total des 4 stratégies ci-dessus.
Abortpolicy: si la tâche ne peut pas être acceptée, une exception est lancée.
CallErrunspolicy: Si la tâche ne peut pas être acceptée, laissez le fil d'appel terminer.
DiscardoldestPolicy: Si la tâche ne peut pas être acceptée, la tâche la plus ancienne sera jetée et maintenue par une file d'attente.
DiscardPolicy: Si la tâche ne peut pas être acceptée, la tâche sera rejetée.
EMECTROGROSSERVICE ES = NOUVEAU THREEPOOLEXECUTOR (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockerQueue <Runnable> (), New RejectEdexEcutionHandler () {@Override public void rejectEdExecution (Runnable R, ThreadPoolExecutor Exécutor) {system.out.println (R.Tostring () + "); }); Bien sûr, nous pouvons également mettre en œuvre l'interface RejectEdExecutionHandler nous-mêmes pour définir nous-mêmes la politique de rejet.
2.3. Personnaliser ThreadFactory
Je viens de voir que ThreadFactory peut être spécifié dans le constructeur de ThreadPoolExecutor.
Les threads du pool de threads sont tous créés par l'usine de threads, et nous pouvons personnaliser l'usine de threads.
Factory de thread par défaut:
Classe statique DefauthThreadFactory implémente ThreadFactory {private static final atomicInteger poolNumber = new atoMicInteger (1); Groupe final privé ThreadGroup; Final privé atomicInteger ThreadNumber = new AtomicInteger (1); chaîne finale privée namePrefix; DefaultThreadFactory () {SecurityManager S = System.getSecurityManager (); groupe = (s! = null)? s.getThreadGroup (): thread.currentThread (). getThreadGroup (); namePrefix = "pool-" + poolNumber.getAndIncrement () + "-thread-"; } public thread newthread (runnable r) {thread t = new Thread (groupe, r, namePrefix + threadNumber.getAndInCment (), 0); if (t.isdaemon ()) t.setDaemon (false); if (t.getPriority ()! = thread.norm_priority) t.setPriority (thread.norm_priority); retour t; }}3. Forkjoin
3.1. Pensées
C'est l'idée de diviser et de conquérir.
Fork / Join est similaire à l'algorithme MapReduce. La différence entre les deux est: la fourche / jointure est divisée en petites tâches uniquement lorsque cela est nécessaire, comme si la tâche est très grande, tandis que MapReduce commence toujours à effectuer la première étape pour la segmentation. Il semble que Fork / Join convient plus à un niveau de thread dans un JVM, tandis que MapReduce convient aux systèmes distribués.
4.2.usant l'interface
RécursiveAction: aucune valeur de retour
RecursiveTask: il y a une valeur de retour
4.3. Exemple simple
Importer java.util.arraylist; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkjointask; import java.util.concurrent.recursivetask; classe publique countask étend recursiveTask <long> {{private static inthershold = 10000; Démarrage long privé; Longue fin privée; public CountTask (Long Start, Long End) {Super (); this.start = start; this.end = end; } @Override Protected Long Compute () {long sum = 0; Booléen CanCompute = (end - démarrage) <Threshold; if (cancompute) {for (long i = start; i <= end; i ++) {sum = sum + i; }} else {// Split en 100 petites tâches longs step = (start + end) / 100; ArrayList <CountTask> sous-tasks = new ArrayList <CountTask> (); long pos = start; pour (int i = 0; i <100; i ++) {long lastOn = pos + étape; if (LastOne> end) {LastOne = end; } CountTask SubTask = new CountTask (pos, LastOne); pos + = étape + 1; sous-tasks.add (sous-tâche); subtask.fork (); // poussait les sous-tâches vers le pool de thread} pour (countTask t: sous-tâches) {sum + = t.join (); // attendant que toutes les sous-tâches se terminent}} de retour sum; } public static void main (String [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); Tâche countask = new CountTask (0, 200000l); FORKJOINTASK <long> result = forkjoinpool.submit (tâche); essayez {long res = result.get (); System.out.println ("sum =" + res); } catch (exception e) {// todo: gère l'exception e.printStackTrace (); }}} L'exemple ci-dessus décrit une tâche de résumé. Divisez les tâches accumulées en 100 tâches, chaque tâche effectue une somme des nombres, et après la jointure finale, la somme calculée par chaque tâche est ensuite accumulée.
4.4. Éléments de mise en œuvre
4.4.1.WorkQueue et CTL
Chaque fil aura une file d'attente de travail
Workqueue de classe finale statique
Dans la file d'attente de travail, il y aura une série de champs qui géreront les threads.
volatile int EventCount; // Nombre d'inactivation codé; <0 si inactif
int nextwait; // Enregistrement encodé du prochain événement serveur
Int Narrows; // Nombre d'aciers
dans l'indice; // indice d'acier
Short PoolIndex; // Index de cette file d'attente dans le pool
Mode court final; // 0: lifo,> 0: fifo, <0: partagé
INT QLOCK volatile; // 1: verrouillé, -1: terminer; d'autre 0
base INT volatile; // Index de la prochaine fente pour le sondage
INT TOP; // Index de la prochaine fente pour pousser
FORKJOINTASK <?> [] Array; // les éléments (initialement non alloués)
Pool Fin Forkjoinpool; // la piscine contenant (peut être nul)
Final ForkJoinworkerThread propriétaire; // posséder du fil ou null s'il est partagé
Fil volatil Parker; // == Propriétaire pendant l'appel à se garer; autrement nul
volatile forkjointask <?> CurrentJoin; // tâche se joint à Awaitjoin
FORKJOINTASK <?> CurrentSteal; // tâche non locale actuelle en cours d'exécution
Il convient de noter ici qu'il existe une grande différence entre JDK7 et JDK8 dans la mise en œuvre de Forkjoin. Ce que nous introduisons ici est de JDK8. Dans le pool de threads, parfois tous les threads ne s'exécutent pas, certains threads seront suspendus et ces threads suspendus seront stockés dans une pile. Il est représenté en interne par une liste liée.
Nextwait pointera vers le prochain fil d'attente.
L'indice d'index de l'indice dans le pool de threads PoolIndex.
EventCount Lorsqu'il est initialisé, EventCount est lié à PoolIndex. Un total de 32 bits, le premier bit indique s'il est activé et 15 bits indique le nombre de fois où il a été suspendu
EventCount, le reste représente PoolIndex. Utilisez un champ pour représenter plusieurs significations.
WorkQueue WorkQueue est représenté par ForkJointask <?> [] Array. Le haut et la base représentent les deux extrémités de la file d'attente, et les données se trouvent entre ces deux.
Maintenir CTL (type long 64 bits) dans Forkjoinpool
CTL long volatil;
* Field CTL est un long emballé avec:
* AC: nombre de travailleurs en cours d'exécution actifs moins parallélisme cible (16 bits)
* TC: Nombre de travailleurs totaux moins parallélisme cible (16 bits)
* ST: vrai si le pool est terminé (1 bit)
* EC: le nombre d'attente du fil d'attente supérieur (15 bits)
* ID: PoolIndex de la pile de serveurs en haut de Treiber (16 bits)
AC représente le nombre de threads actif moins le degré de parallélisme (probablement le nombre de processeurs)
TC signifie le nombre total de threads moins parallélisme
ST indique si le pool de threads lui-même est activé
EC représente le nombre de threads suspendus au temps d'attente supérieur
L'ID indique le poolindex en attente de fil en haut
Il est évident que ST + EC + ID est ce que nous venons d'appeler EventCount.
Alors pourquoi devez-vous synthétiser une variable avec 5 variables? En fait, la capacité occupe à peu près la même chose avec 5 variables.
La lisibilité de l'utilisation d'un code de variable sera bien pire.
Alors pourquoi utiliser une variable? En fait, c'est la chose la plus intelligente, car ces 5 variables sont un tout. Dans le multi-threading, si 5 variables sont utilisées, alors lors de la modification de l'une des variables, comment assurer l'intégrité des 5 variables. Ensuite, l'utilisation d'une variable résoudra ce problème. S'il est résolu avec des serrures, les performances seront dégradées.
L'utilisation d'une variable garantit la cohérence et l'atomicité des données.
Les modifications apportées à Forkjoin Squadron CTL sont toutes effectuées en utilisant les opérations CAS. Comme mentionné dans la série précédente d'articles, CAS est une opération sans serrure et a de bonnes performances.
Étant donné que les opérations CAS ne peuvent cibler qu'une seule variable, cette conception est optimale.
4.4.2. Vol de travail
Ensuite, nous présenterons le flux de travail de l'ensemble du pool de threads.
Chaque thread appelle Runworker
Final void Runworker (workQueue w) {w.growArray (); // allocation de la file d'attente pour (int r = w.hint; scan (w, r) == 0;) {r ^ = r << 13; r ^ = r >>> 17; r ^ = r << 5; // xorshift}} La fonction Scan () consiste à rechercher des tâches à effectuer.
R est un nombre relativement aléatoire.
Scan int privé final (workqueue w, int r) {workQueue [] ws; int m; long c = ctl; // Pour cohérence, vérifiez si ((ws = workQueues)! = null && (m = ws.length - 1)> = 0 && w! = null) {for (int j = m + m + 1, ec = w.eventCount ;;) {workQueue q; int b, e; Forkjointask <?> [] A; Forkjointask <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = ((a.Length - 1) & b) << ashift) + abase; if ((t = ((FORKJOINTASK <?>) U.GetObjectVolatile (a, i)))! = null) {if (ec <0) helprelease (c, ws, w, q, b); else if (q.base == b && u.compareAndWapObject (a, i, t, null)) {u.putOrderEdInt (q, qbase, b + 1); if ((b + 1) - q.top <0) Travail de signal (ws, q); w.runtask (t); } } casser; } else if (--j <0) {if ((ec | (e = (int) c)) <0) // inactif ou terminant un retour attendant (w, c, ec); else if (ctl == c) {// Essayez d'inactiver et d'obtenir un long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; w.eventcount = ec | Int_sign; if (! u.compareAndSwaplong (ceci, ctl, c, nc)) w.eventcount = ec; // Back Out} Break; }}} return 0; } Jetons un coup d'œil à la méthode de numérisation. Un paramètre de scan est Workqueue. Comme mentionné ci-dessus, chaque fil aura un travail de travail, et le travail de plusieurs threads sera enregistré dans Workquegues. R est un nombre aléatoire. Utilisez R pour trouver un Workqueue et avoir des tâches à effectuer dans Workqueue.
Ensuite, via la base WorkQueue, obtenez le décalage de la base.
b = Q.Base
..
long i = (((a.length - 1) & b) << ashift) + abase;
..
Ensuite, obtenez la dernière tâche à travers le décalage et exécutez cette tâche
t = ((forkjointask <?>) U.GetObjectVolatile (a, i))
..
w.runtask (t);
..
Grâce à cette analyse approximative, nous avons constaté qu'après que le thread actuel appelle la méthode de numérisation, il n'exécutera pas les tâches dans le travail actuel, mais obtiendra d'autres tâches de travail de travail via un nombre aléatoire r. C'est l'un des principaux mécanismes de Forkjoinpool.
Le thread actuel se concentrera non seulement sur ses propres tâches, mais donnera la priorité à d'autres tâches. Cela empêche la faim de se produire. Cela empêche certains fils d'être incapables de terminer les tâches à temps pour des raisons bloquées ou d'autres raisons, ou un fil a une grande quantité de tâches, mais d'autres fils n'ont rien à faire.
Alors jetons un coup d'œil à la méthode Runtask
Final void runTask (forkjointAsk <?> tâche) {if ((currentSteal = task)! = null) {FORKJoinworkerThread Thread; tâche.DoExec (); FORKJOINTASK <?> [] A = array; int md = mode; ++ nSteals; CurrentSteal = null; if (md! = 0) PollanDExECall (); else if (a! = null) {int s, m = a.length - 1; Forkjointask <?> T; while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandSetObject (a, ((m & s) << ashift) + abase, null)))! = null) {top = s; T.DoExec (); }} if ((thread = propriétaire)! = null) // pas besoin de faire dans la clause enfin thread.afterToplevelexec (); }}Il y a un nom intéressant: CurrentSteal, la tâche volée est en effet ce que je viens d'expliquer.
tâche.DoExec ();
Cette tâche sera terminée.
Après avoir accompli les tâches des autres, vous accomplirez vos propres tâches.
Obtenez la première tâche en obtenant le haut
while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getandSetObject (a, ((m & s) << ashift) + abase, null)))! = null) {top = s; t.DoExec ();}Ensuite, utilisez un graphique pour résumer le processus du pool de threads tout à l'heure.
Par exemple, il y a deux fils T1 et T2. T1 obtiendra la dernière tâche de T2 à travers la base de T2 (bien sûr, c'est en fait la dernière tâche d'un fil à travers un nombre aléatoire R), et T1 effectuera également sa première tâche à travers son propre haut. Au contraire, T2 fera de même.
Les tâches que vous effectuez pour les autres fils commencent à partir de la base, et les tâches que vous prenez pour vous-même commencent par le haut. Cela réduit le conflit
Si aucune autre tâche n'est trouvée
else if (--j <0) {if ((ec | (e = (int) c)) <0) // inactif ou terminant le rendement de retour (w, c, ec); else if (ctl == c) {// Essayez d'inactiver et d'obtenir un long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; w.eventcount = ec | Int_sign; if (! u.compareAndSwaplong (ceci, ctl, c, nc)) w.eventcount = ec; // Back Out} Break; } Ensuite, la valeur de CTL sera modifiée par une série de cycles, NC sera obtenu, puis la nouvelle valeur sera attribuée avec CAS. Ensuite, appelez Awaitwork () pour entrer dans l'état d'attente (appelé la méthode du parc Unsafe mentionné dans la série d'articles précédents).
Ce que nous devons expliquer ici est de modifier la valeur CTL. Ici, d'abord, AC-1 dans CTL, et AC occupe les 16 meilleurs bits de CTL, il ne peut donc pas être directement -1, mais obtient plutôt l'effet de la fabrication des 16 bits supérieurs de CTL -1 à AC_Unit (0x100000000000000) des 16 premiers bits de CTL.
Comme mentionné précédemment, l'EventCount enregistre le PoolIndex, et à travers le PoolIndex et Nextwait dans WorkQueue, vous pouvez traverser tous les fils d'attente.