1. Verzögern Sie die Ausführungsaktion
Es kann mit der Timer+MAP -Methode implementiert werden. Der Code ist wie folgt:
Observable.Timer (5, TimeUnit.Milliseconds) .MAP (Wert-> {return dosomething ();}). Abonnieren (System.out :: println); } 2. Verzögern Sie das Sendungsergebnis
Dieses Szenario erfordert, dass die Aktion zum Generieren von Daten sofort ausgeführt wird, das Ergebnis wird jedoch beim Senden verzögert. Dies unterscheidet sich vom obigen Szenario.
Dieses Szenario kann mit Observable.zip implementiert werden.
Der Zip -Operator kombiniert die von mehreren Observablen übertragenen Daten, jede Daten können nur einmal kombiniert werden und sind alle geordnet. Die Anzahl der endgültigen kombinierten Daten wird durch das Observable bestimmt, die die geringsten Daten übertragen.
Für Daten an derselben Stelle jeder Beobachtbaren müssen Sie aufeinander warten. Das heißt, nachdem die Daten an der ersten Stelle des ersten Beobachtbaren generiert wurden, müssen Sie auf die Daten am ersten Ort des zweiten beobachtbaren zu erzeugten warten, und nachdem die Daten an derselben Stelle jedes Beobachtbaren generiert wurden, können Sie nach den angegebenen Regeln kombinieren. Das wollen wir wirklich verwenden.
Es gibt viele Arten von Erklärungen im Reißverschluss, aber es ist ungefähr das gleiche, nämlich in mehreren Observablen zu bestehen, dann eine Regel anzugeben, um die Daten an der entsprechenden Position jedes Observablen zu verarbeiten und neue Daten zu generieren. Hier ist einer der einfachsten:
public static <t1, t2, r> beobachtbares <r> ZIP (beobachtbar <? Erweitert T1> O1, Beobachtbar <? T2> O2, endgültige func2 <? Super T1, Super T2, erweitert R> sickfunktion);
Die Ausführungsergebnisse von Push und Senden mit ZIP sind wie folgt:
Observable.zip (Observable.timer (5, TimeUnit.Milliseconds), Observable.just (doSomething ()), (x, y)-> y) .Subscribe (System.out :: println));
3.. Verwenden Sie die Aufhebung, um bestimmte Aktionen im angegebenen Thread auszuführen
Obwohl wir im folgenden Code die laufende Methode des Threads angeben, wird die Funktion doSomething() im Thread, das vom aktuellen Code aufgerufen wird, weiterhin ausgeführt.
Observable.just (doSomething ()) .SubScribeon (ender.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION ()) .SubScribe (v-> utrintlnwiththread (v.toString ());
Normalerweise verwenden wir die folgenden Methoden, um unser Ziel zu erreichen:
Observable.create (s-> {S.ONNext (doSomething ());}) .Subscribeon (planlers.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION ()) .SubScribe (v-> {utils.printlnwiththread (v.tostring ());});Tatsächlich können wir jedoch das gleiche Ziel erreichen, indem wir Aufschub verwenden.
Über Aufhebung
Der Aufschuboperator ist der gleiche wie er das Erstellen von und anderen Betreibern. Es schafft Klassenbetreiber, aber alle Daten, die sich auf diesen Bediener beziehen, werden nur dann wirksam, wenn Sie sich abonnieren.
Stellungnahme:
public static <t> beobachtbar <t> Auf Defer (func0 <beobachtbar <t >> beobachtbares Faktor);
Das beobachtbare in Defers Func0 wird nur beim Abonnieren erstellt.
Wirkung:
Erstellen Sie das Observable erst, wenn sich ein Beobachter abonniert. Erstellen Sie in jedem Abonnement ein frisches Beobachtbar.
Mit anderen Worten, beobachtbar wird beim Abonnieren erstellt.
Das obige Problem wird mit Verschiebung implementiert:
Observable.Defer (()-> Observable.just (doSomething ()) .Subscribeon (ender.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION () .Subscribe (v-> {utils.printlnwiththread (v.toSstring ()); 4. Brechen Sie die Kettenstruktur nicht mit Compose
Wir sehen oft den folgenden Code:
Observable.just (doSomething ()) .Subscribeon (enderulers.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION ()) .SubScribe (v-> {utrintlnwithThread (v.toString ()); Im obigen Code kann subscribeOn(xxx).observeOn(xxx) an vielen Stellen gleich sein. Wenn wir vorhaben, es an einem bestimmten Ort zu implementieren, können wir es so schreiben:
private static <t> Observable <T> applysSchedulers (beobachtbar <t> beobachtbar) {return Observable.SubScribeon (Schedulers.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION ()); }Aber jedes Mal, wenn wir die obige Methode aufrufen müssen, wird es ungefähr wie die folgenden sein, und das äußerste ist eine Funktion, die dem Brechen der Verbindungsstruktur entspricht:
applySchedulers (beobachtbar.from (somesource) .MAP (neue func1 <daten, Data> () {@Override öffentliche Datenaufruf (Datendaten) {return Manipulate (Data);}}).Der Komponierungsoperator kann verwendet werden, um den Zweck zu erreichen, die Verbindungsstruktur nicht zu brechen.
Die Aussage des Komponierens lautet wie folgt:
public Observable Compose (Transformator <? Super t,? Erweitert R> Transformator);
Sein eingehender Parameter ist eine Transformator -Schnittstelle und die Ausgabe ist beobachtbar. Transformator ist tatsächlich ein Func1<Observable<T> Observable<R>> mit anderen Worten: Eine Art von Beobachtbar kann in eine andere Art von Beobachtbar umgewandelt werden.
Einfach ausgedrückt kann Compose das ursprüngliche beobachtbare in ein anderes beobachtbar durch die angegebene Konvertierungsmethode (Eingabeparametertransformator) umwandeln.
Verwenden Sie über Kompose die folgende Methode, um die Thread -Methode anzugeben:
Private static <T> Transformator <t, t> applySSchedulers () {return New Transformator <t, t> () {@Override public Observable <T> Call (Observable <T> Observable) {return Observable.SubScribe () (Schedulers.io ()) .OBSERVEON (plantulers.computation ()); }}; } Observable.just (doSomething ()). Compose (applysSchedulers ()) .SubScribe (v-> {utils.printlnWitHthread (v.toString ());});Die Funktionsanwendungen können mit Lambda -Ausdrücken zu Folgendem weiter vereinfacht werden:
private static <T> Transformator <T, t> applySSchedulers () {Return Observable-> Observable.SubScribeon (Schedulers.io ()) .OBSERVEON (SCHLADULERS.COMPUTATION ()); } 5. Verwenden Sie unterschiedliche Ausführungsergebnisse gemäß Priorität
Der obige Titel drückte wahrscheinlich nicht das Szenario aus, das ich klar ausdrücken wollte. Tatsächlich ähnelt das Szenario, das ich ausdrücken möchte, dem üblichen Szenario, Netzwerkdaten zu erhalten: Wenn ein Cache vorhanden ist, wird es aus dem Cache erhalten, und wenn nein, wird es vom Netzwerk erhalten.
Wenn hier ein Cache vorliegt, wird die Aktion des Erhaltens von Daten aus dem Netzwerk nicht durchgeführt.
Dies kann mit Concat+First implementiert werden.
Concat verschmilzt mehrere Observablen in ein Observable und gibt das endgültige Beobachtbare zurück. Und diese Daten sind wie von einem beobachtbaren. Die Parameter können mehrere Observablen oder Iterator sein, die Beobachtung enthalten.
Die Daten im neuen Beobachtbaren sind in der Reihenfolge der Beobachtbaren in der ursprünglichen Hände angeordnet, dh die Daten im neuen Ergebnis sind in der ursprünglichen Reihenfolge sortiert.
Das Folgende ist die Implementierung der oben genannten Anforderungen:
Observable.concat (getDatafromcache (), getDatafromNetwork ()). First () .SubScribe (v-> System.out.println ("Ergebnis:"+v)); // Daten aus cache private static beobachtbare <string> getDatafromcache () {return Observable.create (s -> {// doomting thating to Abrufenin int value = newInt (). NextInt (); value = value%2; if (value! S.oncompleted (); } // Daten aus dem Netzwerk private statische statische beobachtbare <string> getDatafromNetwork () {return Observable.create (s -> {für (int i = 0; i <10; i ++) {utils.println ("ob2 generieren"+i); }In der obigen Implementierung wird der Code hier in getDatafromNetwork nicht ausgeführt, wenn GetDatafromcache Daten hat, genau das ist, was wir wollen.
Es gibt einige Implementierungen darüber, dass diese Aufmerksamkeit erforderlich ist:
1. Es ist möglich, dass Daten von beiden Stellen nicht erhalten werden können. In diesem Szenario wirft die Verwendung von First eine Ausnahme -NoSuchelementException aus. Wenn dieses Szenario der Fall ist, müssen Sie das erste oben durch FirstorDefault ersetzen.
2. In getDataFromCache() oben, wenn keine Daten vorhanden sind, nennen wir OnCompleted direkt. Wenn wir nicht als On-Completed, sondern OnError anrufen, wird die oben genannte Verwendung von CHOWAT keine Ergebnisse erzielt. Denn wenn Concat einen Fehler erhält, wird der Zusammenschluss gestoppt. Wenn Sie OnError verwenden möchten, müssen Sie daher ConcatDelayError anstelle von concat.concatDelayError ConcatDelayError ignoriert den Fehler zuerst und verschiebt den Fehler bis zur Endverarbeitung.
Zusammenfassen
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, der Inhalt dieses Artikels wird Ihnen bei Ihrem Studium oder Ihrer Arbeit helfen. Wenn Sie Fragen haben, können Sie eine Nachricht zur Kommunikation überlassen.