一、推遲執行動作
可以使用timer+map方法實現.代碼如下:
Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{ return doSomething(); }).subscribe(System.out::println); }二、推遲發送執行的結果
這種場景要求產生數據的動作是馬上執行,但是結果推遲發送.這和上面場景的是不一樣的.
這種場景可以使用Observable.zip來實現.
zip操作符將多個Observable發射的數據按順序組合起來,每個數據只能組合一次,而且都是有序的。最終組合的數據的數量由發射數據最少的Observable來決定。
對於各個observable相同位置的數據,需要相互等待,也就說,第一個observable第一個位置的數據產生後,要等待第二個observable第一個位置的數據產生,等各個Observable相同位置的數據都產生後,才能按指定規則進行組合.這真是我們要利用的.
zip有很多種聲明,但大致上是一樣的,就是傳入幾個observable,然後指定一個規則,對每個observable對應位置的數據進行處理,產生一個新的數據, 下面是其中一個最簡單的:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
用zip實現推送發送執行結果如下:
Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS) ,Observable.just(doSomething()), (x,y)->y) .subscribe(System.out::println));
三、使用defer在指定線程裡執行某種動作
如下面的代碼,雖然我們指定了線程的運行方式,但是doSomething()這個函數還是在當前代碼調用的線程中執行的.
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->Utils.printlnWithThread(v.toString()););
通常我們採用下面的方法達到目的:
Observable.create(s->{s.onNext(doSomething());}) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{ Utils.printlnWithThread(v.toString()); });但其實我們採用defer也能達到相同的目的.
關於defer
defer 操作符與create、just、from等操作符一樣,是創建類操作符,不過所有與該操作符相關的數據都是在訂閱是才生效的。
聲明:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
defer的Func0裡的Observable是在訂閱(subscribe)的時候才創建的.
作用:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
也就說observable是在訂閱的時候才創建的.
上面的問題用defer實現:
Observable.defer(()->Observable.just(doSomething())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString()); });四、使用compose不要打斷鍊式結構
我們經常看到下面的代碼:
Observable.just(doSomething()) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(v->{Utils.printlnWithThread(v.toString());上面的代碼中, subscribeOn(xxx).observeOn(xxx)可能在很多地方都是一樣的, 如果我們打算把它統一在某一個地方實現, 我們可以這麼寫:
private static <T> Observable<T> applySchedulers(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); }但是這樣每次我們需要調用上面的方法, 大致會像下面這樣,最外面是一個函數,等於打破了鏈接結構:
applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() { @Override public Data call(Data data) { return manipulate(data); } }) ).subscribe(new Action1<Data>() { @Override public void call(Data data) { doSomething(data); } });可以使用compose操作符達到不打破鏈接結構的目的.
compose的申明如下:
public Observable compose(Transformer<? super T, ? extends R> transformer);
它的入參是一個Transformer接口,輸出是一個Observable. 而Transformer實際上就是一個Func1<Observable<T> , Observable<R>> ,換言之就是:可以通過它將一種類型的Observable轉換成另一種類型的Observable.
簡單的說,compose可以通過指定的轉化方式(輸入參數transformer),將原來的observable轉化為另外一種Observable.
通過compose, 採用下面方式指定線程方式:
private static <T> Transformer<T, T> applySchedulers() { return new Transformer<T, T>() { @Override public Observable<T> call(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); } }; } Observable.just(doSomething()).compose(applySchedulers()) .subscribe(v->{Utils.printlnWithThread(v.toString()); });函數applySchedulers可以使用lambda表達式進一步簡化為下面為:
private static <T> Transformer<T, T> applySchedulers() { return observable->observable.subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()); }五、按優先級使用不同的執行結果
上面這個標題估計沒表達清楚我想表達的場景. 其實我想表達的場景類似於平常的獲取網絡數據場景:如果緩存有,從緩存獲取,如果沒有,再從網絡獲取.
這裡要求,如果緩存有,不會做從網絡獲取數據的動作.
這個可以採用concat+first實現.
concat將幾個Observable合併成一個Observable,返回最終的一個Observable. 而那些數據就像從一個Observable發出來一樣. 參數可以是多個Observable,也可以是包含Observalbe的Iterator.
新的observable內的數據排列按原來concat裡的observable順序排列,即新結果內的數據是按原來的順序排序的.
下面是上述需求的實現:
Observable.concat(getDataFromCache(),getDataFromNetwork()).first() .subscribe(v->System.out.println("result:"+v)); //從緩存獲取數據private static Observable<String> getDataFromCache(){ return Observable.create(s -> { //dosomething to get data int value = new Random().nextInt(); value = value%2; if (value!=0){ s.onNext("data from cache:"+value); //產生數據} //s.onError(new Throwable("none")); s.onCompleted(); } ); } //從網絡獲取數據private static Observable<String> getDataFromNetwork(){ return Observable.create(s -> { for (int i = 0; i < 10; i++) { Utils.println("obs2 generate "+i); s.onNext("data from network:" + i); //產生數據} s.onCompleted(); } ); }上面的實現,如果getDataFromCache有數據, getDataFromNetwork這裡的代碼是不會執行的, 這正是我們想要的.
上面實現有幾個需要注意:
1、有可能從兩個地方都獲取不到數據, 這種場景下使用first會拋出異常NoSuchElementException,如果是這樣的場景,需要用firstOrDefault替換上面的first.
2、上面getDataFromCache()裡,如果沒有數據,我們直接調用onCompleted,如果不調用onCompleted,而是調用onError,則上述採用concat是得不到任何結果的.因為concat在收到任何一個error,合併就會停止.所以,如果要用onError, 則需要用concatDelayError替代concat.concatDelayError會先忽略error,將error推遲到最後在處理.
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。