Es gibt oft einige Aufgaben im Projekt, die asynchron ausgeführt werden müssen (zum Thread -Pool übermittelt), während der Haupt -Thread häufig die Ergebnisse der asynchronen Ausführung kennen muss. Was sollen wir zu diesem Zeitpunkt tun? Mit Runnable ist es unmöglich, den folgenden Code zu lesen:
import Java.util.concurrent.callable; import Java.util.concurrent public addtask (int a, int b) {this.a = a; this.b = b; } @Override public Integer Call löst Ausnahme aus {Integer result = a + b; Rückgabeergebnis; } public static void main (String [] args) löst InterruptedException, ExecutionException aus {ExecutorService Executor = Executors.NewsingLethreadExecutor; // JDK ist bisher zurückgekehrt und sind Instanzen von Futuretask Future <Gearner> Future = Executor.Submit (neuer Addtask (1, 2)); Integer result = Future.get; // Nur wenn der Status der Zukunft abgeschlossen ist (future.isdone = true), wird die GET -Methode zurückgeben}}} Obwohl wir die Anforderung erkennen können, asynchrone Ausführungsergebnisse zu erzielen, stellten wir fest, dass diese Zukunft tatsächlich nicht nützlich ist, da sie keinen Benachrichtigungsmechanismus liefert. Dies bedeutet, dass wir nicht wissen, wann die Zukunft abgeschlossen sein wird (wenn wir ISDone () beurteilen müssen, um zu beurteilen, wie es nicht erforderlich ist, dies zu verwenden). Schauen Sie sich die Schnittstellenmethode von java.util.concurrent.future.future an:
öffentliche Schnittstelle zukünftige <V> {boolean abbrechen (boolean mayinterruptiFrunning); boolean iscancelled; boolean isdone; V throws interruptedException, ExecutionException; V GET (langfristige Zeiteinheit) löst InterruptedException, ExecutionException, TimeoutException aus;} Daraus können wir erkennen, dass der zukünftige Mechanismus von JDK eigentlich nicht einfach zu bedienen ist. Wenn Sie dieser Zukunft einen Zuhörer hinzufügen und den Hörer nach Abschluss benachrichtigen lassen können, ist dies einfacher zu bedienen, genau wie die folgende Ifuture:
Paket Future; Import Java.util.Concurrent.CancellationException; Import Java.util.Concurrent * * @author lixiaohui * @param <V> Typ Parameter des Ausführungsergebnisses */öffentliche Schnittstelle Ifuture <V> erweitert Future <V> {boolean issuccess; // ob V GetNow erfolgreich ist; // das Ergebnis sofort zurückgibt (unabhängig davon, ob die Zukunft im abgeschlossenen Zustand ist), faszbare Ursache; // Der Grund für den Ausführungsversagen ist stornierbar. // kann ich ifUture <v> auf die InterruptedException abbrechen? // Warten auf die zukünftige Fertigstellung boolean wartet (langfristig). // Timeout Warten Sie auf die zukünftige Abschluss boolean warten (langfristig, zeitunitieren). Ifuture <V> wartelinterbrechbar; // Warten auf zukünftige Fertigstellung, kein Unterbrechung boolean wartetUnterriptable (langfristig); // Timeout wartet auf zukünftige Fertigstellung, keine Interrupt -Antwort boolean erwartete: langfristig (langfristig, zeitunitiert); Ifuture <V> addListener (ifuturelistener <v> l); // Wenn die Zukunft abgeschlossen ist, werden diese zusätzlichen Zuhörer ifuture <V> removelistener (ifuturelistener <v> l) benachrichtigt. } Lassen Sie uns als nächstes diese Ifuture gemeinsam implementieren. Vorher werden wir das Objekt erklären. Schauen Sie sich die Erklärung in JDK an:
Das öffentliche Klassenobjekt { /** * lässt den aktuellen Thread warten, bis ein anderer Thread die * {@link java.lang.object#nachify} oder die * {@link java.lang.object#nocifyall} für dieses Objekt aufruft. * Mit anderen Worten, diese Methode verhält sich genau so, als ob sie einfach * den Anruf {@Code Wait (0)} ausführt. * Nachdem der aktuelle Thread diese Methode aufgerufen hat, wird das Objektmonitor -Sperre veröffentlicht und die CPU -Nutzungsrechte aufgeben. Bis ein anderer Thread -Aufruf notify/ notifyAll */ public endgültiger void Wait auswirft InterruptedException {Wait (0); } /*** wacht alle Threads auf, die auf den Monitor dieses Objekts warten. Ein * Thread wartet auf dem Monitor eines Objekts, indem Sie einen der * {@code Wait} -Methoden aufrufen. * <p> * Die erwachten Threads können erst fortfahren, bis der aktuelle * Thread die Sperre für dieses Objekt aufgibt. Die erwachten Threads * konkurrieren auf die übliche Weise mit anderen Threads, die aktiv mit der Synchronisierung dieses Objekts konkurrieren. Zum Beispiel * Die erwachten Threads genießen keine zuverlässigen Privilegien oder Nachteile, wenn * der nächste Thread ist, der dieses Objekt sperle. */ Public Final Native Void Notifyall;} Nachdem wir dies wissen, haben wir eine Idee, um die Zukunft selbst umzusetzen. Wenn der Thread eine Reihe von Methoden wie ifuture.aait aufruft und die Zukunft nicht abgeschlossen ist, rufen Sie die Zukunft an. Warten Sie die Methode, um den Thread in den Wartezustand einzugeben. Wenn andere Threads die Zukunft auf den Abschlusszustand festlegen (beachten Sie, dass der Abschlusszustand hier das normale Ende und das abnormale Ende enthält), muss die zukünftige Methode aufgerufen werden, um die Threads aufzuwecken, die sich im Wartezustand befanden, weil die Wartenmethode aufgerufen wurde. Die vollständige Implementierung ist wie folgt (der Code sollte nicht schwer zu verstehen sein. Ich beziehe mich auf den zukünftigen Mechanismus von Netty. Wenn Sie interessiert sind, können Sie den Quellcode von Netty überprüfen):
Paket Zukunft; import Java.util.Collection; Import Java.util.Concurrent.CancellationException; Import Java.util.Concurrent * Wenn das Ausführungsergebnis nicht null ist, ist das Ergebnis das Ausführungsergebnis. Wenn das Ausführungsergebnis NULL ist, dann ist Ergebnis = {@link contractFuture#success_signal} * Wenn die Ausnahme endet, ist das Ergebnis eine Instanz von {@link CauseHolder}; Wenn die Ausnahme aufgrund von Stornierung endet, ist das Ergebnis eine Instanz von {@link cancellationException}. Andernfalls handelt es sich um eine Instanz anderer Ausnahmen. <li>When the asynchronous operation ends normally (setSuccess method) </li> * <li>When the asynchronous operation ends abnormally (setFailure method) </li> * </ul> * </pre> * * @author lixiaohui * * @param <V> * Type of asynchronous execution result*/public class AbstractFuture<V> implements IFuture<V> { geschütztes flüchtiges Objekt Ergebnis; // Es muss garantiert eine Sichtbarkeit sein/ *** Listener -Set*/ Protected Collection <ifuturelistener <v >> listener = newOnonwriteArrayList <ifuturelistener <v >>; / *** Wenn das normale Ausführungsergebnis der Aufgabe null ist, dh wenn der Client {@link AbstractFuture#setSuccess (null)} aufruft, bezieht sich das Objekt auf das Objekt*/ private statische endgültige Successsignal Successnal Success_signal = New Successsignal; @Override public boolean stornieren (boolean MayInterruptiFrunning) {if (isdone) {// return false kann nicht storniert werden; } synchronized (this) {if (isdone) {// double check return false; } result = neuer Dauerinhaber (neue Stornierungxzeption); notifyall; // isdone = true, benachrichtigen Sie den Thread, der auf das Warten auf dem Objekt wartet. // den Hörer benachrichtigen, dass die asynchrone Operation abgeschlossen wurde. Return True; } @Override public boolean iscancellable {return result == null; } @Override public boolean iscancelled {Rückgabeergebnis! = Null && resultinstance von Causeinder && ((((CauseHolder)) .Cause Instance der Stornierungxzeption; } @Override public boolean isdone {return Ergebnis! = Null; } @Override public v throws interruptedException, executionException {wartet; // Warten Sie auf das Ausführungsergebnis Throwable Ursache = Ursache; if (cause == null) {// Keine Ausnahme aufgetreten, die asynchrone Operation endete normalerweise zurück GetNow; } if (Ursacheninstanz von CancellationException) {// Die asynchrone Operation wurde abgebrochen (CancellationException) Ursache; } neue executionException (Ursache) werfen; // Andere Ausnahmen} @Override public v GET (langfristig, Zeiteinheit) löscht InterruptedException, ExecutionException, Timeoutexception {if (ute (timeout, ap)) {// Timeout Warted auf die Ausführungsergebnissergebnis = Ursache; if (cause == null) {// Keine Ausnahme aufgetreten, die asynchrone Operation endete normalerweise zurück GetNow; } if (Ursacheninstanz von CancellationException) {// Die asynchrone Operation wurde abgesagt, Wurf (CancellationException) Ursache; } Wirf eine neue ExecutionException (Ursache); // andere Ausnahmen} // Die Zeit ist noch nicht beendet, und wirft eine Ausnahme von Timeout -Ausnahme. } @Override public boolean issuccess {return result == null? Falsch:! (Ergebnisinstanz von Causein); } @SuppressWarnings ("Deaktiviert") @Override public v GetNow {return (v) (result == success_signal? Null: result); } @Override public Throwable Cause {if (result! = Null && resultinstance von causeHolder) {return ((CauseHolder) Ergebnis) .Cause; } return null; } @Override public ifUture <V> addierener (ifuturelistener <V> Hörer) {if (louser == null) {neue nullPoIterexception ("Hörer"); } if (isdone) {// Wenn Sie notifyListener (Listener) abgeschlossen haben; gib dies zurück; } synchronized (this) {if (! isdone) {listeners.add (Hörer); gib dies zurück; }} notifyListener (Hörer); gib dies zurück; } @Override public ifUture <V> removelistener (ifuturelistener <V> Hörer) {if (louser == null) {neue nullpointerexception ("Hörer"); } if (! isdone) {listener.remove (Hörer); } zurückgeben; } @Override public ifUture <v> wartet die InterruptedException {return acait0 (true); } private ifuture <v> wartete0 (boolean untereinander) löst unterbrochene Ausnahme aus. unterbrochen. "); } boolean unterbrochen = false; synchronisiert (this) {while (! isdone) {try {wait; // Die Sperre veröffentlichen und den Wartezustand eingeben. Warten Sie, bis andere Threads die Benachrichtigung/NotifyAll -Methode des Objekts aufrufen (InterruptedException e) {if (interprptierbar) {throw e; } else {interrupted = true; }}}}} if (unterbrochen) {// Warum müssen wir das Interrupt -Flag hier festlegen? Denn nach der Rückkehr aus der Wait -Methode wird das Interrupt -Flag gelöscht, // hier zurücksetzen, damit andere Codes wissen, dass es hier unterbrochen wird. Thread.currentThread.interrupt; }} zurückgeben; } @Override public boolean wartet (langfristig) (langfristig) InterruptedException {return act0 (TimeUnit.Milliseconds.tonanos (timeoutmillis), wahr); } @Override public boolean wartet (langfristig, Zeiteinheit). } private boolean warte 0 (langfristig, boolean unteruft) unterrubt die InterruptedException {if (isdone) {return true; } if (timeoutnanos <= 0) {return isdone; } if (interruptierbar && thread.interrupted) {neue InterruptedException (toString); } Long start time = timeoutnanos <= 0? 0: System.nanotime; lange Wartezeit = Timeoutnanos; boolean unterbrochen = falsch; Versuchen Sie {synchronized (this) {if (isdone) {return true; } if (WaitTime <= 0) {return isdone; } für (;;) {try {wait time / 1000000, (int) (WaitTime % 1000000)); } catch (InterruptedException e) {if (interprptierbar) {throw e; } else {interrupted = true; }} if (isdone) {return true; } else {WaitTime = timeoutnanos - (System.Nanotime - StartTime); if (WaitTime <= 0) {return isdone; }}}}}} endlich {if (unterbrochen) {thread.currentThread.interrupt; }}} @Override public ifUture <v> acaTUtUnterriptable {try {return acait0 (false); } catch (InterruptedException e) {// Wenn eine Ausnahme hierher ausgelöst wird, kann sie nicht gehandhabt werden. }} @Override public boolean acaituninterriptable (Long TimeoutMillis) {try {return acait0 (TimeUnit.Milliseconds.tonanos (timeoutmillis), false); } catch (InterruptedException e) {throw New Java.lang.Internalerror; }} @Override public boolean acaituninterriptable (Long Timeout, TimeUnit Unit) {try {return acait0 (Einheit.Tonanos (Timeout), false); } catch (InterruptedException e) {throw New Java.lang.Internalerror; }} geschützte Ifuture <V> setFailure (throwable cause) {if (setFailure0 (Ursache)) {NotifyListeners; gib dies zurück; } neue IllegalStateException werfen ("FOLUSIGES Sie bereits:" + this); } private boolean setfailure0 (throwable corre) {if (isdone) {return false; } synchronisiert (this) {if (isdone) {return false; } result = neuer Causeinder (Ursache); notifyall; } Return true; } Protected ifuture <V> setSuccess (Objektergebnis) {if (setSuccess0 (Ergebnis)) {// nach erfolgreich eingestellt; gib dies zurück; } neue IllegalStateException werfen ("FOLUSIGES Sie bereits:" + this); } private boolean setSuccess0 (Objektergebnis) {if (isdone) {return false; } synchronisiert (this) {if (isdone) {return false; } if (result == null) {// Das Ergebnis der normalen Ausführung der asynchronen Operation ist null this.result = success_signal; } else {this.result = result; } notifyAll; } Return true; } private void mellifyListener {für (ifuturelistener <V> l: Hörer) {NotifyListener (l); }} private void mellifyListener (ifuturelistener <V> l) {try {l.operationCompleted (this); } catch (Ausnahme e) {e.printstacktrace; }} private statische Klasse SuccessSignal {} private statische endgültige Klasse Causeinder {endgültige Throwable Cause; Causeinder (Throwable Cause) {this.cause = cause; }}} Wie benutzt man das? Mit der obigen Skelett -Implementierung können wir verschiedene asynchrone Ergebnisse anpassen. Das Folgende ist eine verzögerte Aufgabe:
Package Future.test; Import Future.ifuture; Import Future OperationCompletEd (ifuture <Ganzzahl> Future) löst Ausnahme {System.out.println (Future.getNow);}}); } / *** Verzögerung Addition* @Param Verzögerungsverzögerungsdauer Millisekunden* @param A Addition* @param b Addition* @Return Asynchrones Ergebnis* / public DelayAdDitionFuture add (lange Verzögerung, int a, int b) {delayAdDitionFuture = New DelayAdDitionFuture; neuer Thread (neuer DelayAdDitionTask (Delay, A, B, Future)). Zukunft zurückgeben; } private Klasse DelayAdDitionTask implementiert runnable {private long delay; Privat int a, b; Private DelayadditionFuture Future; public DelayAdDitionTask (Long Delay, Int A, Int B, DelayAdDitionFuture Future) {Super; this.delay = delay; this.a = a; this.b = b; this.future = Future; } @Override public void run {try {thread.sleep (delay); Ganzzahl i = a + b; // todo hier ist der zukünftige für den Abschlussstatus festgelegt (normale Ausführung ist abgeschlossen). SetSuSccess (i); } catch (InterruptedException e) {// Todo hier ist der zukünftige für den Abschlussstatus festgelegten Status (Ausnahme ist abgeschlossen) Future.SetFailure (e.getCause); }}}} Paket Future.test; Import Future.abstractFuture; Import Future } @Override public ifuture <Ganzzahl> setfailure (throwable cause) {return Super.setFailure (Ursache); }} Sie können sehen, dass der Kunde nicht aktiv fragen muss, ob die Zukunft abgeschlossen ist, sondern die operationCompleted -Methode automatisch zurückrufen, wenn die Zukunft abgeschlossen ist. Der Client muss die Logik nur im Rückruf implementieren.
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.