1.実行アクションを遅らせます
タイマー+マップメソッドを使用して実装できます。コードは次のとおりです。
Observable.timer(5、timeunit.milliseconds).map(value-> {return dosomething();})。subscribe(system.out :: println); } 2。送信結果を遅らせます
このシナリオでは、データを生成するアクションをすぐに実行する必要がありますが、結果は送信が遅れます。これは、上記のシナリオとは異なります。
このシナリオは、 Observable.zipを使用して実装できます。
ZIP演算子は、複数の観測可能性によって送信されたデータを順番に組み合わせ、各データは一度しか結合できず、すべて順序付けられます。最終的な結合データの数は、最小データを送信する観測可能なものによって決定されます。
観察可能な各場所にあるデータの場合、お互いを待つ必要があります。つまり、最初の観測可能な場所の最初の場所のデータが生成された後、生成可能な2番目の観測可能な場所の最初の場所のデータを待つ必要があり、各観測可能な場所のデータが生成された後、指定されたルールに従って結合できます。これは本当に私たちが使いたいものです。
ZIPには多くの種類の宣言がありますが、それはほぼ同じです。これは、いくつかの観測可能性に合格し、各観測可能な場所の対応する場所でデータを処理するルールを指定して、新しいデータを生成することです。これが最も単純なものの1つです:
public static <t1、t2、r> observable <r> zip(観測可能<?extends t1> o1、observable <?extends t2> o2、final func2 <?super t1、?super t2、?r> zipfunction);
zipを使用してプッシュアンド送信の実行結果は次のとおりです。
Observable.Zip(Observable.timer(5、timeUnit.milliseconds)、Observable.just(dosomething())、(x、y) - > y).subscribe(system.out :: println));
3.延期を使用して、指定されたスレッドで特定のアクションを実行する
次のコードと同様に、スレッドの実行方法を指定しますが、現在のコードで呼び出されるスレッドでdoSomething()関数はまだ実行されます。
observable.just(dosomething()).subscribeon(schedulers.io()).observeon(schedulers.computation()).subscribe(v-> utils.printlnwiththread(v.toString(););
通常、次の方法を使用して目標を達成します。
Observable.create(s-> {s.onnext(dosomething());}).subscribeon(schedulers.io()).observeon(schedulers.computation()).subscribe(v-> {utils.printlnwiththread(v.tostring();});しかし、実際、Deferを使用することで同じ目標を達成することができます。
延期について
延期オペレーターは、Create、Just、From、および他の演算子と同じです。クラスオペレーターが作成されますが、このオペレーターに関連するすべてのデータは、サブスクライブする場合にのみ有効になります。
声明:
public static <T> Observable <T> Defer(FUNC0 <Observable <T >> ObservableFactory);
DeferのFUNC0で観察可能は、購読したときにのみ作成されます。
効果:
オブザーバーが購読するまで観察可能なものを作成しないでください。各サブスクリプションで新鮮な観測可能を作成します。
言い換えれば、登録時に観測可能は作成されます。
上記の問題はdeferで実装されています。
observable.defer(() - > obervable.just(dosomething())).subscribeon(schedulers.io()).observeon(schedulers.computation()).subscribe(v-> {utils.printlnwiththread(v.tostring();}); 4. Composeを使用してチェーン構造を破らないでください
よく次のコードが表示されます。
observable.just(dosomething()).subscribeon(schedulers.io()).observeon(schedulers.computation()).subscribe(v-> {utils.printlnwithththread(v.toString());上記のコードでは、 subscribeOn(xxx).observeOn(xxx)多くの場所で同じかもしれません。特定の場所に実装する予定がある場合は、次のように書くことができます。
private static <t> observable <t> applyschedulers(observable <t> observable){return observable.subscribeon(schedulers.io()).observeon(schedulers.computation()); }しかし、上記の方法を呼び出す必要があるたびに、それは大まかに次のようになり、最も外側はリンク構造を破るのと同等の関数です。
ApplySchedulers(Observable.From(Somesource).Map(new Func1 <data、data>(){@override public data call(data data){return manipulate(data);}}).subscribe(new Action1 <data>(){@Override public void call(data)Composeオペレーターは、リンク構造を破壊しないという目的を達成するために使用できます。
Composeの声明は次のとおりです。
Public Observable Compose(Transformer <?super t、?extends r>トランス);
その着信パラメーターはトランスインターフェイスであり、出力は観察可能です。実際、トランスはFunc1<Observable<T> 、 Observable<R>> 、つまり、あるタイプの観測可能なものを別のタイプの観測可能に変換できます。
簡単に言えば、Composeは、指定された変換方法(入力パラメータートランス)を介して、オリジナルの観測可能なオリオンを別の観測可能なものに変換できます。
Composeを使用して、次の方法を使用して、スレッドメソッドを指定します。
private static <t> transformer <t、t> applyschedulers(){return new Transformer <T、T>(){@Override public Observable <t> call(Observable <t> observable){return observable.subscribeon(schedulers.io()).observeon(schedulers.compotation(); }}; } observable.just(dosomething())。compose(applyschedulers()).subscribe(v-> {utils.printlnwiththread(v.toString());});この機能ApplySchedulersは、Lambda式を使用して以下にさらに簡素化できます。
private static <t> transformer <t、t> applyschedulers(){return observable-> observabable.subscribeon(schedulers.io()).observeon(schedulers.computation()); } 5.優先度に応じて、異なる実行結果を使用します
上記のタイトルは、おそらく私がはっきりと表現したかったシナリオを表現しなかったでしょう。実際、私が表現したいシナリオは、ネットワークデータを取得する通常のシナリオに似ています。キャッシュがある場合、キャッシュから取得され、NOがある場合、ネットワークから取得されます。
ここでは、キャッシュがある場合、ネットワークからデータを取得するアクションが実行されないことが必要です。
これは、concat+firstを使用して実装できます。
Concatは、いくつかの観測可能性を1つの観測可能なものにマージし、最終的な観測可能なものを返します。そして、これらのデータは、観察可能なものから送信されるようなものです。パラメーターは、複数の観測可能性またはobservalbeを含むイテレーターにすることができます。
新しい観測可能なデータは、元のconcatで観察可能な順序で配置されます。つまり、新しい結果のデータは元の順序でソートされます。
以下は、上記の要件の実装です。
Observable.concat(getDataFromCache()、getDataFromNetwork())。first().Subscribe(v-> System.out.println( "result:"+v)); //キャッシュからデータを取得するprivate static observable <string> getDataFromCache(){return observable.create(s-> {// dosomhing int dosomhing int data valuce = new random()。nextint(); value = value 2; s.oncompleted(); } //ネットワークからデータを取得するprivate static observable <string> getDataFromNetwork(){return observable.create(s-> {for(int i = 0; i <10; i ++){utils.println( "obs2 geneate"+i); s.onnext( "+i);"+i); }上記の実装では、GetDataFromCacheにデータがある場合、GetDataFromNetworkのコードは実行されませんが、これがまさに必要なものです。
注意が必要ないくつかの実装があります。
1.両方の場所からデータを取得できない可能性があります。このシナリオでは、Firstを使用すると、例外nosuchelementExceptionがスローされます。このシナリオの場合は、上記の最初のシナリオをFirstOrdeFaultに置き換える必要があります。
2。上記のgetDataFromCache()では、データがない場合は、直接接続されています。 oncompletedを呼び出してOnerrorを呼び出す場合、上記のconcatの使用は結果が得られません。 concatがエラーを受信すると、マージが停止するためです。したがって、OnErrorを使用する場合は、Concatdelayerrorを使用する必要がありますconcat.concatDelayError concatdelayerrorは、最初にエラーを無視し、最後の処理までエラーを延期します。
要約します
上記は、この記事のコンテンツ全体です。この記事の内容があなたの研究や仕事に役立つことを願っています。ご質問がある場合は、メッセージを残してコミュニケーションをとることができます。