1. grundlegende Verwendung von Threadpools
1.1. Warum brauchst du einen Thread -Pool?
Wenn wir im täglichen Geschäft Multi-Threading verwenden möchten, werden wir Threads erstellen, bevor das Geschäft startet, und Threads zerstören, nachdem das Geschäft beendet ist. Für das Geschäft haben die Schaffung und Zerstörung von Fäden jedoch nichts mit dem Geschäft selbst zu tun, und kümmert sich nur um die vom Thread ausgeführten Aufgaben. Daher hoffe ich, so viele CPUs wie möglich zu verwenden, um Aufgaben auszuführen, anstatt Threads zu erstellen und zu zerstören, die nicht mit dem Geschäft zusammenhängen. Der Fadenpool löst dieses Problem. Die Funktion des Threadpools besteht darin, Threads wiederzuverwenden.
1.2. Welche Unterstützung bietet JDK für uns
Die zugehörigen Klassendiagramme in JDK sind in der obigen Abbildung gezeigt.
Mehrere besondere Kategorien zu erwähnen.
Die Callable -Klasse ähnelt der laufbaren Klasse, aber der Unterschied besteht darin, dass Callable einen Rückgabewert hat.
ThreadPoolexecutor ist eine wichtige Implementierung von Threadpools.
Executors sind eine Fabrikklasse.
1.3. Verwendung von Threadpools
1.3.1. Arten von Threadpools
public static ExecutorService Newfixed threadpool (int nthreads) {return neuer threadpoolexecutor (Nthreads, Nthreads, 0L, TimeUnit.Millisekunden, neu LinkedBlocking BlockingQueue <Runnable> ()); Threadpoolexecutor (1, 1, 0l, TimeUnit.Milliseconds, New Linked BlockingQueue <Runnable> ());} öffentliche statische Ausführende.Aus methodischer Sicht ist es offensichtlich, dass es sich um Bestandthreadpool, SinglethreadExecutor und CachedThreadpool handelt es sich um unterschiedliche Instanzen von Threadpoolexecutor, aber die Parameter sind unterschiedlich.
public threadpoolexecutor (int corepoolsize, int maximumpoolsize, long keepalivetime, Zeiteinheit, Blockingqueue <Runnable> WorkQueue) {this (corepoolSize, maximumpoolsize, keepalivetime, uneinheit, arbeitsqueue, ausführende. Beschreiben wir kurz die Bedeutung von Parametern im ThreadPoolexecutor -Konstruktor.
Auf diese Weise ist die Anzahl der Kerne und die maximale Anzahl von Threads gleich, so dass die Threads während der Arbeit nicht erstellt und zerstört werden. Wenn die Anzahl der Aufgaben groß ist und die Threads im Thread -Pool nicht erfüllt werden können, wird die Aufgabe in LinkedBlockingCocingQueue gespeichert, und die Größe des Linked -Blocks ist integer.max_value. Dies bedeutet, dass die kontinuierliche Zugabe von Aufgaben dazu führt, dass der Speicher immer mehr verbraucht.
CachedThreadpool ist anders. Die Kern -Thread -Nummer ist 0, die maximale Anzahl von Speicher ist ganzzahlig.max_value, und die blockierende Warteschlange ist Synchronousqueue, eine spezielle Warteschlange, und ihre Größe beträgt 0. Da die Anzahl der Kernfäden 0 ist, ist es erforderlich, die Aufgabe zur Synchronousqueue hinzuzufügen. Diese Warteschlange kann nur dann erfolgreich sein, wenn ein Thread Daten daraus hinzufügt und ein anderer Thread Daten davon abholt. Das Hinzufügen von Daten allein in diese Warteschlange wird ein Fehler zurückgegeben. Wenn die Rückgabe fehlschlägt, beginnt der Thread -Pool, den Thread zu erweitern, weshalb die Anzahl der Threads in CachedThreadpool nicht behoben ist. Wenn der Faden für die 60er Jahre nicht verwendet wird, wird der Faden zerstört.
1.4. Kleine Beispiele für die Verwendung von Fadenbecken
1.4.1. Einfacher Fadenpool
Import Java.util.concurrent.executorService; Import Java.util.concurrent.executors; öffentliche Klasse ThreadPooldemo {public statische Klasse Mytask -Implements Runnable {@Override public void run () {System.out.println (System.CurentTheRe). try {thread.sleep (1000); } catch (Ausnahme e) {e.printstacktrace (); }}} public static void main (String [] args) {mytask mytask = new mytask (); ExecutorService ES = Executors.NewFixedThreadpool (5); für (int i = 0; i <10; i ++) {es.submit (mytask); }}} Da Newfixed threadpool (5) verwendet wird, aber 10 Threads gestartet werden, werden 5 gleichzeitig ausgeführt, und es ist offensichtlich, dass die Wiederverwendung von Threads zu sehen ist. ThreadID wird wiederholt, dh die ersten 5 Aufgaben und die letzten 5 Aufgaben werden von derselben Thread -Stapel ausgeführt.
Was wird hier verwendet
Es.Submit (mytask);
Es gibt auch eine Möglichkeit zu einreichen:
es.execute (mytask);
Der Unterschied besteht darin, dass Subjekt ein zukünftiges Objekt zurückgeben wird, das später eingeführt wird.
1.4.2.Scheduled threadpool
importieren java.util.concurrent.executors; import java.util.concurrent // Wenn die vorherige Aufgabe nicht abgeschlossen wurde, beginnt der Versand nicht. SES.SchedulewithFixedDelay (new Runnable () {@Override public void run () {try {thread.sleep (1000); System.out.println (System.currentTimemillis ()/1000); Sekunden und dann alle 2 Sekunden in einem Zyklus}}}} ausführenAusgabe:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Da die Aufgabenausführung 1 Sekunde dauert, muss die Aufgabenplanung warten, bis die vorherige Aufgabe abgeschlossen ist. Das heißt, hier alle 2 Sekunden bedeutet hier, dass eine neue Aufgabe 2 Sekunden nach Abschluss der vorherigen Aufgabe gestartet wird.
2. Erweitern und verbessern Sie den Fadenpool
2.1. Rückrufschnittstelle
Es gibt einige Rückruf -APIs im Thread -Pool, die uns erweiterten Vorgängen bereitstellen.
ExecutorService es = neuer ThreadPoolexecutor (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingQueue <Runnable> ()) {@Override Protected void voranschließend (Thread t, Runnable r) {System.out.println ("Vorbereitung auf Execute"); } @Override Protected void AfterExecute (Runnable r, Throwable t) {System.out.println ("Ausführung abgeschlossen"); } @Override Protected void termined () {System.out.println ("Thread Pool Exit"); }};Wir können die Methoden vor und beenden von ThreadPoolexecutor implementieren, um die Protokollverwaltung oder andere Vorgänge vor und nach der Thread -Ausführung und Thread Pool Exit zu implementieren.
2.2. Ablehnungsstrategie
Manchmal sind die Aufgaben sehr schwer, was zu viel Last auf das System führt. Wie oben erwähnt, werden alle Aufgaben, wenn die Anzahl der Aufgaben zunimmt, in der blockierenden Warteschlange von BesteThreadpool platziert, was zu zu viel Speicherverbrauch und schließlich Speicherüberlauf führt. Solche Situationen sollten vermieden werden. Wenn wir feststellen, dass die Anzahl der Threads die maximale Anzahl von Threads überschreitet, sollten wir einige Aufgaben aufgeben. Beim Abwerfen sollten wir die Aufgabe aufschreiben, anstatt sie direkt wegzuwerfen.
Es gibt einen weiteren Konstruktor im ThreadPoolexecutor.
public threadpoolexecutor (int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue <Runnable> WorkQueue, ThreadFactory Factory, RejejecedExecutionHandler Handler) {if (corepoolsesize <0 || maximalSize <= 0 || MAXIMAUMSICELSIZEISISCHE <0,0 || maximalSize. IllegalArgumentException (); if (WorkQueue == null || threadFactory == null || Handler == null) neue nullpointerexception () werfen; this.corepoolsize = corepoolsize; this.maxImumpoolSize = maximumpoolSize; this.workQueue = WorkQueue; this.keepalivetime = unit.tonanos (keepalivetime); this.threadfactory = threadFactory; this.handler = Handler; } Wir werden später ThreadFactory einführen.
Der Handler lehnt die Implementierung der Richtlinie ab, wodurch wir mitteilen, was zu tun ist, wenn die Aufgabe nicht ausgeführt werden kann.
Es gibt insgesamt die oben genannten 4 Strategien.
AbortPolicy: Wenn die Aufgabe nicht akzeptiert werden kann, wird eine Ausnahme ausgelöst.
Callerrunspolicy: Wenn die Aufgabe nicht akzeptiert werden kann, lassen Sie den aufrufenden Thread abgeschlossen.
AUSGEBORDESTPOLICY: Wenn die Aufgabe nicht akzeptiert werden kann, wird die älteste Aufgabe von einer Warteschlange verworfen und gepflegt.
LORDPOLICY: Wenn die Aufgabe nicht akzeptiert werden kann, wird die Aufgabe verworfen.
ExecutorService ES = NEU Threadpoolexecutor (5, 5, 0L, Timeunit.seconds, New LinkedBlockingQueue <Runnable> (), neue AblehnungsexecutionHandler () {@Override public void rejectedexecution (Runnable r, threadpoolexecutor Executor) {System.OUT.OUT.OUT.OUT.OUT.OUT.PRINTLN (R.ToString () () (). }); Natürlich können wir auch die rejecjecteDexecutionHandler -Schnittstelle selbst implementieren, um die Ablehnungsrichtlinie selbst zu definieren.
2.3. Passen Sie ThreadFactory an
Ich habe gerade gesehen, dass ThreadFactory im Konstruktor von ThreadPoolexecutor angegeben werden kann.
Die Threads im Thread -Pool werden alle von der Thread -Fabrik erstellt, und wir können die Thread -Fabrik anpassen.
Standard -Thread -Fabrik:
statische Klasse StandardthreadFactory implementiert ThreadFactory {private statische endgültige Atomicinger Poolnumber = new Atomicinteger (1); private endgültige Threadgroup -Gruppe; Private Final AtomicInteger ThreadNumber = New AtomicInteger (1); private endgültige String -Nameprefix; DefaultThreadfactory () {SecurityManager s = System.getSecurityManager (); Gruppe = (s! = null)? S.GethreadGroup (): Thread.currentThread (). GetThreadGroup (); nameprefix = "pool-" + poolnumber.getandIncrement () + "-Thread-"; } public thread newThread (runnable r) {thread t = neuer Thread (Gruppe, r, nameprefix + threadNumber.getandIncrement (), 0); if (t.isdaemon ()) t.setdaemon (falsch); if (t.getPriority ()! return t; }}3. Forkjoin
3.1. Gedanken
Es ist die Idee, sich zu teilen und zu erobern.
Fork/Join ähnelt dem MapReduce -Algorithmus. Der Unterschied zwischen den beiden ist: Gabel/Join ist nur bei Bedarf in kleine Aufgaben unterteilt, z. B. wenn die Aufgabe sehr groß ist, während MapReduce immer den ersten Schritt für die Segmentierung durchführt. Es scheint, dass Gabel/Join für eine Thread -Ebene innerhalb eines JVM besser geeignet ist, während MapReduce für verteilte Systeme geeignet ist.
4.2. die Schnittstelle verwenden
Rekursivresiakt: Kein Rückgabewert
Rekursiv -Task: Es gibt einen Rückgabewert
4.3. Einfaches Beispiel
Import Java.util.ArrayList; Import java.util.concurrent.forkjoinpool; Import Java.util.Concurrent.forkjointask; Import Java.util.Concurrent Privat langer Start; privates langes Ende; public counttask (langes Start, langes Ende) {Super (); this.start = start; this.end = Ende; } @Override geschützt long compute () {long sum = 0; boolean canCompute = (Ende - Start) <Schwelle; if (canCompute) {für (long i = start; i <= end; i ++) {sum = sum+i; }} else {// in 100 kleine Aufgaben Long Step = (Start + Ende)/100; ArrayList <CountTask> subtasks = new ArrayList <CountTask> (); lange pos = Start; für (int i = 0; i <100; i ++) {last lastone = pos+Schritt; if (lastOne> end) {lastone = Ende; } Counttask subtask = new counttask (pos, lastOne); pos + = Schritt + 1; subtasks.add (Subtask); subtask.fork (); // Subtasks in Thread Pool} für (counttask t: Unteraufgaben) {sum += t.join (); // Warten auf alle Unteraufgaben zum Ende}} zurück; } public static void main (String [] args) {forkjoinpool forkjoinpool = new forkjoinpool (); Counttask Task = new Countkasd (0, 200000L); Forkjointask <Long> result = forkjoinpool.submit (Aufgabe); try {long res = result.get (); System.out.println ("sum =" + res); } catch (Ausnahme e) {// toDo: Behandeln Sie die Ausnahme E.PrintStackTrace (); }}} Das obige Beispiel beschreibt eine Aufgabe des Zusammenfassens. Teilen Sie die akkumulierten Aufgaben in 100 Aufgaben auf, jede Aufgabe führt nur eine Summe der Zahlen aus, und nach dem endgültigen Join wird die von jeder Aufgabe berechnete Summe angesammelt.
4.4. Implementierungselemente
4.4.1.WorkQueue und CTL
Jeder Thread hat eine Arbeitswarteschlange
statische Endklasse WorkQueue
In der Arbeitswarteschlange gibt es eine Reihe von Feldern, die Threads verwalten.
volatile int eventCount; // codierte Inaktivierungszahl; <0 wenn inaktiv
int NextWait; // codierte Aufzeichnung des nächsten Ereigniskellners
Int verengt sich; // Anzahl der Stähle
int tipp; // Stahlindex Hinweis
kurzer Poolindex; // Index dieser Warteschlange im Pool
endgültiger Kurzmodus; // 0: lifo,> 0: fifo, <0: geteilt
flüchtiger int Qlock; // 1: gesperrt, -1: Beenden; sonst 0
flüchtige Int -Basis; // Index des nächsten Slot für die Umfrage
int top; // Index des nächsten Slot zum Push
Forkjointask <?> [] Array; // Die Elemente (ursprünglich nicht zugewiesen)
Final Forkjoinpool Pool; // der enthaltene Pool (kann null sein)
Final ForkjoinworkerThread -Besitzer; // Thread oder Null besitzen, wenn sie geteilt werden
flüchtiger Faden Parker; // == Besitzer während des Anrufs zum Park; sonst null
volatile gabelJointask <?> currentJoin; // Aufgabe in Awaitjoin angeschlossen werden
Forkjointask <?> CurrentStreal; // aktuelle nicht-lokale Aufgabe, die ausgeführt wird
Es ist hier zu beachten, dass es einen großen Unterschied zwischen JDK7 und JDK8 bei der Implementierung von Forkjoin gibt. Was wir hier vorstellen, stammt von JDK8. Im Thread -Pool werden manchmal nicht alle Threads ausgeführt, einige Threads werden suspendiert und diese suspendierten Threads werden in einem Stapel gespeichert. Es wird intern durch eine verknüpfte Liste dargestellt.
NextWait zeigt auf den nächsten wartenden Thread.
Der Indexindex des Index im PoolIndex -Threadpool.
EventCount Wenn EventCount initialisiert wird, bezieht sich EventCount mit PoolIndex. Insgesamt 32 Bit gibt das erste Bit an, ob es aktiviert ist, und 15 Bits gibt an, wie oft es suspendiert wurde
EventCount, der Rest repräsentiert PoolIndex. Verwenden Sie ein Feld, um mehrere Bedeutungen darzustellen.
WorkQueue WorkQueue wird durch Forkjointask <?> [] Array dargestellt. Top und Basis repräsentieren beide Enden der Warteschlange, und die Daten sind zwischen diesen beiden.
CTL (64-Bit-Long-Typ) in Forkjoinpool beibehalten
flüchtige lange CTL;
* Field CTL ist ein langer Pack mit:
* AC: Anzahl der aktiven laufenden Arbeiter minus Zielparallelität (16 Bit)
* TC: Anzahl der gesamten Arbeitnehmer minus Zielparallelität (16 Bit)
* ST: TRUE, wenn der Pool beendet wird (1 Bit)
* EC: Die Wartezahl von Top -Wartefaden (15 Bit)
* ID: Poolindex von Treiber Stapel von Kellnern (16 Bit)
AC repräsentiert die aktive Fadenzahl abzüglich des Parallelismus -Grades (wahrscheinlich die Anzahl der CPUs)
TC bedeutet die Gesamtzahl der Fäden abzüglich Parallelität
ST gibt an, ob der Fadenpool selbst aktiviert ist
EC repräsentiert die Anzahl der suspendierten Threads in der oberen Wartezeit
ID gibt den PoolIndex an
Es ist offensichtlich, dass ST+EC+ID das ist, was wir gerade EventCount genannt haben.
Warum müssen Sie also eine Variable mit 5 Variablen synthetisieren? Tatsächlich ist die Kapazität mit 5 Variablen ungefähr gleich.
Die Lesbarkeit der Verwendung eines variablen Codes wird viel schlechter sein.
Warum also eine Variable verwenden? In der Tat ist dies die klugste Sache, weil diese 5 Variablen ein Ganzes sind. Bei Verwendung von Multi-Threading, wenn 5 Variablen verwendet werden, wird beim Ändern einer der Variablen die Integrität der 5 Variablen sichergestellt. Die Verwendung einer Variablen löst dieses Problem. Wenn es mit Schlösser gelöst wird, wird die Leistung verschlechtert.
Die Verwendung einer Variablen sorgt für die Konsistenz und Atomizität der Daten.
Die Änderungen der Forkjoin Squadron CTL werden alle mit CAS -Operationen durchgeführt. Wie in der vorherigen Artikelserie erwähnt, ist CAS eine lockfreie Operation und hat eine gute Leistung.
Da CAS -Operationen nur auf eine Variable abzielen können, ist dieses Design optimal.
4.4.2. Diebstahl arbeiten
Als nächstes werden wir den Workflow des gesamten Thread -Pools vorstellen.
Jeder Thread ruft Runworker auf
endgültiger void Runworker (WorkQueue w) {W.Growarray (); // Warteschlange für (int r = w.Hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorShift}} Die Funktion scan () besteht darin, nach Aufgaben zu scannen.
R ist eine relativ zufällige Zahl.
Private Final INT -Scan (WorkQueue W, int r) {WorkQueue [] WS; int m; lange c = ctl; // für Konsistenzprüfung, wenn (((ws = WorkQueue)! int b, e; Forkjointask <?> [] A; Forkjointask <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! if ((t = ((forkjointask) u.getObjectVolatile (a, i))! sonst if (q.base == b && u.comPareAnDSWAPObject (a, i, t, null)) {u.putorderedInt (q, qbase, b + 1); if ((b + 1) - q.top <0) Signalarbeit (WS, q); W. Runtask (T); } } brechen; } else if (--j <0) {if ((ec | (e = (int) c)) <0) // inaktiv oder terminierende return actait-actwork (w, c, ec); sonst if (ctl == c) {// Versuchen Sie, lange nc = (lang) ec | zu inaktivieren und zu inaktivieren. ((c - ac_unit) & (acmask | tc_mask)); W.NextWait = e; W.EventCount = EC | Int_sign; if (! u.comPareandswaplong (this, ctl, c, nc)) W.EventCount = EC; // wieder raus} break; }}} return 0; } Schauen wir uns die Scan -Methode an. Ein Scan -Parameter ist Workqueue. Wie oben erwähnt, verfügt jeder Thread über einen Workqueue, und der Arbeitsqueue von mehreren Fäden wird in Workqueueues gespeichert. R ist eine Zufallszahl. Verwenden Sie R, um einen Workqueue zu finden und Aufgaben in Workqueue zu erledigen.
Lassen Sie dann durch die Arbeitsqueue -Basis den Basisversatz erhalten.
B = Q.Base
..
lang i = (((A.Length - 1) & b) << Ashift) + Abase;
..
Erhalten Sie dann die letzte Aufgabe über den Offset und führen Sie diese Aufgabe aus
t = ((forkjointask <?>) U.GetObjectVolatile (a, i))
..
W. Runtask (T);
..
Durch diese grobe Analyse haben wir festgestellt, dass nach dem aktuellen Thread die Scan -Methode aufgerufen wird, die Aufgaben nicht im aktuellen Workqueue ausführen, sondern andere Workqueue -Aufgaben über eine Zufallszahl r erhalten. Dies ist einer der Hauptmechanismen von Forkjoinpool.
Der aktuelle Thread konzentriert sich nicht nur auf seine eigenen Aufgaben, sondern priorisiert auch andere Aufgaben. Dies verhindert, dass der Hunger stattfindet. Dies verhindert, dass einige Threads aufgrund festgefahrener oder anderer Gründe nicht in der Lage sind, die Aufgaben nicht rechtzeitig zu erledigen, oder ein Faden hat eine große Anzahl von Aufgaben, andere Threads haben jedoch nichts zu tun.
Schauen wir uns dann die Runtask -Methode an
endgültiger void runtask (forkjointask <?> task) {if ((currentSteal = task)! task.doexec (); Forkjointask <?> [] A = Array; int md = modus; ++ nsteale; currentStal = null; if (md! = 0) pollandexecall (); sonst wenn (a! = null) {int s, m = A.Length - 1; Forkjointask <?> T; while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getAndObject (a, (m & s) << Ashift) + Abase, null)! = null) {top = s; t.doexec (); }} if ((thread = besitzer)! }}Es gibt einen interessanten Namen: CurrentSteal, die gestohlene Aufgabe ist in der Tat das, was ich gerade erklärt habe.
task.doexec ();
Diese Aufgabe wird erledigt.
Nachdem Sie die Aufgaben anderer Personen erledigt haben, werden Sie Ihre eigenen Aufgaben erledigen.
Holen Sie sich die erste Aufgabe, indem Sie die Spitze erhalten
while ((s = top - 1) - base> = 0 && (t = (forkjointask <?>) u.getAndObject (a, (m & s) << Ashift) + Abase, null)! = null) {top = s; t.doexec ();}Verwenden Sie als nächstes eine Grafik, um den Prozess des Thread -Pools gerade zusammenzufassen.
Zum Beispiel gibt es zwei Threads T1 und T2. T1 erhält die letzte Aufgabe von T2 über die Basis von T2 (natürlich ist es tatsächlich die letzte Aufgabe eines Threads durch eine Zufallszahl R), und T1 wird auch seine erste Aufgabe über seine eigene Oberseite ausführen. Im Gegenteil, T2 wird dasselbe tun.
Die Aufgaben, die Sie für andere Threads übernehmen, beginnen von der Basis, und die Aufgaben, die Sie für sich selbst übernehmen, beginnen von oben. Dies reduziert Konflikte
Wenn keine anderen Aufgaben gefunden werden
sonst if (--j <0) {if ((ec | (e = (int) c)) <0) // inaktiv oder terminierende return actait actwork (w, c, ec); sonst if (ctl == c) {// Versuchen Sie, lange nc = (lang) ec | zu inaktivieren und zu inaktivieren. ((c - ac_unit) & (acmask | tc_mask)); W.NextWait = e; W.EventCount = EC | Int_sign; if (! u.comPareandswaplong (this, ctl, c, nc)) W.EventCount = EC; // wieder raus} break; } Zuerst wird der Wert von CTL durch eine Reihe von Läufen geändert, NC wird erhalten und dann wird der neue Wert CAS zugeordnet. Rufen Sie dann auf die Wartestatus (die in der vorherigen Artikelserie genannten Parkmethode als unsichere Parkmethode genannt) an.
Was wir hier erklären müssen, ist, den CTL -Wert zu ändern. Zuerst nimmt AC -1 in CTL und AC die Top 16 Bit CTL ein, sodass es nicht direkt -1 sein kann, sondern erreicht stattdessen den Effekt, dass die Top 16 Bit CTL -1 bis AC_Unit (0x100000000000000) der ersten 16 Bit von CTL hergestellt werden.
Wie bereits erwähnt, rettet der EventCount den PoolIndex und über den PoolIndex und den nächsten Wait in WorkQueue können Sie alle Wartefäden durchqueren.